# Spaces: Sleeping
import os | |
import queue | |
import shutil | |
import subprocess | |
import tempfile | |
import threading | |
import time | |
import traceback | |
from abc import ABC, abstractmethod | |
from contextlib import contextmanager | |
from pathlib import Path | |
from queue import Queue | |
import ffmpeg | |
import numpy as np | |
import pydub | |
from pydub import AudioSegment | |
from stf_tools.writers._async import AudioAsyncWriter, VideoAsyncWriter | |
from stf_tools.writers._thread import AudioThreadWriter, VideoThreadWriter | |
# File names given to the video and audio FIFOs that BaseFFMPEGWriter creates
# inside its temporary pipe directory.
video_pipe_name, audio_pipe_name = "video", "audio"
class BaseFFMPEGWriter(ABC):
    """Base writer that sets up two named pipes and an ffmpeg write process.

    On construction the instance:

    * creates the parent directory of ``path`` if it is missing,
    * creates a fresh temporary directory holding two FIFOs, named by the
      module-level ``video_pipe_name`` and ``audio_pipe_name``,
    * calls ``_run_ffmpeg(quiet=...)`` and stores its result as
      ``write_process``.

    The temporary directory is recorded in ``pipe_dir``; subclasses remove it
    in their ``finish()`` methods.
    """

    def __init__(
        self,
        path,
        width,
        height,
        fps,
        crf=17,
        audio_sample_rate=16000,
        quiet=True,
    ):
        """Prepare the output location, the FIFOs and the write process.

        Args:
            path: Output file location; converted to ``pathlib.Path``.
            width: Stored as ``self.width``.
            height: Stored as ``self.height``.
            fps: Stored as ``self.fps``.
            crf: Stored as ``self.crf`` (presumably ffmpeg's constant rate
                factor -- confirm against the ``_run_ffmpeg`` implementation).
            audio_sample_rate: Stored as ``self.audio_sample_rate``
                (presumably in Hz -- confirm).
            quiet: Forwarded unchanged to ``_run_ffmpeg``.

        Raises:
            OSError: If a directory or FIFO cannot be created.
            Exception: Anything raised by ``_run_ffmpeg`` is propagated after
                the temporary pipe directory has been removed.
        """
        self.path = Path(path)
        self.width = width
        self.height = height
        self.fps = fps
        self.crf = crf
        self.audio_sample_rate = audio_sample_rate
        self.path.parent.mkdir(exist_ok=True, parents=True)

        self.pipe_dir = Path(tempfile.mkdtemp())
        self.video_pipe_path = self.pipe_dir / video_pipe_name
        self.audio_pipe_path = self.pipe_dir / audio_pipe_name
        try:
            os.mkfifo(self.video_pipe_path)
            os.mkfifo(self.audio_pipe_path)
            self.write_process = self._run_ffmpeg(
                quiet=quiet,
            )
        except BaseException:
            # mkdtemp() leaves deletion to the caller; without this the
            # directory and any FIFO already created would be leaked whenever
            # setup fails part-way through.
            shutil.rmtree(self.pipe_dir, ignore_errors=True)
            raise

    def _run_ffmpeg(self, quiet):
        """ffmpeg writer using named pipe"""
        # NOTE(review): only the docstring is visible for this method, so as
        # written it returns None. The subclasses' finish() methods call
        # kill()/wait() on the returned object -- confirm where the actual
        # process is started.
class ThreadFFMPEGWriter(BaseFFMPEGWriter):
    """FFMPEG writer whose pipes are fed by ``VideoThreadWriter`` and
    ``AudioThreadWriter``.

    The video writer is built from the video FIFO path and ``fps``; the audio
    writer from the audio FIFO path and ``audio_sample_rate``.
    """

    def __init__(self, *args, **kwargs):
        """Run the base setup, then attach the two pipe writers.

        All arguments are forwarded unchanged to ``BaseFFMPEGWriter``.
        """
        super().__init__(*args, **kwargs)
        self.video_writer = VideoThreadWriter(self.video_pipe_path, self.fps)
        self.audio_writer = AudioThreadWriter(
            self.audio_pipe_path, self.audio_sample_rate
        )

    def finish(self, forced=False):
        """Finish both pipe writers, stop the write process and clean up.

        Args:
            forced: Passed through to each writer's ``finish``. When true the
                write process is killed instead of being allowed to exit on
                its own.

        The temporary pipe directory is removed even if one of the steps
        raises; the exception still propagates.
        """
        try:
            self.video_writer.finish(forced=forced)
            self.audio_writer.finish(forced=forced)
            if forced:
                self.write_process.kill()
            # Wait in both cases: after kill() this reaps the child so it is
            # not left behind as a zombie (assumes write_process is a
            # subprocess.Popen-like object -- confirm against _run_ffmpeg).
            self.write_process.wait()
        finally:
            shutil.rmtree(self.pipe_dir, ignore_errors=True)
class AsyncFFMPEGWriter(BaseFFMPEGWriter):
    """FFMPEG writer whose pipes are fed by ``VideoAsyncWriter`` and
    ``AudioAsyncWriter``.

    The video writer is built from the video FIFO path and ``fps``; the audio
    writer from the audio FIFO path and ``audio_sample_rate``.
    """

    def __init__(self, *args, **kwargs):
        """Run the base setup, then attach the two pipe writers.

        All arguments are forwarded unchanged to ``BaseFFMPEGWriter``.
        """
        super().__init__(*args, **kwargs)
        self.video_writer = VideoAsyncWriter(self.video_pipe_path, self.fps)
        self.audio_writer = AudioAsyncWriter(
            self.audio_pipe_path, self.audio_sample_rate
        )

    def finish(self, forced=False):
        """Stop the write process and remove the temporary pipe directory.

        Args:
            forced: When true the write process is killed instead of being
                allowed to exit on its own.

        The temporary pipe directory is removed even if stopping the process
        raises; the exception still propagates.
        """
        # NOTE(review): unlike ThreadFFMPEGWriter.finish, this does not call
        # finish() on video_writer / audio_writer -- confirm the async writers
        # are closed by the caller before this is invoked.
        try:
            if forced:
                self.write_process.kill()
            # Wait in both cases: after kill() this reaps the child so it is
            # not left behind as a zombie (assumes write_process is a
            # subprocess.Popen-like object -- confirm against _run_ffmpeg).
            self.write_process.wait()
        finally:
            shutil.rmtree(self.pipe_dir, ignore_errors=True)