import os import queue import shutil import subprocess import tempfile import threading import time import traceback from abc import ABC, abstractmethod from contextlib import contextmanager from pathlib import Path from queue import Queue import ffmpeg import numpy as np import pydub from pydub import AudioSegment from stf_tools.writers._async import AudioAsyncWriter, VideoAsyncWriter from stf_tools.writers._thread import AudioThreadWriter, VideoThreadWriter video_pipe_name = "video" audio_pipe_name = "audio" class BaseFFMPEGWriter(ABC): def __init__( self, path, width, height, fps, crf=17, audio_sample_rate=16000, quiet=True, ): self.path = Path(path) self.width = width self.height = height self.fps = fps self.crf = crf self.path.parent.mkdir(exist_ok=True, parents=True) pipe_root = tempfile.mkdtemp() self.pipe_dir = Path(pipe_root) self.video_pipe_path = self.pipe_dir / video_pipe_name self.audio_pipe_path = self.pipe_dir / audio_pipe_name os.mkfifo(self.video_pipe_path) os.mkfifo(self.audio_pipe_path) self.audio_sample_rate = audio_sample_rate self.write_process = self._run_ffmpeg( quiet=quiet, ) @abstractmethod def _run_ffmpeg(self, quiet): """ffmpeg writer using named pipe""" class ThreadFFMPEGWriter(BaseFFMPEGWriter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.video_writer = VideoThreadWriter(self.video_pipe_path, self.fps) self.audio_writer = AudioThreadWriter( self.audio_pipe_path, self.audio_sample_rate ) def finish(self, forced=False): self.video_writer.finish(forced=forced) self.audio_writer.finish(forced=forced) if forced: self.write_process.kill() else: self.write_process.wait() shutil.rmtree(self.pipe_dir, ignore_errors=True) class AsyncFFMPEGWriter(BaseFFMPEGWriter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.video_writer = VideoAsyncWriter(self.video_pipe_path, self.fps) self.audio_writer = AudioAsyncWriter( self.audio_pipe_path, self.audio_sample_rate ) def finish(self, forced=False): if forced: self.write_process.kill() else: self.write_process.wait() shutil.rmtree(self.pipe_dir, ignore_errors=True)