-
Notifications
You must be signed in to change notification settings - Fork 1
/
Main.py
91 lines (77 loc) · 2.93 KB
/
Main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# Script entry point. Connects to the NetworkTables server, probes video
# ports 0-7 for the two USB cameras, starts the grabber / stitcher / MJPG /
# NetworkTables sender threads, then polls the 'Enable Odometry' flag once
# per second until interrupted with Ctrl-C, at which point every thread is
# stopped and joined.
if __name__ == '__main__':
    #{NON-THREADING RELATED}
    from networktables import NetworkTables
    rioURL = '10.42.56.2'  # NOTE(review): presumably the robot controller address - confirm
    NetworkTables.initialize(server=rioURL)
    NetworkTables.setNetworkIdentity('TX2')
    NetworkTables.setUpdateRate(.020)
    table = NetworkTables.getTable('ZED')
    robot_data = NetworkTables.getTable('Faraday')

    from cv2 import VideoCapture
    # False means "not found yet". It is compared by identity below because
    # port 0 is a valid port but is falsy, so a plain truthiness test would
    # treat a camera on port 0 as still missing and overwrite it.
    portL, portR = (False, False)
    for port in range(8):
        cam = VideoCapture(port)
        found, frame = cam.read()
        # Release the probe handle right away so the device is free when the
        # camera objects below open it again.
        cam.release()
        # Keep only devices whose frame has width // height == 1.
        # NOTE(review): presumably this filters out the wide side-by-side ZED
        # image - confirm against the hardware.
        if found and frame.shape[1] // frame.shape[0] == 1:
            if portL is False:
                portL = port
            else:
                # As before, a later matching device replaces an earlier
                # right-hand candidate.
                portR = port
    print('Found ports: ' + str((portL, portR)))

    #{THREADING RELATED}
    #{import overarching packages}
    from queue import Queue
    from CustomThread import CustomThread
    #{import task-specific classes}
    from Cameras import USB, ZED
    from Stitching import ThreadableStitcher
    from Servers import Web, NT

    #{declare queues}
    queue_cameraZED = Queue()
    queue_odometry = Queue()
    queue_stitched = Queue()
    queue_cubes = Queue()

    #{initialize cameras}
    cameraL = USB.USB(portL, 'Resources/ELPFisheyeL/')
    cameraR = USB.USB(portR, 'Resources/ELPFisheyeR/')

    #{declare threads}
    thread_cameraZED = CustomThread(ZED.ThreadableGrabber(queue_cameraZED, queue_odometry))
    thread_stitcher = CustomThread(ThreadableStitcher(cameraL, cameraR, queue_stitched, queue_cubes))
    thread_mjpeg = CustomThread(Web.ThreadableMJPGSender(queue_stitched, robot_data))
    thread_nt_odometry = CustomThread(NT.ThreadableOdometrySender(table, queue_odometry))
    thread_nt_cubes = CustomThread(NT.ThreadableCubeSender(table, queue_cubes))

    #{start threads}
    thread_cameraZED.start()
    thread_stitcher.start()
    thread_mjpeg.start()
    thread_nt_odometry.start()
    thread_nt_cubes.start()
    zed_running = True

    from time import sleep
    while True:
        try:
            #{stopping ZED when it is no longer needed}
            # The flag defaults to True when the key is absent, so the ZED
            # thread keeps running until it is explicitly disabled. Once
            # stopped it is not restarted by this loop.
            if not table.getBoolean('Enable Odometry', True):
                if zed_running:
                    thread_cameraZED.stop()
                    thread_cameraZED.join()
                    zed_running = False
            sleep(1.0)
        except KeyboardInterrupt:
            #{ending the program entirely}
            # Signal every worker first, then join, so they wind down
            # concurrently rather than one after another.
            thread_stitcher.stop()
            thread_mjpeg.stop()
            thread_nt_odometry.stop()
            thread_nt_cubes.stop()
            thread_stitcher.join()
            thread_mjpeg.join()
            if zed_running:
                thread_cameraZED.stop()
                thread_cameraZED.join()
                zed_running = False
            # Wake up thread_nt's if they're stuck in a get()
            queue_odometry.put(None)
            queue_cubes.put(None)
            thread_nt_odometry.join()
            thread_nt_cubes.join()
            NetworkTables.stopClient()
            break
    print('--------program terminated--------')