"""Render a video for an audio clip with ``stf_alternative`` and mux the audio in.

The module exposes small ffmpeg/imageio helpers plus :class:`STFPipeline`,
which unpacks the preprocessed template archives, runs inference for an audio
file and writes the resulting frames to an mp4 with the audio track attached.
"""

import os
import shutil
import subprocess
import zipfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import torch
from pydub import AudioSegment
import cv2

# Applied immediately after importing cv2 (before the remaining imports), as in
# the original file: turn off OpenCV's internal threading and OpenCL usage.
cv2.setNumThreads(0)
cv2.ocl.setUseOpenCL(False)

import av
import imageio
import numpy as np
from rich.progress import track
from tqdm import tqdm

import stf_alternative


def exec_cmd(cmd):
    """Run an external command and raise if it exits with a non-zero status.

    Args:
        cmd: Either a command string, which is executed through the shell, or
            a sequence of arguments, which is executed directly without a
            shell (no quoting of the individual arguments is needed).

    Raises:
        subprocess.CalledProcessError: If the command returns a non-zero code.
    """
    subprocess.run(
        cmd,
        # Strings keep the historical shell behaviour; argument lists bypass
        # the shell so paths with spaces/metacharacters are passed verbatim.
        shell=isinstance(cmd, str),
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )


def images2video(images, wfp, **kwargs):
    """Encode a sequence of frames into a video file with imageio.

    Args:
        images: Sequence of frames; each frame is handed to
            ``writer.append_data``.
        wfp: Output video path.
        **kwargs: Optional encoder settings:
            ``fps`` (default 24), ``format`` (default ``"mp4"``),
            ``codec`` (default ``"libx264"``), ``quality`` (default ``None``),
            ``pixelformat`` (default ``"yuv420p"``),
            ``image_mode`` (default ``"rgb"``; ``"bgr"`` reverses the last
            axis of every frame before writing),
            ``macro_block_size`` (default 2) and ``crf`` (default 18, passed
            to ffmpeg as ``-crf``).
    """
    fps = kwargs.get("fps", 24)
    video_format = kwargs.get("format", "mp4")
    codec = kwargs.get("codec", "libx264")
    quality = kwargs.get("quality")
    pixelformat = kwargs.get("pixelformat", "yuv420p")
    image_mode = kwargs.get("image_mode", "rgb")
    macro_block_size = kwargs.get("macro_block_size", 2)
    ffmpeg_params = ["-crf", str(kwargs.get("crf", 18))]

    writer = imageio.get_writer(
        wfp,
        fps=fps,
        format=video_format,
        codec=codec,
        quality=quality,
        ffmpeg_params=ffmpeg_params,
        pixelformat=pixelformat,
        macro_block_size=macro_block_size,
    )

    # The channel order is the same for every frame, so decide once.
    flip_channels = image_mode.lower() == "bgr"
    try:
        for frame in track(images, description="writing", transient=True):
            writer.append_data(frame[..., ::-1] if flip_channels else frame)
    finally:
        # Always release the encoder, even if a frame could not be written.
        writer.close()

    print(f"Dump to {wfp}\n")


def merge_audio_video(video_fp, audio_fp, wfp):
    """Mux ``audio_fp`` into ``video_fp`` with ffmpeg and write the result to ``wfp``.

    The video stream is copied and the audio is encoded as AAC. An existing
    ``wfp`` is overwritten (``-y``). If either input file is missing, a
    message is printed and nothing is written.

    Args:
        video_fp: Path of the input video.
        audio_fp: Path of the input audio.
        wfp: Path of the merged output file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero code.
    """
    if not (os.path.exists(video_fp) and os.path.exists(audio_fp)):
        print(f"video_fp: {video_fp} or audio_fp: {audio_fp} not exists!")
        return

    # Argument list instead of a shell string: no quoting issues and no shell
    # injection through file names.
    cmd = [
        "ffmpeg",
        "-i", str(video_fp),
        "-i", str(audio_fp),
        "-c:v", "copy",
        "-c:a", "aac",
        str(wfp),
        "-y",
    ]
    exec_cmd(cmd)
    print(f"merge {video_fp} and {audio_fp} to {wfp}")


def _extract_archive(zip_path, target_dir):
    """Extract every member of ``zip_path`` into ``target_dir`` and close the archive."""
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(target_dir)


