Spaces:
Running
Running
import gradio as gr | |
import os | |
import sys | |
import shutil | |
import uuid | |
import subprocess | |
from glob import glob | |
from huggingface_hub import snapshot_download | |
# Fetch the "chunyu-li/LatentSync" repository snapshot into ./checkpoints at
# import time; the inference code below reads its files from that folder.
os.makedirs("checkpoints", exist_ok=True)
snapshot_download(repo_id="chunyu-li/LatentSync", local_dir="./checkpoints")
import tempfile | |
from moviepy.editor import VideoFileClip | |
from pydub import AudioSegment | |
def process_video(input_video_path, temp_dir="temp_dir", max_duration=None):
    """
    Re-encode a video as H.264/AAC and save the copy inside ``temp_dir``.

    The clip can optionally be truncated before encoding. With the default
    ``max_duration=None`` the full-length video is written, which matches the
    previous behavior (the 10-second crop was disabled in the code).

    Args:
        input_video_path (str): Path to the input video file.
        temp_dir (str): Directory where the processed video will be saved.
            It is created if it does not exist.
        max_duration (float | None): Maximum length in seconds. When given
            and the video is longer, only the first ``max_duration`` seconds
            are kept. ``None`` disables trimming.

    Returns:
        str: Path to the processed video, named ``cropped_<input file name>``.
    """
    # Ensure the output directory exists before anything is written to it.
    os.makedirs(temp_dir, exist_ok=True)

    input_file_name = os.path.basename(input_video_path)
    output_video_path = os.path.join(temp_dir, f"cropped_{input_file_name}")

    source_clip = VideoFileClip(input_video_path)
    try:
        clip = source_clip
        if max_duration is not None and source_clip.duration > max_duration:
            clip = source_clip.subclip(0, max_duration)
        clip.write_videofile(output_video_path, codec="libx264", audio_codec="aac")
    finally:
        # Always release the clip's reader, even when encoding fails, so that
        # repeated requests do not accumulate open resources.
        source_clip.close()

    return output_video_path
def process_audio(file_path, temp_dir, max_duration_ms=8 * 1000):
    """
    Trim an audio file to a maximum length and save it as WAV in ``temp_dir``.

    Args:
        file_path (str): Path to the input audio file.
        temp_dir (str): Directory where the trimmed file will be saved.
            It is created if it does not exist.
        max_duration_ms (int): Maximum length to keep, in milliseconds.
            Defaults to 8000 (8 seconds).

    Returns:
        str: Path to the exported file, ``<temp_dir>/trimmed_audio.wav``.
    """
    # Mirror process_video: make sure the target directory is there.
    os.makedirs(temp_dir, exist_ok=True)

    audio = AudioSegment.from_file(file_path)

    # Keep only the leading max_duration_ms of audio when it is longer.
    if len(audio) > max_duration_ms:
        audio = audio[:max_duration_ms]

    output_path = os.path.join(temp_dir, "trimmed_audio.wav")
    audio.export(output_path, format="wav")

    print(f"Processed audio saved at: {output_path}")
    return output_path
