-
Notifications
You must be signed in to change notification settings - Fork 0
/
smart_sleep.py
executable file
·1122 lines (1001 loc) · 45.3 KB
/
smart_sleep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/sudo python3.8
"""
12-hour-server/smart_sleep: a python script to put the home "server" to sleep when wifi is disconnected at night,
and wake up early the next day
Copyright (C) 2021-22 Roguedbear
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import datetime
from time import sleep
import colorama
import netifaces
import requests
import logging
import subprocess
from colorama import Fore
from logging.handlers import TimedRotatingFileHandler
from typing import Literal, Tuple, Dict, List, Callable
# Program constants.
# Most of these are placeholders that config_loader() overwrites (via `global`)
# with the values found in the YAML configuration file.

# Telegram credentials; alert_onTelegram() only sends when both are truthy.
CHAT_ID = ""
BOT_TOKEN = ""
# Which kind of network device the connectivity checks look at
# (read from the "connection type" config key).
CONNECTION_TYPE: Literal["any", "wired", "wireless"] = "any"
# Phase dictionaries; config_loader() fills them with the keys
# "start time", "end time", "name" and "timeout".
NIGHT_PHASE: dict = dict()
MORNING_PHASE: dict = dict()
# Default wait, in seconds, between two connectivity checks when a phase does
# not define its own timeout.
TIMEOUT = 500
# Length in seconds of each rtcwake-based nap used by suspend_thread_until().
SLEEP_INTERVAL = 0 # 0 means disabled
# This controls whether the computer sleeps for real or not. change it acc to your needs
# (it is passed as `debug=` to sleep_computer_but_wake_at(), which then runs
# `rtcwake -m on` instead of `rtcwake -m mem`).
DEBUG = False
# Logging formatter settings. "format" uses str.format ("{") style fields;
# the {color} field is filled in per record by ColoredFormatter.
FORMATTER = {
    "format": "{color}[{asctime}] :--{levelname:-^9s}--: [{funcName}()] {message}",
    # Previous %-style layout, kept for reference:
    # "format": "[%(asctime)s] | %(levelname)-6s | [%(funcName)s()]: %(message)s",
    "datefmt": "%d/%b/%Y %H:%M:%S",
    # Colour prefix emitted for each log level name.
    "colors": {
        "CRITICAL": Fore.RED,
        "INFO": Fore.WHITE,
        "ERROR": Fore.RED,
        "WARNING": Fore.YELLOW,
        "DEBUG": Fore.LIGHTWHITE_EX,
    },
}
class ColoredFormatter(logging.Formatter):
    """
    Formatter that exposes a per-level colour code as ``record.color``.

    The colour for each level name is looked up in ``FORMATTER["colors"]`` so
    that a format string may contain a ``{color}`` field.
    """

    def __init__(self, msg, datefmt=None, style=None):
        """
        :param msg: the format string
        :param datefmt: optional strftime pattern for ``asctime``
        :param style: one of "%", "{" or "$". ``None`` falls back to "%"
                      (logging.Formatter's own default) because the base class
                      rejects ``None`` with a ValueError.
        """
        logging.Formatter.__init__(
            self, msg, datefmt, "%" if style is None else style
        )

    def format(self, record):
        """
        Attach the colour for the record's level and format it.

        :param record: the logging.LogRecord to render
        :return: the formatted log line
        """
        # Always set the attribute: a level without a configured colour (for
        # example a custom level) would otherwise leave `{color}` unresolved
        # and make the formatting call fail.
        record.color = FORMATTER["colors"].get(record.levelname, "")
        return logging.Formatter.format(self, record)
def _abort_config(message: str, *args):
    """Log a fatal configuration problem and terminate with exit status 1 (never returns)."""
    logger.error(message, *args)
    raise SystemExit(1)


def config_loader(filename: str = "config.yaml") -> dict:
    """
    Loads the config from `filename`, validates it, sets the program constants
    (CONNECTION_TYPE, NIGHT_PHASE, MORNING_PHASE, CHAT_ID, BOT_TOKEN, TIMEOUT,
    SLEEP_INTERVAL) and returns the loaded dictionary.

    Mandatory keys: "connection type", "night phase" and "morning phase" (each
    phase with "start time" and "end time"). Optional keys: "telegram",
    "timeout", a per-phase "timeout", "sleep_interval" and
    "connectivity_method" (defaults to "v2").

    Any invalid or missing mandatory value is logged and the program exits
    with status 1 (SystemExit).

    :param filename: str, the filename
    :return: dictionary, the parsed configuration. The phase entries have their
             "start time"/"end time" replaced by datetime.timedelta objects and
             gain the "name" and "timeout" keys.
    """
    # ---------------------------------------------------------------- load ---
    try:
        import yaml

        with open(filename) as config_file:
            config = yaml.safe_load(config_file)
    except ModuleNotFoundError:
        logger.exception(
            "pyyaml module nto found!\nare you sure you have installed requirements.txt"
        )
        raise SystemExit(1)
    except FileNotFoundError:
        logger.exception(
            "config.yaml file not made. Please make it according to the specifications"
        )
        raise SystemExit(1)

    # An empty YAML document loads as None; anything but a mapping is unusable.
    if not isinstance(config, dict):
        _abort_config(
            "%s does not contain a mapping of settings (got %s)",
            filename,
            type(config).__name__,
        )

    global CONNECTION_TYPE, NIGHT_PHASE, MORNING_PHASE, CHAT_ID, BOT_TOKEN, TIMEOUT, SLEEP_INTERVAL

    # Connection type (mandatory)
    if "connection type" not in config:
        _abort_config("CONNECTION_TYPE value not provided in config file!")
    CONNECTION_TYPE = config["connection type"]
    logger.info(f'CONNECTION_TYPE: "{CONNECTION_TYPE}" loaded...')

    # Night phase and morning phase timings (mandatory)
    for section, display_name in (
        ("night phase", "NIGHT PHASE"),
        ("morning phase", "MORNING PHASE"),
    ):
        if section not in config:
            _abort_config("`%s` value not provided in config file!", section)
        phase = config[section]
        if not isinstance(phase, dict) or not all(
            required in phase for required in ("start time", "end time")
        ):
            _abort_config("Values for `%s` are missing in config file!", section)
        logger.info("%s and it's timings loaded...", section.title())
        phase["name"] = display_name
    NIGHT_PHASE = config["night phase"]
    MORNING_PHASE = config["morning phase"]

    # Telegram credentials (optional: alert_onTelegram() is a no-op without them)
    telegram = config.get("telegram")
    if telegram is None:
        logger.info("Telegram tokens not found. Moving over...")
    else:
        if "BOT_TOKEN" not in telegram:
            _abort_config("BOT_Token key in `telegram` not provided!")
        if "CHAT_ID" not in telegram:
            _abort_config("CHAT_ID in `telegram` not provided!")
        BOT_TOKEN = telegram["BOT_TOKEN"]
        CHAT_ID = telegram["CHAT_ID"]
        logger.info("Telegram bot_token and chat id loaded...")

    # Global default timeout (optional, deprecated in favour of per-phase timeouts)
    if "timeout" not in config:
        logger.debug("timeout key not found. will use default")
    else:
        if not isinstance(config["timeout"], int):
            _abort_config(
                f"TIMEOUT not of correct type.\n Expected type int, got {type(config['timeout'])}"
            )
        TIMEOUT = config["timeout"]
        logger.warning(
            f"Custom TIMEOUT({TIMEOUT} seconds) has been loaded, but prefer adding `timeout` inside the "
            "morning & night phase key in future. Refer config[TEMPLATE].yaml for reference. "
            "\n This key is will only change the default timeout. warning will be removed in a future "
            "release"
        )

    # Per-phase timeouts (optional, fall back to TIMEOUT)
    for phase_name in ("morning phase", "night phase"):
        phase = config[phase_name]
        if "timeout" not in phase:
            logger.debug(f"{phase_name} timeout not found, using default")
            phase["timeout"] = TIMEOUT
        elif not isinstance(phase["timeout"], int):
            _abort_config(
                f"expected integer for {phase_name} got: {type(phase['timeout'])} \t {phase['timeout']}"
            )
        # Only reached with a valid integer, so the %d placeholder is safe.
        logger.info(
            "%s timeout loaded... %d seconds", phase_name, phase["timeout"]
        )

    # Sleep interval (optional)
    if "sleep_interval" not in config:
        logger.debug("no sleep_interval defined")
    else:
        if not isinstance(config["sleep_interval"], (int, float)):
            _abort_config("sleep_interval is not an integer")
        SLEEP_INTERVAL = config["sleep_interval"]
        logger.info(f"Loaded sleep interval of: {SLEEP_INTERVAL} seconds")

    # Connectivity method (optional, defaults to "v2")
    method = config.setdefault("connectivity_method", "v2")
    if method not in ("v2", "v3", "v2+v3"):
        _abort_config(
            f"`connectivity_method` should be one of 'v2'/'v3'/'v2+v3'. got: {method}"
        )
    logger.info("Using connectivity method... %s", method)

    # --------------------------------------------------------- parse times ---
    # Convert the "start time"/"end time" of both phases to timedelta objects.
    for phase in (NIGHT_PHASE, MORNING_PHASE):
        try:
            times = parse_time(phase)
        except AssertionError as e:
            logger.error(
                "Datetime value in config.yml for '%s' not in range\nValueError:%s",
                phase["name"].lower(),
                e,
            )
            raise SystemExit(1)
        except TypeError as e:
            logger.error(
                "Datetime value in config.yml for '%s' not in right format\nTypeError:%s",
                phase["name"].lower(),
                e,
            )
            raise SystemExit(1)
        phase["start time"] = times["start time"]
        phase["end time"] = times["end time"]
        logger.info("Parsing time for %s... %sDone", phase["name"], Fore.GREEN)
        logger.info(
            "Time range loaded for %s: (%s — %s)",
            phase["name"],
            str(phase["start time"]),
            str(phase["end time"]),
        )
    return config
def alert_onTelegram(message: str):
    """
    This function will send an alert to telegram notifying about the change.
    If BOT_TOKEN or CHAT_ID is not provided, this function does nothing.

    The message is passed as plain text; it is URL-encoded by `requests`, so
    characters such as `&`, `#` or `+` arrive unchanged. Only the first 1000
    characters are sent. Sending is best effort: a network failure is logged
    as a warning instead of being raised, because this helper is used while
    the machine may well be offline.

    :param message: The message (Telegram "Markdown" parse mode)
    :return: None
    """
    if not (BOT_TOKEN and CHAT_ID):
        return
    try:
        response = requests.get(
            "https://api.telegram.org/bot" + BOT_TOKEN + "/sendMessage",
            params={
                "chat_id": str(CHAT_ID),
                "parse_mode": "Markdown",
                "text": message[:1000],
            },
            # Without a timeout a dead connection would block the program forever.
            timeout=10,
        )
    except requests.RequestException as error:
        # Log only the exception type: its text embeds the request URL, which
        # contains the bot token.
        logger.warning(
            "Could not send the Telegram alert (%s)", type(error).__name__
        )
    else:
        logger.debug(response)
"""
888 d8b
888 Y8P
888
888 .d88b. .d88b. .d88b. 888 88888b. .d88b.
888 d88""88b d88P"88b d88P"88b 888 888 "88b d88P"88b
888 888 888 888 888 888 888 888 888 888 888 888
888 Y88..88P Y88b 888 Y88b 888 888 888 888 Y88b 888
888 "Y88P" "Y88888 "Y88888 888 888 888 "Y88888
888 888 888
Y8b d88P Y8b d88P Y8b d88P
"Y88P" "Y88P" "Y88P"
"""
# Initialise colorama
# (autoreset=True: colour codes are reset after every print)
colorama.init(autoreset=True)
# Starting program logger
logger = logging.getLogger(__name__)
# Color logs on screen
# ("{" style is required because FORMATTER["format"] uses str.format fields)
colors = ColoredFormatter(FORMATTER["format"], datefmt=FORMATTER["datefmt"], style="{")
console = logging.StreamHandler()
console.setFormatter(colors)
# File logs uncolored
# Rotated at midnight, keeping 7 old files. The path is relative to the
# current working directory.
file_logs_uncolored = TimedRotatingFileHandler(
"logs/logs.log", backupCount=7, when="midnight"
)
# Same layout as the console, minus the {color} field.
uncolored_formatter = logging.Formatter(
FORMATTER["format"].replace("{color}", ""), datefmt=FORMATTER["datefmt"], style="{"
)
file_logs_uncolored.setFormatter(uncolored_formatter)
# File logs colored
# Same rotation policy as above, but written with the colour codes included.
file_logs_colored = TimedRotatingFileHandler(
"logs/logs_color.log", backupCount=7, when="midnight"
)
file_logs_colored.setFormatter(colors)
# Adding those handlers
logger.addHandler(console)
logger.addHandler(file_logs_colored)
logger.addHandler(file_logs_uncolored)
# 10 == logging.DEBUG; the __main__ block later replaces this with the
# "logging level" value from the config file.
logger.setLevel(10)
"""
ooooo ooooo oooo
`888' `888' `888
888 888 .ooooo. 888 oo.ooooo. .ooooo. oooo d8b
888ooooo888 d88' `88b 888 888' `88b d88' `88b `888""8P
888 888 888ooo888 888 888 888 888ooo888 888
888 888 888 .o 888 888 888 888 .o 888
o888o o888o `Y8bod8P' o888o 888bod8P' `Y8bod8P' d888b
888
o888o
oooooooooooo . o8o
`888' `8 .o8 `"'
888 oooo oooo ooo. .oo. .ooooo. .o888oo oooo .ooooo. ooo. .oo. .oooo.o
888oooo8 `888 `888 `888P"Y88b d88' `"Y8 888 `888 d88' `88b `888P"Y88b d88( "8
888 " 888 888 888 888 888 888 888 888 888 888 888 `"Y88b.
888 888 888 888 888 888 .o8 888 . 888 888 888 888 888 o. )88b
o888o `V88V"V8P' o888o o888o `Y8bod8P' "888" o888o `Y8bod8P' o888o o888o 8""888P'
"""
def parse_time(phase_dict: dict) -> Dict[str, datetime.timedelta]:
    """
    Returns a new dictionary with the phase's start and end time parsed into
    datetime.timedelta objects (time elapsed since midnight).

    Each value may be either:
      * a number, taken as minutes since midnight (this is what an unquoted
        `16:00` in the config becomes: 16 * 60 + 0 = 960), or
      * a string in "%H:%M" format.

    :param phase_dict: the phase dictionary containing the "start time" and
                       "end time" values
    :return: {"start time": timedelta, "end time": timedelta}
    :raises TypeError: if a value is neither a number nor a valid "%H:%M" string
    :raises AssertionError: if a value lies outside 00:00:00 <= time <= 23:59:59
    """
    logger.debug("Data received: %s", phase_dict)
    midnight = datetime.timedelta(seconds=0)
    ultimate_time = datetime.timedelta(hours=23, minutes=59, seconds=59)

    # Read both values up front so a missing key fails before any parsing.
    raw_values = {
        "start time": phase_dict["start time"],
        "end time": phase_dict["end time"],
    }

    parsed = {}
    for time_key, raw in raw_values.items():
        try:
            # minutes since midnight -> seconds since midnight (960 -> 57600 == 16:00)
            value = datetime.timedelta(seconds=raw * 60)
        except TypeError as e:
            # Not a number: it has to be an "HH:MM" string then.
            try:
                clock = datetime.datetime.strptime(raw, "%H:%M")
            except ValueError:
                raise TypeError(
                    str(e) + " [\"{}\" in '{}']".format(raw, time_key)
                ) from e
            value = datetime.timedelta(hours=clock.hour, minutes=clock.minute)

        # Raised explicitly (instead of using an `assert` statement) so the
        # check still runs under `python -O`; callers catch AssertionError.
        if not midnight <= value <= ultimate_time:
            raise AssertionError(
                f"'{time_key}' value '{raw // 60}:{raw % 60}' out of range! Ensure "
                f"the "
                "time is in 24hr format and lies between 00:00 <= time <= 23:59:59"
            )
        parsed[time_key] = value

    return parsed
def get_current_time_delta() -> datetime.timedelta:
    """
    Gives the local wall-clock time as the whole seconds elapsed since midnight.

    :return: timedelta built from the current hour, minute and second
    """
    moment = datetime.datetime.now()
    seconds_since_midnight = moment.hour * 3600 + moment.minute * 60 + moment.second
    return datetime.timedelta(seconds=seconds_since_midnight)
def current_time_within_time_range(phase: dict) -> bool:
    """
    Tells whether "now" falls inside the half-open window
    start-time <= now < end-time of the given phase.

    A window whose start is later than its end is treated as spanning midnight.

    :param phase: dictionary with the phase's "start time" and "end time"
    :return: bool
    """
    # thank you https://stackoverflow.com/questions/30529437/python-time-comparison-at-midnight
    current = get_current_time_delta()
    logger.debug("Phase start time: %s\t Current time:%s", phase["start time"], current)
    begins, ends = phase["start time"], phase["end time"]
    if begins > ends:
        # The window wraps past midnight: either side of midnight counts.
        return current >= begins or current < ends
    return begins <= current < ends
def get_nearest_phase(
    *args, key: Literal["start time", "end time"] = "start time"
) -> Tuple[str, datetime.timedelta]:
    """
    This function will return the phase whose `key` time is the next one to
    occur after the current time.

    The time left is measured forwards around the clock: a time of day that
    has already passed today counts as happening tomorrow, so the returned
    duration is never negative.

    :param key: which time of each phase to measure against
    :param args: the phase dictionaries (each needs "name" and the `key` entry)
    :return: tuple of the nearest phase's name and the time left until then
    :raises ValueError: if no phase is passed in
    """
    now = get_current_time_delta()
    one_day = datetime.timedelta(days=1)

    def time_until(dict_):
        """
        The "key" function for min().
        :param dict_: the phase dict
        :return: the time from now until the phase's `key` time next occurs
        """
        # The modulo wraps negative differences (time already passed today)
        # around to the next day.
        return (dict_[key] - now) % one_day

    logger.debug("%s\n\t%s", args, list(map(time_until, args)))
    answer = min(args, key=time_until)
    return answer["name"], time_until(answer)
def get_nearest_phaseV2(phase1, phase2) -> Tuple[str, datetime.timedelta]:
    """
    This function will return the phase which is coming up next, based on which
    gap between the two phases the current time lies in.

    :param phase1: the first phase time + name dict
    :param phase2: the second phase time + name dict
    :return: tuple of the upcoming phase's name and the time left until it
             starts. Returns None when the current time is in neither gap
             (i.e. it lies inside one of the phases).
    """
    now = get_current_time_delta()
    one_day = datetime.timedelta(days=1)
    # Each gap runs from the end of one phase to the start of the other and is
    # labelled with the phase that begins when the gap ends.
    gaps = (
        {
            "name": phase2["name"],
            "start time": phase1["end time"],
            "end time": phase2["start time"],
        },
        {
            "name": phase1["name"],
            "start time": phase2["end time"],
            "end time": phase1["start time"],
        },
    )
    for gap in gaps:
        if current_time_within_time_range(gap):
            # Time left until the gap ends == until the named phase starts.
            # The modulo handles a gap that runs across midnight.
            return gap["name"], (gap["end time"] - now) % one_day
    return None
def repr_time_delta(time: datetime.timedelta):
    """
    Renders a time-since-midnight delta as today's date plus that time of day.

    :param time: the timedelta to display; only its time-of-day part is used
    :return: str in "%d/%b/%Y %H:%M:%S" format
    """
    time_of_day = (datetime.datetime.min + time).time()
    today = datetime.datetime.today().date()
    stamped = datetime.datetime.combine(today, time_of_day)
    return stamped.strftime("%d/%b/%Y %H:%M:%S")
def get_last_sleep_time() -> datetime.datetime:
    """
    returns the last sleep time in datetime.datetime format.
    uses the command: grep -E "PM: suspend entry" /var/log/syslog
    to get the last sleep time from system logs

    Since the year is not present in the log file, the entry is assumed to be
    from the current year, or from the previous year when that would place it
    in the future (e.g. a December entry read in January).

    :return: the last sleep time. datetime.datetime.min if unable to find or
             parse the last sleep time.
    """
    try:
        # `; exit 0` keeps grep's "no match"/"no such file" status from raising.
        raw_output = subprocess.check_output(
            'grep -E "PM: suspend entry" /var/log/syslog; exit 0', shell=True
        )
    except subprocess.CalledProcessError:
        return datetime.datetime.min

    entries = [
        line
        for line in raw_output.decode(errors="replace").splitlines()
        if line.strip()
    ]
    if not entries:
        # No last sleep time recorded
        return datetime.datetime.min

    # Classic syslog lines start with "<Mon> <day> <HH:MM:SS>".
    month_date_time = " ".join(entries[-1].split()[0:3])
    now = datetime.datetime.now()
    for year in (now.year, now.year - 1):
        try:
            # Parse together with the year: this both attaches the year to the
            # result and lets "Feb 29" parse in a leap year.
            candidate = datetime.datetime.strptime(
                f"{year} {month_date_time}", "%Y %b %d %H:%M:%S"
            )
        except ValueError:
            # Different timestamp layout, or a date invalid for this year.
            continue
        if candidate <= now:
            return candidate
    return datetime.datetime.min
def connected_to_wifi(ssid: str) -> bool:
    """
    Reports whether `ssid` shows up in the output of `nmcli device wifi list`.

    The device is assumed to join `ssid` automatically whenever it is in
    range, so seeing the name in the scan listing is taken as being connected.

    :param ssid: str, the name of the wifi network
    :return: bool
    """
    completed = subprocess.run(
        ["nmcli", "device", "wifi", "list"], stdout=subprocess.PIPE, check=True
    )
    listing = completed.stdout.decode()
    return ssid in listing
def check_connected_to_internetV2(
    connection_type: Literal["any", "wired", "wireless"] = "any"
) -> Tuple[bool, Tuple[str]]:
    """
    Determines connectivity from the link state the system reports under
    /sys/class/net.
    src: https://sourcedigit.com/20684-how-to-check-eth0-status-on-linux-ubuntu-find-network-interface-card-details-on-
    ubuntu/

    Interfaces whose name starts with "e" are treated as wired and those
    starting with "w" as wireless.

    :param connection_type: which interfaces to test: 'wired', 'wireless' or
                            'any' (both, the default)
    :return: (whether at least one tested interface is up, tuple of the names
             of the interfaces that are up)
    """

    def get_card_status(card_name: str) -> int:
        """
        Reads the link state of one network card as reported by the system.
        Uses the `carrier` file, falling back to `operstate` when `carrier`
        cannot be read.

        :param card_name: the name of the network card
        :return: int, 0 or 1
        """
        sysfs_dir = f"/sys/class/net/{card_name}"
        try:
            carrier = subprocess.check_output(["cat", f"{sysfs_dir}/carrier"])
            return int(carrier.decode().strip("\n"))
        except subprocess.CalledProcessError:
            operstate = subprocess.check_output(["cat", f"{sysfs_dir}/operstate"])
            return 1 if operstate.decode().strip("\n") == "up" else 0

    interface_names = (
        subprocess.check_output(["ls", "/sys/class/net"]).decode().split()
    )
    wired = [name for name in interface_names if name.startswith("e")]
    wireless = [name for name in interface_names if name.startswith("w")]
    logger.debug(f"wired devices found ({len(wired)}): {wired}")
    logger.debug(f"wireless devices found ({len(wireless)}): {wireless}")

    candidates = {
        "wired": wired,
        "wireless": wireless,
        "any": wired + wireless,
    }[connection_type]

    online_cards: List[str] = []
    for card in candidates:
        logger.debug("Testing device... " + Fore.CYAN + card)
        if get_card_status(card) != 1:
            continue
        online_cards.append(card)
        logger.debug(
            f"Device: {Fore.CYAN + card + Fore.WHITE} is connected to internet"
        )

    if not online_cards:
        logger.debug(
            "No device connected to internet. Computer is currently %sOFFLINE",
            Fore.YELLOW,
        )
    return bool(online_cards), tuple(online_cards)
def check_connected_to_internetV3(
    connection_type: Literal["any", "wired", "wireless"] = "any"
) -> Tuple[bool, Tuple[str, ...]]:
    """
    Checks for internet connectivity by pinging the gateway(s).
    any -> pings the default IPv4 gateway
    wired/wireless -> pings every IPv4 (AF_INET) gateway whose interface name
                      starts with "e" (wired) or "w" (wireless)
    (PS: i dunno if practically there could be 2 default gateways, so i just made the method assuming there could be
    multiple gateways that work)

    :param connection_type: any/wired/wireless; which connection device to check
    :return: (bool, tuple of str) ; if a device is connected, and the names of
             the interfaces whose gateway answered
    :raises ValueError: if connection_type is not one of any/wired/wireless
    """

    def ping(ip: str) -> bool:
        """
        Pings the given ip once and returns boolean value on success or fail
        :param ip: the ip address. correctness of ip check is not performed
        :return: True/False if ping was successful or failed
        """
        try:
            ping_output = subprocess.check_output(["ping", "-c", "1", ip]).decode()
        except subprocess.CalledProcessError:
            # ping exited with a non-zero status
            return False
        # Explicit check instead of an `assert` statement, so the reply is
        # still verified when Python runs with -O.
        if "1 received" not in ping_output:
            logger.error(
                "Ping 'successful', but packet not received. ping output: %s",
                ping_output,
            )
            return False
        return True

    gateways = netifaces.gateways()
    to_check = []
    if connection_type == "any":
        if netifaces.AF_INET in gateways["default"]:
            # AF_INET is ipv4 addresses
            to_check.append(gateways["default"][netifaces.AF_INET])
    elif connection_type in ["wired", "wireless"]:
        if netifaces.AF_INET in gateways:
            to_check.extend(gateways[netifaces.AF_INET])
    else:
        raise ValueError(
            f'parameter connection_type expected one of wired/wireless/any, got: "{connection_type}"'
        )
    logger.debug("gateways found: %s", gateways)
    logger.debug("devices to check: %s", to_check)

    # First letter an interface name must have for the requested type
    # (None for "any": every gateway is pinged).
    wanted_prefix = {"wired": "e", "wireless": "w"}.get(connection_type)

    devices_connected: List[str] = []
    for gateway in to_check:
        # each entry starts with (gateway ip, interface name)
        address, interface = gateway[0], gateway[1]
        if wanted_prefix is not None and not interface.startswith(wanted_prefix):
            continue
        logger.debug("Testing device... " + Fore.CYAN + interface)
        if ping(address):
            devices_connected.append(interface)
            logger.debug(
                f"Device: {Fore.CYAN + interface + Fore.WHITE} is connected to internet"
            )

    if not devices_connected:
        logger.debug(
            "No device connected to internet. Computer is currently %sOFFLINE",
            Fore.YELLOW,
        )
    return bool(devices_connected), tuple(devices_connected)
def check_connected_to_internetV2V3(
    connection_type: Literal["any", "wired", "wireless"] = "any"
) -> Tuple[bool, Tuple[str, ...]]:
    """
    Combines the v2 and v3 connectivity checks: v3 is only consulted when v2
    already reports a connection, and then v3 decides the final verdict.

    :param connection_type: the type of device to check whether connected to internet or not.
                            scans for any device by default
    :return: boolean and a tuple of devices that were detected to be connected
             (v2's devices followed by v3's when both checks ran)
    """
    link_up, link_devices = check_connected_to_internetV2(connection_type)
    if not link_up:
        # v2 already says offline: report its result untouched.
        return link_up, link_devices
    gateway_up, gateway_devices = check_connected_to_internetV3(connection_type)
    return gateway_up, link_devices + gateway_devices
def connectivity_function_factory(
    version: Literal["v2", "v3", "v2+v3"] = "v2"
) -> Callable[[Literal["any", "wired", "wireless"]], Tuple[bool, Tuple[str, ...]]]:
    """
    Picks the connectivity-check function that belongs to `version`.

    :param version: the version of connectivity method to use
    :return: the matching check function
    :raises KeyError: if `version` is not one of "v2", "v3" or "v2+v3"
    """
    dispatch = dict(
        [
            ("v2", check_connected_to_internetV2),
            ("v3", check_connected_to_internetV3),
            ("v2+v3", check_connected_to_internetV2V3),
        ]
    )
    chosen = dispatch[version]
    return chosen
# noinspection PyShadowingNames
def sleep_computer_but_wake_at(time: datetime.timedelta, debug: bool = False):
    """
    Puts the computer to sleep right away and arms an rtcwake timer so that it
    wakes up at `time`.

    If `time` is earlier than the current time of day, the wake-up is taken to
    be at that time on the following day.

    :param time: the datetime.timedelta object of the time you need the computer to wake up at
    :param debug: true to not really suspend the computer: rtcwake is run in
                  `-m on` mode, which only waits for the timer
    :return: None
    """
    wake_in = time - get_current_time_delta()
    if wake_in < datetime.timedelta(seconds=0):
        # the requested time has passed for today -> aim for tomorrow
        wake_in += datetime.timedelta(hours=24)

    # now we go schleep schleep
    logger.info("Going schleep schleep, and will wake up at %s.", repr_time_delta(time))
    if debug:
        logger.info("DEBUG enabled, dry-running rtcwake")
        command = ["sudo", "-s", "rtcwake", "-m", "on", "-s", str(wake_in.seconds)]
    else:
        command = ["sudo", "rtcwake", "-m", "mem", "-s", str(wake_in.seconds)]
    output = subprocess.check_output(command)
    logger.debug(output.decode())

    sleep(2)
    logger.info('"Good Mawrning!" [read that in Tim Cook way] I am awake now.')
def suspend_thread_until(time: datetime.timedelta):
    """
    suspends the current thread until the `time` time reaches.
    If `time` is greater than current time, assumes that `time` happens on same day.
    If `time` is lesser than current time, assumes that `time` happens on next day

    With a positive SLEEP_INTERVAL the wait is split into naps of at most that
    many seconds, each one timed with `rtcwake -m on`. Otherwise time.sleep()
    is used, each nap covering half of the remaining time (at least 1 second).

    :param time: the time to wake the thread
    :return: None
    """
    # A zero or negative interval means "disabled"; treating a negative value
    # as enabled would produce negative nap durations.
    use_rtcwake = SLEEP_INTERVAL > 0

    def _nap(duration: datetime.timedelta):
        """waits for `duration`, using rtcwake or python sleep depending on SLEEP_INTERVAL"""
        logger.debug(
            "Sleeping the program until %s for duration %s...",
            (datetime.datetime.now() + duration).strftime("%d/%b/%Y %H:%M:%S"),
            str(duration),
        )
        # total_seconds() is the real length of the duration. `.seconds` is
        # only one component of it: it is never negative and it drops both
        # whole days and fractions of a second.
        seconds = duration.total_seconds()
        if seconds <= 0:
            return
        if not use_rtcwake:
            sleep(seconds)
        else:
            # rtcwake takes whole seconds; never ask for less than 1
            output = subprocess.check_output(
                ["sudo", "-s", "rtcwake", "-m", "on", "-s", str(max(1, int(seconds)))]
            )
            logger.debug(output.decode())

    # convert timedelta to datetime
    wakeup_date = datetime.date.today()
    if time <= get_current_time_delta():
        # if time is less, then assume wakeup is tomorrow
        wakeup_date += datetime.timedelta(days=1)
    time_to_wakeup = datetime.datetime.combine(wakeup_date, datetime.time.min) + time
    logger.info(
        "Sleeping thread till: %s", time_to_wakeup.strftime("%d/%b/%Y %H:%M:%S")
    )

    # the sleep loop: keep napping while more than one second remains
    one_second = datetime.timedelta(seconds=1)
    while one_second < (
        remaining_sleep_time := time_to_wakeup - datetime.datetime.now()
    ):
        # if sleep interval is defined use that, else use logarithmic time inspired by jgillick/python-pause
        if use_rtcwake:
            _nap(min(remaining_sleep_time, datetime.timedelta(seconds=SLEEP_INTERVAL)))
        else:
            _nap(max(one_second, remaining_sleep_time / 2))
    logger.debug("exit")
    # extra delay seconds
    sleep(1)
def sleep_or_suspend_until(time: int, mode: Literal["suspend", "sleep"]):
    """
    Waits for `time` seconds from now, either by putting the whole computer to
    sleep or by only suspending this program's thread.

    Afterwards the program pauses for a few more seconds to give the computer
    time to connect to the wifi.

    :param time: the "duration" of seconds to wake the computer/thread after the "current" time
    :param mode:
        - 'suspend': will time.sleep() the thread.
        - 'sleep': will make the computer go to sleep.
        Any other value only logs an error (the extra pause still happens).
    :return:
    """
    wake_at = get_current_time_delta() + datetime.timedelta(seconds=time)

    if mode == "sleep":
        sleep_computer_but_wake_at(wake_at, debug=DEBUG)
    elif mode == "suspend":
        suspend_thread_until(wake_at)
    else:
        logger.error(
            "'%s' is not a valid argument for parameter 'mode'. 'mode' can either be 'sleep' or 'suspend'",
            mode,
        )

    settle_seconds = 5
    who_woke_up = "program" if mode != "sleep" else "computer"
    logger.debug(
        "%s is awake now, suspending the thread for %d more seconds...",
        who_woke_up,
        settle_seconds,
    )
    sleep(settle_seconds)
"""
88 88 88 88
88 "" 88 ""
88 88
88,dPPYba, 88 ,adPPYb,d8 88,dPPYba, 8b,dPPYba, ,adPPYYba, 88 8b,dPPYba,
88P' "8a 88 a8" `Y88 88P' "8a 88P' "Y8 "" `Y8 88 88P' `"8a
88 d8 88 8b 88 88 d8 88 ,adPPPPP88 88 88 88
88b, ,a8" 88 "8a, ,d88 88b, ,a8" 88 88, ,88 88 88 88
8Y"Ybbd8"' 88 `"YbbdP"Y8 8Y"Ybbd8"' 88 `"8bbdP"Y8 88 88 88
aa, ,88
"Y8bbdP"
88 88
88 ""
88
88 ,adPPYba, ,adPPYb,d8 88 ,adPPYba,
88 a8" "8a a8" `Y88 88 a8" ""
88 8b d8 8b 88 88 8b
88 "8a, ,a8" "8a, ,d88 88 "8a, ,aa
88888888888 `"YbbdP"' `"YbbdP"Y8 88 `"Ybbd8"'
aa, ,88
"Y8bbdP"
"""
def wait_for_connectivity_to_change_to(
    req_connection_status: Literal["connected", "disconnected"],
    action: Literal["suspend", "sleep"],
    start_time: datetime.timedelta,
    end_time: datetime.timedelta,
    timeout: int = -1,
    use_v2: Literal["v2", "v3", "v2+v3"] = "v2",
) -> bool:
    """
    Polls the connectivity until it matches `req_connection_status` or until
    the current time leaves the start_time..end_time window.

    Between two polls the function waits `timeout` seconds through
    sleep_or_suspend_until(), i.e. it either suspends the thread or sleeps the
    computer depending on `action`.

    :param req_connection_status: the connection state to wait for
    :param action: sleep/suspend the computer/thread while waiting between two checks
    :param start_time: the starting time of the current phase
    :param end_time: the time till the function should check for connection changes
    :param timeout: *Optional. -1 by default to use the default TIMEOUT. otherwise specify timeout
    :param use_v2: *Optional. which connectivity function to use. uses v2 by default
    :return: True if the connectivity reached the requested state inside the
             window, False otherwise
    """
    # maps the requested state onto the boolean the connectivity check returns
    target_state = {"connected": True, "disconnected": False}

    logger.info("Waiting for connectivity to change to '%s'...", req_connection_status)

    window = {"start time": start_time, "end time": end_time}
    probe = connectivity_function_factory(use_v2)

    while current_time_within_time_range(window):
        status, _devices = probe(CONNECTION_TYPE)
        wanted = target_state[req_connection_status]

        if status != wanted:
            # Not there yet: wait one timeout period and poll again.
            if timeout == -1:
                timeout = TIMEOUT
            sleep_or_suspend_until(timeout, action)
            continue

        previous = "disconnected" if wanted else "connected"
        current = "connected" if status else "disconnected"
        logger.info("Connection status has been changed! (%s -> %s)", previous, current)
        return True

    logger.info("Connectivity did not change in the specified time.")
    return False
"""
888b d888 d8888 8888888 888b 888
8888b d8888 d88888 888 8888b 888
88888b.d88888 d88P888 888 88888b 888
888Y88888P888 d88P 888 888 888Y88b 888
888 Y888P 888 d88P 888 888 888 Y88b888
888 Y8P 888 d88P 888 888 888 Y88888
888 " 888 d8888888888 888 888 Y8888
888 888 d88P 888 8888888 888 Y888
"""
if __name__ == "__main__":
print(
"""
12-hour-server/smart_sleep Copyright (C) 2021-22 Roguedbear
This program comes with ABSOLUTELY NO WARRANTY.
This is free software, and you are welcome to redistribute it
under certain conditions
"""
)
sleep(2)
LAST_SLEEP_TIME = get_last_sleep_time()
LAST_SLEEP_TIME_byProgram = datetime.datetime.min
_wake_up_message_sent = False
debug = False
# if debug:
# config_loader()
# logger.debug("Testing check_connected_to_internetV2() function:")
# logger.debug("Result:" + Fore.CYAN + str(check_connected_to_internetV2()))
#
# # test sleep function
# delta = get_current_time_delta() + datetime.timedelta(seconds=10)
# # sleep_computer_but_wake_at(delta, debug=True)
#
# print(get_nearest_phaseV2(MORNING_PHASE, NIGHT_PHASE))
#
# sleep_or_suspend_until(10, "suspend")
# The main big brain logic of the program
"""Pseudocode:
*check which phase is the nearest to us.
*check if we are in the night phase or morning phase.
-NIGHT PHASE
* check if internet is still connected
* if yes. no problem. check again in night_phase->timeout seconds.
* if internet is not there, wait 5-10 minute (or WIFI_Wait).
* if internet still not there, sleep until morning phase
* if we're within ~night phase->timeout seconds of `end time` of the NIGHT PHASE, go to sleep
- MORNING PHASE
* wakey wakey and see if internet is connected. after waiting 60 seconds giving the computer oppurtunity to connect
* if internet not there, sleep and check again in morning phase->timeout seconds
* if internet there no problem. awake, sleep until NighPhase
* if we're within ~morning-phase->timeout seconds of `end time` of morning phase, we'll go awake and sleep the
thread until NIGHT PHASE arrives
- NEITHER
* if we're in neither phase (possibly cuz you ran the program at your own will), suspend the thread if nearest is
NIGHT, else sleep if the nearest is morning
"""
config_data = config_loader()
logging_level = config_data.get("logging level", 10)
logger.setLevel(logging_level)
connectivity_method = config_data["connectivity_method"]
# Alright, let's start.
while True:
go_to_sleep = False
be_awake = False
# Check if we're in NIGHT PHASE
if current_time_within_time_range(NIGHT_PHASE):
logger.info(f"Computer is in {Fore.LIGHTMAGENTA_EX} night phase")
_wake_up_message_sent = False
# In the night mode, check if we're nearing the end time, if yes then check *vigorously* for connectivity
# changes
if current_time_within_time_range(
{
"start time": NIGHT_PHASE["end time"]
- datetime.timedelta(seconds=NIGHT_PHASE["timeout"]),
"end time": NIGHT_PHASE["end time"],