class STFPipeline:
    """Prepare the ``stf_alternative`` work directory and render videos from audio."""

    # (sub-directory of root_path, archives inside it) used when female_video is True.
    _ARCHIVES_FEMALE_VIDEO = (
        "preprocess/nasilhong_f_v1_front/",
        (
            "crop_video_front_one_piece_dress_nodded_cut.zip",
            "front_one_piece_dress_nodded_cut.zip",
        ),
    )
    # Same layout, used when female_video is False.
    _ARCHIVES_OTHER_VIDEO = (
        "preprocess/Ian_v3_front/",
        (
            "crop_video_Cam2_2309071202_0012_Natural_Looped.zip",
            "Cam2_2309071202_0012_Natural_Looped.zip",
        ),
    )

    def __init__(
        self,
        stf_path: str = "/home/user/app/stf/",
        device: str = "cuda:0",
        template_video_path: str = "templates/front_one_piece_dress_nodded_cut.webm",
        config_path: str = "front_config.json",
        checkpoint_path: str = "089.pth",
        root_path: str = "/tmp/works",
        female_video: bool = True,
    ):
        """Copy the work tree to ``/tmp/works`` and unpack the template archives.

        Args:
            stf_path: Base directory that ``config_path``, ``checkpoint_path``
                and ``template_video_path`` are resolved against.
            device: Device string forwarded to ``stf_alternative.create_model``.
            template_video_path: Template video, relative to ``stf_path``.
            config_path: Model/template config file, relative to ``stf_path``.
            checkpoint_path: Model checkpoint, relative to ``stf_path``.
            root_path: Work directory containing the ``preprocess`` archives;
                also forwarded to the model as ``work_root_path``.
            female_video: Selects which pair of preprocess archives is
                extracted.
        """
        # NOTE(review): source and destination are hard-coded and ignore
        # stf_path/root_path; with the default arguments they coincide with
        # the configured locations. Confirm before parameterising.
        shutil.copytree(
            "/home/user/app/stf/works", "/tmp/works", dirs_exist_ok=True
        )

        if female_video:
            subdir, archive_names = self._ARCHIVES_FEMALE_VIDEO
        else:
            subdir, archive_names = self._ARCHIVES_OTHER_VIDEO
        target_dir = os.path.join(root_path, subdir)
        for archive_name in archive_names:
            _extract_archive(os.path.join(target_dir, archive_name), target_dir)

        self.config_path = os.path.join(stf_path, config_path)
        self.checkpoint_path = os.path.join(stf_path, checkpoint_path)
        self.work_root_path = os.path.join(root_path)
        self.device = device
        self.template_video_path = os.path.join(stf_path, template_video_path)
        # The model and template are not built here; execute() creates them.
        print('STFPipeline init')

    def execute(self, audio: str):
        """Render a video for ``audio`` and return the path of the muxed file.

        A model and template are created on every call. Predicted frames are
        collected until either the inference generator or the template frame
        reader is exhausted, encoded to ``/tmp/dubbing/<stem>--lip.mp4`` and
        then merged with the audio into ``/tmp/dubbing/<stem>--lip_aud.mp4``.

        Args:
            audio: Path of the input audio file.

        Returns:
            Path of the video with the audio track attached. Note that
            ``merge_audio_video`` only prints a message when an input is
            missing, so the returned path is not guaranteed to exist.
        """
        print('STFPipeline execute')
        # NOTE(review): the model is rebuilt for each call, as in the original.
        model = stf_alternative.create_model(
            config_path=self.config_path,
            checkpoint_path=self.checkpoint_path,
            work_root_path=self.work_root_path,
            device=self.device,
            wavlm_path="microsoft/wavlm-large",
        )
        print('STFPipeline execute 1')
        self.template = stf_alternative.Template(
            model=model,
            config_path=self.config_path,
            template_video_path=self.template_video_path,
        )
        print('STFPipeline execute 2')

        output_dir = "/tmp/dubbing"
        Path(output_dir).mkdir(exist_ok=True)
        stem = Path(audio).stem
        save_path = os.path.join(output_dir, stem + "--lip.mp4")
        # Built from the stem rather than str.replace on save_path, which
        # would also rewrite any ".mp4" occurring inside the stem itself.
        save_path_aud = os.path.join(output_dir, stem + "--lip_aud.mp4")

        reader = iter(self.template._get_reader(num_skip_frames=0))
        print('execute,reader====', reader)

        audio_segment = AudioSegment.from_file(audio)
        pivot = 0
        results = []
        with ThreadPoolExecutor(1) as p:
            try:
                gen_infer = self.template.gen_infer_concurrent(
                    p,
                    audio_segment,
                    pivot,
                )
                for idx, (it, chunk) in enumerate(gen_infer, pivot):
                    frame = next(reader)
                    # The composed frame is not used; only the prediction is
                    # written to the video. The call is kept in case compose
                    # has side effects -- TODO confirm whether it is needed.
                    self.template.compose(idx, frame, it)
                    results.append(it['pred'])
            except StopIteration:
                # The template reader ran out of frames: stop collecting and
                # encode what has been produced so far.
                pass

        print('STFPipeline execute 3')
        images2video(results, save_path)
        merge_audio_video(save_path, audio, save_path_aud)
        return save_path_aud