import argparse | |
from omegaconf import OmegaConf | |
import torch | |
from diffusers import AutoencoderKL, DDIMScheduler | |
from latentsync.models.unet import UNet3DConditionModel | |
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline | |
from diffusers.utils.import_utils import is_xformers_available | |
from accelerate.utils import set_seed | |
from latentsync.whisper.audio2feature import Audio2Feature | |
def main(video_path, audio_path, progress=gr.Progress(track_tqdm=True)):
    """
    Run LatentSync lip-sync inference on a video/audio pair.

    Loads the UNet config and checkpoint, builds the lipsync pipeline on CUDA
    and writes the result to a uniquely named MP4 in the working directory.
    When running inside the shared "fffiloni/LatentSync" Space, the inputs are
    first pre-processed into a temporary directory that is removed afterwards.

    Args:
        video_path (str): Path to the input video file.
        audio_path (str): Path to the input audio file.
        progress (gr.Progress): Gradio progress tracker. Not used directly in
            the body; presumably its presence lets Gradio mirror tqdm
            progress in the UI (verify against the Gradio docs).

    Returns:
        str: Path of the generated video, ``video_out<uuid>.mp4``.

    Raises:
        gr.Error: If the video or the audio input is missing.
        NotImplementedError: If ``config.model.cross_attention_dim`` is
            neither 768 nor 384.
    """
    # Fail early with a readable UI message instead of a crash deep inside
    # the pipeline when one of the inputs was not provided.
    if not video_path or not audio_path:
        raise gr.Error("Please provide both a video and an audio file.")

    inference_ckpt_path = "checkpoints/latentsync_unet.pt"
    unet_config_path = "configs/unet/second_stage.yaml"
    config = OmegaConf.load(unet_config_path)

    print(f"Input video path: {video_path}")
    print(f"Input audio path: {audio_path}")
    print(f"Loaded checkpoint path: {inference_ckpt_path}")

    # SPACE_ID may be unset (e.g. when run locally); .get avoids a KeyError
    # and simply treats that case as "not the shared UI".
    is_shared_ui = "fffiloni/LatentSync" in os.environ.get("SPACE_ID", "")
    temp_dir = None

    try:
        if is_shared_ui:
            temp_dir = tempfile.mkdtemp()

            # Write the processed video into the per-request temp dir so it is
            # removed together with the trimmed audio in the cleanup below.
            cropped_video_path = process_video(video_path, temp_dir)
            print(f"Cropped video saved to: {cropped_video_path}")
            video_path = cropped_video_path

            trimmed_audio_path = process_audio(audio_path, temp_dir)
            print(f"Processed file was stored temporarily at: {trimmed_audio_path}")
            audio_path = trimmed_audio_path

        scheduler = DDIMScheduler.from_pretrained("configs")

        # The whisper checkpoint is chosen from the UNet cross-attention width.
        if config.model.cross_attention_dim == 768:
            whisper_model_path = "checkpoints/whisper/small.pt"
        elif config.model.cross_attention_dim == 384:
            whisper_model_path = "checkpoints/whisper/tiny.pt"
        else:
            raise NotImplementedError("cross_attention_dim must be 768 or 384")

        audio_encoder = Audio2Feature(
            model_path=whisper_model_path,
            device="cpu",
            num_frames=config.data.num_frames,
        )

        vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=torch.float16)
        vae.config.scaling_factor = 0.18215
        vae.config.shift_factor = 0

        unet, _ = UNet3DConditionModel.from_pretrained(
            OmegaConf.to_container(config.model),
            inference_ckpt_path,  # load checkpoint
            device="cpu",
        )
        unet = unet.to(dtype=torch.float16)

        # set xformers
        if is_xformers_available():
            unet.enable_xformers_memory_efficient_attention()

        pipeline = LipsyncPipeline(
            vae=vae,
            audio_encoder=audio_encoder,
            unet=unet,
            scheduler=scheduler,
        ).to("cuda")

        # -1 means "no fixed seed": a fresh one is drawn via torch.seed().
        seed = -1
        if seed != -1:
            set_seed(seed)
        else:
            torch.seed()
        print(f"Initial seed: {torch.initial_seed()}")

        # A UUID in the file name keeps outputs of different requests apart.
        unique_id = str(uuid.uuid4())
        video_out_path = f"video_out{unique_id}.mp4"

        pipeline(
            video_path=video_path,
            audio_path=audio_path,
            video_out_path=video_out_path,
            video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
            num_frames=config.data.num_frames,
            num_inference_steps=config.run.inference_steps,
            guidance_scale=1.0,
            weight_dtype=torch.float16,
            width=config.data.resolution,
            height=config.data.resolution,
        )
    finally:
        # Clean up the temporary directory even when inference failed, so
        # failed requests do not leave pre-processed files behind.
        if temp_dir is not None and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
            print(f"Temporary directory {temp_dir} deleted.")

    return video_out_path
# Gradio UI: inputs (video + audio + submit) in the left column, the generated
# video in the right column. The button click is wired to main().
with gr.Blocks() as demo:
    with gr.Column(elem_id="col-container"):
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Video Control", format="mp4")
                audio_input = gr.Audio(label="Audio Input", type="filepath")
                submit_btn = gr.Button("Submit")
            with gr.Column():
                video_result = gr.Video(label="Result")
                # The bundled examples are currently switched off.
                if False:
                    gr.Examples(
                        examples=[
                            ["assets/demo1_video.mp4", "assets/demo1_audio.wav"],
                            ["assets/demo2_video.mp4", "assets/demo2_audio.wav"],
                            ["assets/demo3_video.mp4", "assets/demo3_audio.wav"],
                        ],
                        inputs=[video_input, audio_input],
                    )

    submit_btn.click(
        fn=main,
        inputs=[video_input, audio_input],
        outputs=[video_result],
    )

demo.queue().launch(show_api=False, show_error=True)