LatentSync / app.py
Francke's picture
css
2759645
import gradio as gr
import os
import sys
import shutil
import uuid
import subprocess
from glob import glob
from huggingface_hub import snapshot_download
# Fetch the LatentSync model files from the Hugging Face Hub into ./checkpoints
# at import time, creating the target directory first if it is missing.
os.makedirs("checkpoints", exist_ok=True)
snapshot_download(repo_id="chunyu-li/LatentSync", local_dir="./checkpoints")
import tempfile
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
def process_video(input_video_path, temp_dir="temp_dir", max_duration=None):
    """
    Re-encode a video into ``temp_dir``, optionally trimming it first.

    The result is written with the ``libx264`` video codec and ``aac`` audio
    codec under the name ``cropped_<original file name>``. Trimming is off by
    default, so the whole input is re-encoded unless ``max_duration`` is given.

    Args:
        input_video_path (str): Path to the input video file.
        temp_dir (str): Directory where the processed video will be saved;
            it is created if it does not exist.
        max_duration (float | None): Maximum length in seconds. When set and
            the input is longer, only the first ``max_duration`` seconds are
            kept. ``None`` (the default) keeps the full video.

    Returns:
        str: Path to the processed video file.
    """
    # Ensure the output directory exists before anything is written to it.
    os.makedirs(temp_dir, exist_ok=True)

    input_file_name = os.path.basename(input_video_path)
    output_video_path = os.path.join(temp_dir, f"cropped_{input_file_name}")

    video = VideoFileClip(input_video_path)
    try:
        clip = video
        if max_duration is not None and video.duration > max_duration:
            clip = video.subclip(0, max_duration)
        clip.write_videofile(output_video_path, codec="libx264", audio_codec="aac")
    finally:
        # Always close the clip so its reader is released, even if encoding
        # fails; otherwise every request leaves an open clip behind.
        video.close()

    return output_video_path
def process_audio(file_path, temp_dir, max_duration_ms=8 * 1000):
    """
    Trim an audio file to a maximum length and save it as WAV in ``temp_dir``.

    Args:
        file_path (str): Path to the input audio file.
        temp_dir (str): Directory where ``trimmed_audio.wav`` is written;
            it is created if it does not exist.
        max_duration_ms (int): Maximum length to keep, in milliseconds.
            Defaults to 8000 (8 seconds). Longer audio is cut to its first
            ``max_duration_ms`` milliseconds; shorter audio is kept whole.

    Returns:
        str: Path to the exported WAV file.
    """
    # Match process_video: make sure the destination exists before exporting.
    os.makedirs(temp_dir, exist_ok=True)

    audio = AudioSegment.from_file(file_path)

    # The length check and the slice below both work in milliseconds.
    if len(audio) > max_duration_ms:
        audio = audio[:max_duration_ms]

    output_path = os.path.join(temp_dir, "trimmed_audio.wav")
    audio.export(output_path, format="wav")

    print(f"Processed audio saved at: {output_path}")
    return output_path
