import threading import time import traceback from queue import Queue import numpy as np import pydub class ThreadWriter: def __init__(self, path): queue = Queue(maxsize=240) self.finished = False def write_bytes(): try: with open(path, "wb", 0) as f: while (bytes := queue.get()) is not None: f.write(bytes) except Exception as e: traceback.print_exc() self.finished = True self.thread = threading.Thread(target=write_bytes) self.queue = queue self.thread.start() def write_bytes(self, bytes): if self.finished: return self.queue.put(bytes) def finish(self, forced=False): self.queue.put(None) if forced: self.finished = True else: self.thread.join() class VideoThreadWriter(ThreadWriter): def __init__(self, path, fps): super().__init__(path) self.fps = fps def write(self, video: np.array): return self.write_bytes(video.astype(np.uint8).tobytes()) class AudioThreadWriter(ThreadWriter): def __init__(self, path, audio_sample_rate): super().__init__(path) self.audio_sample_rate = audio_sample_rate def write(self, audio: pydub.AudioSegment): return self.write_bytes(audio.set_frame_rate(self.audio_sample_rate).raw_data)