Spaces:
Sleeping
Sleeping
File size: 2,578 Bytes
36cb39e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
import os
import queue
import shutil
import subprocess
import tempfile
import threading
import time
import traceback
from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from queue import Queue
import ffmpeg
import numpy as np
import pydub
from pydub import AudioSegment
from stf_tools.writers._async import AudioAsyncWriter, VideoAsyncWriter
from stf_tools.writers._thread import AudioThreadWriter, VideoThreadWriter
video_pipe_name = "video"
audio_pipe_name = "audio"
class BaseFFMPEGWriter(ABC):
def __init__(
self,
path,
width,
height,
fps,
crf=17,
audio_sample_rate=16000,
quiet=True,
):
self.path = Path(path)
self.width = width
self.height = height
self.fps = fps
self.crf = crf
self.path.parent.mkdir(exist_ok=True, parents=True)
pipe_root = tempfile.mkdtemp()
self.pipe_dir = Path(pipe_root)
self.video_pipe_path = self.pipe_dir / video_pipe_name
self.audio_pipe_path = self.pipe_dir / audio_pipe_name
os.mkfifo(self.video_pipe_path)
os.mkfifo(self.audio_pipe_path)
self.audio_sample_rate = audio_sample_rate
self.write_process = self._run_ffmpeg(
quiet=quiet,
)
@abstractmethod
def _run_ffmpeg(self, quiet):
"""ffmpeg writer using named pipe"""
class ThreadFFMPEGWriter(BaseFFMPEGWriter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.video_writer = VideoThreadWriter(self.video_pipe_path, self.fps)
self.audio_writer = AudioThreadWriter(
self.audio_pipe_path, self.audio_sample_rate
)
def finish(self, forced=False):
self.video_writer.finish(forced=forced)
self.audio_writer.finish(forced=forced)
if forced:
self.write_process.kill()
else:
self.write_process.wait()
shutil.rmtree(self.pipe_dir, ignore_errors=True)
class AsyncFFMPEGWriter(BaseFFMPEGWriter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.video_writer = VideoAsyncWriter(self.video_pipe_path, self.fps)
self.audio_writer = AudioAsyncWriter(
self.audio_pipe_path, self.audio_sample_rate
)
def finish(self, forced=False):
if forced:
self.write_process.kill()
else:
self.write_process.wait()
shutil.rmtree(self.pipe_dir, ignore_errors=True)
|