import argparse
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature
def main(video_path, audio_path, progress=gr.Progress(track_tqdm=True)):
    """
    Run LatentSync lip-sync inference on a video/audio pair.

    Loads the UNet config and checkpoint, builds the Whisper audio encoder,
    VAE and UNet, assembles them into a ``LipsyncPipeline`` on CUDA, and
    writes the result to a uniquely named MP4 in the working directory.

    When the ``SPACE_ID`` environment variable contains
    ``"fffiloni/LatentSync"``, the inputs are first passed through
    ``process_video`` and ``process_audio`` into a per-request temporary
    directory, which is removed again once the call finishes or fails.

    Args:
        video_path (str): Path to the input video file.
        audio_path (str): Path to the input audio file.
        progress (gr.Progress): Gradio progress tracker created with
            ``track_tqdm=True``; it is not referenced in the body.

    Returns:
        str: Path to the generated video file.

    Raises:
        gr.Error: If either input path is missing.
        NotImplementedError: If ``config.model.cross_attention_dim`` is
            neither 768 nor 384.
    """
    # Fail early with a readable message instead of an obscure error deep in
    # the preprocessing or pipeline code when an input was not provided.
    if not video_path or not audio_path:
        raise gr.Error("Please provide both a video and an audio file.")

    inference_ckpt_path = "checkpoints/latentsync_unet.pt"
    unet_config_path = "configs/unet/second_stage.yaml"
    config = OmegaConf.load(unet_config_path)

    print(f"Input video path: {video_path}")
    print(f"Input audio path: {audio_path}")
    print(f"Loaded checkpoint path: {inference_ckpt_path}")

    # SPACE_ID may be absent (e.g. when run outside a hosted Space), so read
    # it with a default rather than indexing os.environ directly.
    is_shared_ui = "fffiloni/LatentSync" in os.environ.get("SPACE_ID", "")
    temp_dir = tempfile.mkdtemp() if is_shared_ui else None

    try:
        if is_shared_ui:
            # Keep both intermediate files in the per-request temp dir so they
            # are cleaned up together and cannot collide between requests.
            cropped_video_path = process_video(video_path, temp_dir)
            print(f"Cropped video saved to: {cropped_video_path}")
            video_path = cropped_video_path

            trimmed_audio_path = process_audio(audio_path, temp_dir)
            print(f"Processed file was stored temporarily at: {trimmed_audio_path}")
            audio_path = trimmed_audio_path

        scheduler = DDIMScheduler.from_pretrained("configs")

        # The Whisper checkpoint is chosen from the UNet's cross-attention width.
        if config.model.cross_attention_dim == 768:
            whisper_model_path = "checkpoints/whisper/small.pt"
        elif config.model.cross_attention_dim == 384:
            whisper_model_path = "checkpoints/whisper/tiny.pt"
        else:
            raise NotImplementedError("cross_attention_dim must be 768 or 384")

        audio_encoder = Audio2Feature(
            model_path=whisper_model_path,
            device="cpu",
            num_frames=config.data.num_frames,
        )

        vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=torch.float16)
        vae.config.scaling_factor = 0.18215
        vae.config.shift_factor = 0

        unet, _ = UNet3DConditionModel.from_pretrained(
            OmegaConf.to_container(config.model),
            inference_ckpt_path,  # load checkpoint
            device="cpu",
        )
        unet = unet.to(dtype=torch.float16)

        # set xformers
        if is_xformers_available():
            unet.enable_xformers_memory_efficient_attention()

        # Components are created on CPU and moved to CUDA as one pipeline.
        pipeline = LipsyncPipeline(
            vae=vae,
            audio_encoder=audio_encoder,
            unet=unet,
            scheduler=scheduler,
        ).to("cuda")

        # -1 means "no fixed seed": let torch pick a fresh one.
        seed = -1
        if seed != -1:
            set_seed(seed)
        else:
            torch.seed()
        print(f"Initial seed: {torch.initial_seed()}")

        unique_id = str(uuid.uuid4())
        video_out_path = f"video_out{unique_id}.mp4"

        pipeline(
            video_path=video_path,
            audio_path=audio_path,
            video_out_path=video_out_path,
            video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
            num_frames=config.data.num_frames,
            num_inference_steps=config.run.inference_steps,
            guidance_scale=1.0,
            weight_dtype=torch.float16,
            width=config.data.resolution,
            height=config.data.resolution,
        )

        return video_out_path
    finally:
        # Clean up the temporary directory whether inference succeeded or not.
        # Best-effort removal so a cleanup problem never masks the real error.
        if temp_dir is not None and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
            print(f"Temporary directory {temp_dir} deleted.")
# Gradio UI: the left column collects the video and audio inputs, the right
# column shows the generated video.
with gr.Blocks() as demo:
    with gr.Column(elem_id="col-container"):
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Video Control", format="mp4")
                audio_input = gr.Audio(label="Audio Input", type="filepath")
                submit_btn = gr.Button("Submit")

            with gr.Column():
                video_result = gr.Video(label="Result")

                # Bundled examples are currently switched off; change the
                # condition to re-enable them.
                if False:
                    gr.Examples(
                        examples=[
                            ["assets/demo1_video.mp4", "assets/demo1_audio.wav"],
                            ["assets/demo2_video.mp4", "assets/demo2_audio.wav"],
                            ["assets/demo3_video.mp4", "assets/demo3_audio.wav"],
                        ],
                        inputs=[video_input, audio_input],
                    )

    # Clicking "Submit" runs inference and shows the output in the result player.
    submit_btn.click(
        fn=main,
        inputs=[video_input, audio_input],
        outputs=[video_result],
    )

# Enable the request queue, then start the server with the API page hidden and
# errors shown in the UI.
demo.queue().launch(show_api=False, show_error=True)