import asyncio
import logging

import aiofiles
import numpy as np
import pydub

_logger = logging.getLogger(__name__)


class AsyncWriter:
    """Write queued byte chunks to a file from a background coroutine.

    Producers enqueue chunks with :meth:`write_bytes`; a single consumer
    running :meth:`pipeline` drains the queue into the file at ``path``.
    Enqueuing ``None`` tells the consumer to stop and close the file.
    """

    def __init__(self, path):
        # Bounded queue: once 240 chunks are pending, ``put`` waits until the
        # consumer has taken one, so producers cannot outrun the writer
        # without limit.
        self.queue = asyncio.Queue(maxsize=240)
        self.path = path

    async def pipeline(self):
        """Drain the queue into the file until a ``None`` sentinel arrives.

        The file is opened in binary write mode with ``buffering=0``.
        Failures while opening or writing are logged (with traceback) and
        end the pipeline without raising, keeping the writer best-effort.
        Task cancellation and interpreter-exit signals are *not* suppressed;
        the ``async with`` still closes the file before they propagate.
        """
        try:
            async with aiofiles.open(self.path, "wb", buffering=0) as f:
                while (chunk := await self.queue.get()) is not None:
                    await f.write(chunk)
        except Exception:
            # Previously a bare ``except: pass`` hid every failure here,
            # including cancellation. Keep the "never raise on I/O errors"
            # contract but leave a trace of what went wrong.
            # NOTE(review): after a failure nothing consumes the queue any
            # more, so producers will wait in ``write_bytes`` once it fills.
            _logger.exception(
                "AsyncWriter pipeline for %r stopped after an error", self.path
            )

    async def write_bytes(self, bytes):
        """Enqueue one chunk for writing; waits while the queue is full.

        Args:
            bytes: The chunk to write, or ``None`` to stop :meth:`pipeline`.
                (The name shadows the builtin; it is kept so existing
                keyword callers continue to work.)
        """
        await self.queue.put(bytes)


class VideoAsyncWriter(AsyncWriter):
    """AsyncWriter that accepts video data as NumPy arrays."""

    def __init__(self, path, fps):
        super().__init__(path)
        # Stored for callers; not used by the writer itself in this file.
        self.fps = fps

    async def write(self, video: np.ndarray):
        """Cast ``video`` to ``uint8`` and enqueue its raw bytes."""
        return await self.write_bytes(video.astype(np.uint8).tobytes())


class AudioAsyncWriter(AsyncWriter):
    """AsyncWriter that accepts audio as ``pydub.AudioSegment`` objects."""

    def __init__(self, path, audio_sample_rate):
        super().__init__(path)
        self.audio_sample_rate = audio_sample_rate

    async def write(self, audio: pydub.AudioSegment):
        """Convert ``audio`` to ``audio_sample_rate`` and enqueue its raw data."""
        return await self.write_bytes(
            audio.set_frame_rate(self.audio_sample_rate).raw_data
        )