Spaces:
Sleeping
Sleeping
import threading | |
import time | |
import traceback | |
from queue import Queue | |
import numpy as np | |
import pydub | |
class ThreadWriter:
    """Write byte chunks to a file from a background thread.

    Chunks handed to :meth:`write_bytes` are placed on a bounded queue
    (at most 240 pending items) and written, in order, by a worker thread
    that is started in the constructor. ``None`` on the queue is the
    end-of-stream sentinel.

    Attributes:
        finished: ``True`` once the worker has exited (normally or because
            of an error) or after a forced :meth:`finish`. While it is
            ``True``, :meth:`write_bytes` silently drops its input.
        thread: The worker ``threading.Thread``.
        queue: The ``queue.Queue`` feeding the worker.
    """

    def __init__(self, path):
        """Start a worker thread that writes queued chunks to ``path``.

        Args:
            path: File to open in unbuffered binary write mode. The file is
                opened on the worker thread, so an open failure is reported
                via ``traceback.print_exc()`` rather than raised here.
        """
        queue = Queue(maxsize=240)
        self.finished = False

        def write_bytes():
            try:
                # NOTE(review): buffering=0 gives a raw stream whose write()
                # may, per the io docs, write fewer bytes than requested;
                # the return value is not checked here.
                with open(path, "wb", 0) as f:
                    while (chunk := queue.get()) is not None:
                        f.write(chunk)
            except Exception:
                traceback.print_exc()
            finally:
                # Mark the writer dead first so new chunks are rejected,
                # then empty the queue: with no consumer left, a producer
                # blocked in put() on a full queue would otherwise hang
                # forever. This thread is the only consumer, so a
                # non-empty queue guarantees get_nowait() succeeds.
                self.finished = True
                while not queue.empty():
                    queue.get_nowait()

        self.thread = threading.Thread(target=write_bytes)
        self.queue = queue
        self.thread.start()

    # The parameter is named ``bytes`` (shadowing the builtin) in the
    # original public signature; it is kept so keyword callers still work.
    def write_bytes(self, bytes):
        """Queue one chunk for writing.

        Blocks while the queue is full. Does nothing if the writer has
        already finished.

        Args:
            bytes: Bytes-like chunk to append to the file. Must not be
                ``None``, which is reserved as the stop sentinel.
        """
        if self.finished:
            return
        self.queue.put(bytes)

    def finish(self, forced=False):
        """Signal end-of-stream to the worker.

        Args:
            forced: If false (default), wait for the worker to write every
                queued chunk and exit. If true, return without waiting and
                mark the writer finished so later ``write_bytes`` calls are
                dropped; chunks already queued are still written.
        """
        # Once the worker has exited nobody will consume the sentinel, and
        # putting it on a full queue would block forever.
        if not self.finished:
            self.queue.put(None)
        if forced:
            self.finished = True
        else:
            self.thread.join()
class VideoThreadWriter(ThreadWriter):
    """Thread-backed writer that streams video arrays as raw uint8 bytes."""

    def __init__(self, path, fps):
        """Start the background writer for ``path``.

        Args:
            path: Destination passed through to ``ThreadWriter``.
            fps: Frame rate; stored on the instance as ``self.fps`` and not
                otherwise used by this class.
        """
        super().__init__(path)
        self.fps = fps

    def write(self, video: np.ndarray):
        """Queue ``video`` for writing as raw uint8 bytes.

        The array is cast to ``np.uint8`` (no clipping or scaling is applied
        here) and serialised with ``tobytes()``.

        Args:
            video: Array of frame data.

        Returns:
            Whatever ``write_bytes`` returns.
        """
        # copy=False skips the intermediate copy when the data is already
        # uint8; tobytes() still produces an independent bytes object.
        return self.write_bytes(video.astype(np.uint8, copy=False).tobytes())
class AudioThreadWriter(ThreadWriter):
    """Thread-backed writer that streams audio segments as raw sample data."""

    def __init__(self, path, audio_sample_rate):
        """Start the background writer for ``path``.

        Args:
            path: Destination passed through to ``ThreadWriter``.
            audio_sample_rate: Frame rate each segment is converted to
                before its data is queued.
        """
        super().__init__(path)
        self.audio_sample_rate = audio_sample_rate

    def write(self, audio: pydub.AudioSegment):
        """Queue ``audio`` for writing at the configured frame rate.

        Args:
            audio: Segment to write; it is passed through
                ``set_frame_rate(self.audio_sample_rate)`` and the result's
                ``raw_data`` is what gets queued.

        Returns:
            Whatever ``write_bytes`` returns.
        """
        resampled = audio.set_frame_rate(self.audio_sample_rate)
        payload = resampled.raw_data
        return self.write_bytes(payload)