Spaces:
Sleeping
Sleeping
File size: 5,368 Bytes
b52caaa 83de6ee b52caaa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
import cv2
from datetime import datetime, timedelta
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import threading
import os
import time
class Camera:
    """Video source that streams MJPEG chunks and periodically archives frames.

    The capture device is opened on a background thread started by
    ``__init__``. Until that thread has finished, ``cap`` may be ``None`` or
    not yet opened, in which case ``gen_frames`` yields a single empty chunk.

    Sampled frames are written to ``video/frames`` as JPEG files and, once
    ``save_duration`` hours have elapsed, stitched into an MP4 under ``video``.

    NOTE(review): frame files are named only by a per-instance counter, so two
    Camera instances running in the same working directory would overwrite
    each other's frames. Confirm whether multiple cameras record at once.
    """

    # Directory receiving the temporary JPEG frames.
    FRAMES_DIR = "video/frames"
    # Directory receiving the assembled MP4 files.
    VIDEO_DIR = "video"
    # Every N-th streamed frame is stored on disk.
    SAVE_EVERY_N_FRAMES = 20
    # Seconds to wait for the capture device to report that it is open.
    OPEN_TIMEOUT_SECONDS = 10

    def __init__(self, id, name, index=0, fps=1, save_duration=0.1, rtsp=None):
        """Store the configuration and start opening the camera in the background.

        Args:
            id: Identifier of the camera.
            name: Human-readable camera name.
            index: Local device index, used when ``rtsp`` is not given.
            fps: Frame rate requested from the device and used for the video.
            save_duration: Hours between two video exports.
            rtsp: Optional stream URL; when truthy it replaces ``index``.
        """
        self.name = name
        self.id = id
        self.fps = fps
        self.save_duration = save_duration
        self.index = rtsp if rtsp else index
        self.out = None
        self.cap = None
        # Requested capture size; replaced by the real size once the device opens.
        # NOTE(review): 1980 looks like a typo for 1920 - confirm.
        self.frame_width = 1980
        self.frame_height = 720
        self.frame_count = 0
        self.start_time = time.time()
        # Opening a device can block, so it runs on its own thread.
        self.initialization_thread = threading.Thread(target=self.initialize_camera)
        self.initialization_thread.start()

    def initialize_camera(self):
        """Open and configure the capture device, waiting up to the timeout."""
        self.cap = cv2.VideoCapture(self.index)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height)
        self.cap.set(cv2.CAP_PROP_FPS, self.fps)

        wait_started = time.time()
        while not self.cap.isOpened():
            time.sleep(0.1)
            if time.time() - wait_started > self.OPEN_TIMEOUT_SECONDS:
                print(f"Timeout: Unable to open camera {self.id} - {self.name}")
                return

        # The device may ignore the requested size. The video writer must be
        # created with the size of the frames it receives, so adopt the size
        # the device reports whenever it reports one.
        actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if actual_width > 0 and actual_height > 0:
            self.frame_width = actual_width
            self.frame_height = actual_height

    def _frame_path(self, number):
        """Return the path of the temporary JPEG for frame ``number``."""
        return "video/frames/frame_{}.jpg".format(number)

    def save_frame(self, frame):
        """Write ``frame`` to the frames directory as the next numbered JPEG."""
        os.makedirs(self.FRAMES_DIR, exist_ok=True)
        # Only count frames that really reached the disk, so the export never
        # looks for a file that was not written.
        if cv2.imwrite(self._frame_path(self.frame_count), frame):
            self.frame_count += 1

    def create_video_from_frames(self):
        """Assemble the stored frames into an MP4 and delete the frame files."""
        os.makedirs(self.VIDEO_DIR, exist_ok=True)
        filename = "video/outpy_{}.mp4".format(time.strftime("%Y%m%d-%H%M%S"))
        self.out = cv2.VideoWriter(
            filename,
            cv2.VideoWriter_fourcc(*"mp4v"),
            self.fps,
            (self.frame_width, self.frame_height),
        )
        try:
            for number in range(self.frame_count):
                path = self._frame_path(number)
                frame = cv2.imread(path)
                # imread returns None for a missing or unreadable file.
                if frame is not None:
                    self.out.write(frame)
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass
        finally:
            # Always finalize the file and restart numbering, even on error.
            self.out.release()
            self.frame_count = 0

    def gen_frames(self):
        """Yield multipart MJPEG chunks read from the camera.

        Yields a single ``b''`` and stops when the camera is not available.
        Otherwise yields one chunk per successfully encoded frame until a read
        fails. Every ``SAVE_EVERY_N_FRAMES``-th frame is stored on disk, and a
        video is exported once ``save_duration`` hours have passed since
        ``start_time``.
        """
        if not self.cap or not self.cap.isOpened():
            yield b''  # Return empty bytes if camera is not available
            return

        frame_index = 0
        while True:
            success, frame = self.cap.read()
            if not success:
                break
            if frame_index % self.SAVE_EVERY_N_FRAMES == 0:
                self.save_frame(frame)
            encoded, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 100])
            # Skip frames that could not be encoded instead of sending garbage.
            if encoded:
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
            frame_index += 1
            if time.time() - self.start_time > self.save_duration * 3600:
                self.create_video_from_frames()
                self.start_time = time.time()

    @staticmethod
    def _release(resource, missing_message):
        """Release ``resource``; print ``missing_message`` when it is ``None``."""
        if resource is None:
            print(missing_message)
            return
        try:
            resource.release()
        except Exception as exc:  # cleanup stays best-effort
            print(f"Release failed: {exc}")

    def close_camera(self):
        """Release the capture device and the video writer, if they exist."""
        self._release(self.cap, "No camera to release")
        self._release(self.out, "No video to release")
class Sensor:
    """Holds the latest readings reported by one sensor.

    ``measurement``, ``values`` and ``units`` are parallel lists: the entry at
    a given position in each list belongs to the same measured quantity.
    A ``None`` entry means that no value or unit is available.
    """

    def __init__(self, id, name):
        """Create a sensor with its initial placeholder readings.

        Args:
            id: Identifier of the sensor.
            name: Human-readable sensor name.
        """
        self.id = id
        self.name = name
        # Time of creation, later the time of the last update.
        self.time = datetime.now()
        # Display labels of the measured quantities.
        self.measurement = ["Spotřeba","Napětí","Výkon","Teplota","Vlhkost","Vlhkost"]
        # Latest sensor values, stored as a list.
        self.values = [12,None,50,None,None,None]
        # Units of the values, stored as a list.
        self.units = ["A","V","D",None,None,None]

    def update_values(self, data, units):
        """Replace the stored readings and stamp the update time.

        Args:
            data: Iterable of readings. Each is converted with ``float`` and
                rounded to 4 decimal places. ``None`` entries are kept as
                ``None``, matching how missing readings are stored initially.
            units: Units belonging to ``data``; stored as given.
        """
        self.values = [
            None if reading is None else round(float(reading), 4)
            for reading in data
        ]
        self.units = units
        self.time = datetime.now()
        print(self.values, self.units)
def find(devices, id):
    """Return the first device in ``devices`` whose ``id`` equals ``id``.

    Args:
        devices: Iterable of objects exposing an ``id`` attribute.
        id: Identifier to look for.

    Returns:
        The first matching device, or ``None`` when no device matches.
    """
    matches = (candidate for candidate in devices if candidate.id == id)
    return next(matches, None)
def plot(self, Db_class, title = "graph", days = 300):
    """Build a two-axis figure of the recent records stored for ``self``.

    NOTE(review): indentation was lost in the reviewed copy of this file, so
    it is unclear whether this is a module-level function or a method of
    Sensor; it only reads ``self.id``.

    Args:
        self: Object exposing ``id``; records with this ``sensor_id`` are used.
        Db_class: Model class exposing ``query``, ``sensor_id`` and ``time``.
            Its records must expose ``time``, ``value3`` and ``value4``.
            Presumably a Flask-SQLAlchemy model - confirm.
        title: Title shown above the figure.
        days: How many days back from now to include.

    Returns:
        A Plotly figure with ``value4`` on the primary y-axis and ``value3``
        on the secondary y-axis, both plotted against record time.
    """
    # Oldest timestamp that is still included in the plot.
    oldest_included = datetime.now() - timedelta(days=days)

    records = Db_class.query.filter(
        Db_class.sensor_id == self.id,
        Db_class.time >= oldest_included,
    ).all()
    # The query does not specify an order; sort so the lines run
    # chronologically instead of connecting points in storage order.
    records = sorted(records, key=lambda record: record.time)

    x = [record.time for record in records]
    y1 = [record.value4 for record in records]
    y2 = [record.value3 for record in records]

    fig = make_subplots(specs=[[{"secondary_y": True}]])

    # Add traces
    fig.add_trace(
        go.Scatter(x=x, y=y1, name="Spotřeba['kWh']"),
        secondary_y=False,
    )
    fig.add_trace(
        go.Scatter(x=x, y=y2, name="Aktuální výkon['W']"),
        secondary_y=True,
    )

    fig.update_layout(
        autosize=True,
        title=title,
        width=800,
        height=512,
        margin=dict(
            l=40,
            r=20,
            b=20,
            t=60,
            pad=2
        ),
        paper_bgcolor='#F5F5F5',
        plot_bgcolor='#FFFFFF'
    )
    fig.update_xaxes(title_text='Datum a čas')
    fig.update_yaxes(title_text="Spotřeba [kWh]", secondary_y=False)
    fig.update_yaxes(title_text="Aktuální výkon [W]", secondary_y=True)
    return fig