MM-StoryAgent / mm_story_agent /video_compose_agent.py
Xu Xuenan
Fix caption bug
6a4302b
from pathlib import Path
from typing import List, Union
import random
import re
from datetime import timedelta
from tqdm import trange
import numpy as np
import librosa
import cv2
from zhon.hanzi import punctuation as zh_punc
from moviepy.editor import ImageClip, AudioFileClip, CompositeAudioClip, \
CompositeVideoClip, ColorClip, VideoFileClip, VideoClip, TextClip, concatenate_audioclips
import moviepy.video.compositing.transitions as transfx
from moviepy.audio.AudioClip import AudioArrayClip
from moviepy.audio.fx.all import audio_loop
from moviepy.video.tools.subtitles import SubtitlesClip
def generate_srt(timestamps: List,
                 captions: List,
                 save_path: Union[str, Path],
                 max_single_length: int = 30):
    """Write an SRT subtitle file for a list of timed captions.

    Every caption is wrapped with ``split_caption``; the resulting lines are
    shown one after another and share the caption's time span equally.

    Args:
        timestamps: ``[start, end]`` pairs in seconds, one per caption.
        captions: Caption strings; ``captions[i]`` is displayed during
            ``timestamps[i]``.
        save_path: Destination path of the ``.srt`` file.
        max_single_length: Maximum number of characters per subtitle line,
            forwarded to ``split_caption``.

    Raises:
        IndexError: If ``captions`` has fewer entries than ``timestamps``.
    """

    def format_time(seconds: float) -> str:
        """Format a duration in seconds as an SRT ``HH:MM:SS,mmm`` timestamp."""
        td = timedelta(seconds=seconds)
        # Use timedelta's integer fields instead of float subtraction so that
        # e.g. 1.001 s is rendered as ",001" rather than being truncated to ",000".
        whole_seconds = td.days * 86400 + td.seconds
        millis = td.microseconds // 1000
        hours, remainder = divmod(whole_seconds, 3600)
        minutes, secs = divmod(remainder, 60)
        return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"

    srt_content = []
    for idx, (start_time, end_time) in enumerate(timestamps):
        wrapped = split_caption(captions[idx], max_single_length)
        # Drop blank lines: a blank line terminates an SRT cue, so an empty
        # chunk would produce a cue without any text.
        caption_chunks = [chunk for chunk in wrapped.split("\n") if chunk.strip()]
        if not caption_chunks:
            continue

        segment_duration = (end_time - start_time) / len(caption_chunks)
        for chunk_idx, chunk in enumerate(caption_chunks):
            start_time_str = format_time(start_time + segment_duration * chunk_idx)
            end_time_str = format_time(start_time + segment_duration * (chunk_idx + 1))
            # Each list element is one complete cue, so the 1-based cue number
            # is simply the number of cues written so far plus one.
            srt_content.append(
                f"{len(srt_content) + 1}\n{start_time_str} --> {end_time_str}\n{chunk}\n\n"
            )

    # NOTE(review): the platform default text encoding is kept here because the
    # file is read back by another component (SubtitlesClip in add_caption);
    # confirm which encoding that reader uses before forcing UTF-8.
    with open(save_path, 'w') as srt_file:
        srt_file.writelines(srt_content)
def add_caption(captions: List,
                srt_path: Union[str, Path],
                timestamps: List,
                video_clip: VideoClip,
                max_single_length: int = 30,
                **caption_config):
    """Overlay subtitles on ``video_clip``.

    An SRT file is first written to ``srt_path`` with ``generate_srt`` and then
    loaded as a ``SubtitlesClip`` that is composited over the video,
    positioned at the bottom centre.

    Args:
        captions: Caption strings, aligned with ``timestamps``.
        srt_path: Where the intermediate ``.srt`` file is written.
        timestamps: ``[start, end]`` pairs in seconds, one per caption.
        video_clip: Clip the subtitles are drawn on top of.
        max_single_length: Maximum characters per subtitle line.
        **caption_config: Keyword arguments passed to every ``TextClip``.

    Returns:
        A ``CompositeVideoClip`` made of the video and its subtitle track.
    """
    generate_srt(timestamps, captions, srt_path, max_single_length)

    def make_text_clip(txt):
        # Every subtitle line is rendered with the same text styling.
        return TextClip(txt, **caption_config)

    subtitle_track = SubtitlesClip(str(srt_path), make_text_clip)
    subtitle_track = subtitle_track.set_position(("center", "bottom"), relative=True)
    return CompositeVideoClip([video_clip, subtitle_track])
def split_keep_separator(text, separator):
    """Split ``text`` at any single character of ``separator``, keeping the separators.

    ``separator`` is treated as a set of characters. Because the regular
    expression uses a capturing group, every matched separator character is
    returned as its own list element between the surrounding pieces (which
    may be empty strings).
    """
    char_class = re.escape(separator)
    return re.split("([" + char_class + "])", text)
def _uses_word_wrapping(caption):
    """Return True when ``caption`` should be wrapped at spaces rather than at CJK punctuation."""
    first = caption[0]
    if "a" <= first <= "z" or "A" <= first <= "Z":
        return True
    # A leading quote, digit or space says nothing about the script. Only fall
    # back to punctuation-based wrapping when the text really contains
    # characters from the CJK blocks (U+2E80 and above); otherwise captions such
    # as '"Hello," she said ...' would never be wrapped at all.
    return not any(ord(ch) >= 0x2E80 for ch in caption)


def split_caption(caption, max_length=30):
    """Wrap a caption into lines of at most ``max_length`` characters.

    Space-delimited text is wrapped greedily at word boundaries. Other text is
    split at the punctuation characters in ``zh_punc`` and the pieces are
    packed greedily; a punctuation mark that would start a new line is
    attached to the end of the previous line instead. A single word or
    punctuation-free piece longer than ``max_length`` is kept intact on its
    own line.

    Args:
        caption: Text to wrap.
        max_length: Target maximum line length in characters.

    Returns:
        The wrapped lines joined with ``"\\n"``; an empty string for an empty
        caption.
    """
    if not caption:
        # Nothing to wrap (and no first character to inspect).
        return ""

    lines = []
    if _uses_word_wrapping(caption):
        current_words = []
        for word in caption.split(" "):
            if len(" ".join(current_words + [word])) <= max_length:
                current_words.append(word)
            else:
                if current_words:
                    lines.append(" ".join(current_words))
                # Always start the next line with the word that did not fit,
                # so an over-long first word is never dropped.
                current_words = [word]
        if current_words:
            lines.append(" ".join(current_words))
    else:
        punctuation = tuple(zh_punc)
        current_line = ""
        for piece in split_keep_separator(caption, zh_punc):
            if len(current_line + piece) <= max_length:
                current_line += piece
                continue
            if current_line:
                lines.append(current_line)
                current_line = ""
            if piece.startswith(punctuation):
                # Do not begin a line with punctuation: hang it on the
                # previous line when there is one.
                if lines:
                    lines[-1] += piece[0]
                current_line = piece[1:]
            else:
                current_line = piece
        if current_line:
            lines.append(current_line.strip())

    return '\n'.join(lines)
def add_bottom_black_area(clip: VideoFileClip,
                          black_area_height: int = 64):
    """
    Overlay a black bar along the bottom edge of a clip (backdrop for captions).

    The bar is as wide as the clip, ``black_area_height`` pixels tall and lasts
    for the clip's whole duration. It is composited on top of the clip, so the
    frame size is unchanged.

    Args:
        clip (VideoFileClip): Video clip to be processed.
        black_area_height (int): Height of the black bar in pixels.
    Returns:
        CompositeVideoClip: The clip with the bar layered over its bottom.
    """
    bar = ColorClip(size=(clip.w, black_area_height), color=(0, 0, 0), duration=clip.duration)
    bar = bar.set_position(("center", "bottom"))
    return CompositeVideoClip([clip, bar])
def add_zoom_effect(clip, speed=1.0, mode='in', position='center'):
    """Apply a gradual zoom to ``clip``.

    The scale factor changes linearly with the frame index between ``1`` and
    ``1 + 0.1 * speed``: growing over time for any ``mode`` other than
    ``'out'``, shrinking back towards ``1`` for ``mode='out'``.

    Args:
        clip: Clip providing ``fps``, ``duration`` and ``fl``.
        speed: Multiplier of the 10 % maximum zoom.
        mode: ``'out'`` reverses the zoom direction; anything else zooms in.
        position: Anchor that stays fixed while zooming; one of ``'center'``,
            ``'left'``, ``'right'``, ``'top'``, ``'topleft'``, ``'topright'``,
            ``'bottom'``, ``'bottomleft'``, ``'bottomright'``.

    Returns:
        The clip returned by ``clip.fl`` with the per-frame zoom applied.
    """
    frame_rate = clip.fps
    frame_count = int(clip.duration * frame_rate)

    def zoom_frame(get_frame, t):
        image = get_frame(t)
        height, width = image.shape[:2]

        # Frame index at time t, counted backwards when zooming out.
        step = t * frame_rate
        if mode == 'out':
            step = frame_count - step
        scale = 1 + (step * ((0.1 * speed) / frame_count))

        # Total (non-positive) translation available along each axis once the
        # frame has been scaled; the anchor decides how much of it is used.
        slack_x = width - (width * scale)
        slack_y = height - (height * scale)
        offsets = {
            'center': [slack_x / 2, slack_y / 2],
            'left': [0, slack_y / 2],
            'right': [slack_x, slack_y / 2],
            'top': [slack_x / 2, 0],
            'topleft': [0, 0],
            'topright': [slack_x, 0],
            'bottom': [slack_x / 2, slack_y],
            'bottomleft': [0, slack_y],
            'bottomright': [slack_x, slack_y],
        }
        shift_x, shift_y = offsets[position]

        # 2x3 affine matrix: uniform scaling followed by the translation.
        matrix = np.array([[scale, 0, shift_x], [0, scale, shift_y]])
        return cv2.warpAffine(image, matrix, (width, height))

    return clip.fl(zoom_frame)
def add_move_effect(clip, direction="left", move_raito=0.95):
    """Pan an enlarged copy of ``clip`` horizontally across its original frame.

    The clip is resized by a factor of ``1 / move_raito`` and then moved at
    constant speed so that, over the clip's duration, its horizontal offset
    goes from ``0`` to ``orig_width - new_width`` (``"left"``) or the other
    way round (``"right"``). The result is composited on a canvas of the
    original size.

    Args:
        clip: Clip providing ``size``, ``duration``, ``resize`` and
            ``set_position``.
        direction: ``"left"`` or ``"right"``.
        move_raito: Ratio of the original size to the enlarged size (the
            spelling of the parameter name is kept for compatibility with
            existing callers).

    Returns:
        A ``CompositeVideoClip`` of the original size containing the moving clip.

    Raises:
        ValueError: If ``direction`` is neither ``"left"`` nor ``"right"``.
    """
    # Validate first: an unknown direction used to surface only later as an
    # UnboundLocalError, after the clip had already been resized.
    if direction not in ("left", "right"):
        raise ValueError(
            f"direction must be 'left' or 'right', got {direction!r}"
        )

    orig_width, orig_height = clip.size[0], clip.size[1]
    new_width = int(orig_width / move_raito)
    new_height = int(orig_height / move_raito)
    clip = clip.resize(width=new_width, height=new_height)

    # Horizontal offsets of the enlarged clip; overflow is non-positive when
    # the clip has been enlarged.
    overflow = orig_width - new_width
    if direction == "left":
        start_x, end_x = 0, overflow
    else:
        start_x, end_x = overflow, 0

    duration = clip.duration
    moving_clip = clip.set_position(
        lambda t: (start_x + (end_x - start_x) / duration * t, 0)
    )
    return CompositeVideoClip([moving_clip], size=(orig_width, orig_height))
def add_slide_effect(clips, slide_duration):
    """Chain clips into one video with sliding transitions between neighbours.

    The first clip slides out to the left. Every following clip slides in from
    the side opposite to where its predecessor left, and (except for the last
    one) slides out towards a randomly chosen side. Consecutive clips overlap
    by ``slide_duration`` seconds.

    Args:
        clips: Non-empty sequence of video clips, in playback order.
        slide_duration: Length of each slide transition in seconds.

    Returns:
        A ``CompositeVideoClip`` containing all clips with their transitions.

    Raises:
        ValueError: If ``clips`` is empty.
    """
    ####### CAUTION: requires at least `slide_duration` of silence at the end of each clip #######
    if not clips:
        raise ValueError("add_slide_effect requires at least one clip")
    if len(clips) == 1:
        # There is nothing to transition to. Treating the only clip as both
        # "first" and "last" would layer it twice (doubling its audio).
        return CompositeVideoClip([clips[0].set_start(0)])

    opposite_side = {"left": "right", "right": "left"}
    previous_out_side = "left"
    videos = [
        CompositeVideoClip(
            [clips[0].fx(transfx.slide_out, duration=slide_duration, side=previous_out_side)]
        ).set_start(0)
    ]
    # Sum of the durations of all clips placed so far (running total instead
    # of re-summing the prefix for every clip).
    elapsed = clips[0].duration

    for idx, clip in enumerate(clips[1:-1], start=1):
        # Middle clips slide in over the previous clip and out for the next
        # one; the slide-in side mirrors the previous clip's slide-out side.
        slide_in_side = opposite_side[previous_out_side]
        previous_out_side = "left" if random.random() <= 0.5 else "right"
        videos.append(
            CompositeVideoClip(
                [clip.fx(transfx.slide_in, duration=slide_duration, side=slide_in_side)]
            )
            # Each earlier transition pulls the start forward by one overlap.
            .set_start(elapsed - slide_duration * idx)
            .fx(transfx.slide_out, duration=slide_duration, side=previous_out_side)
        )
        elapsed += clip.duration

    videos.append(
        CompositeVideoClip(
            [clips[-1].fx(transfx.slide_in, duration=slide_duration,
                          side=opposite_side[previous_out_side])]
        ).set_start(elapsed - slide_duration * (len(clips) - 1))
    )
    return CompositeVideoClip(videos)
def _make_silence(duration, sample_rate):
    """Return a stereo all-zero audio clip of about ``duration`` seconds."""
    return AudioArrayClip(np.zeros((int(sample_rate * duration), 2)), fps=sample_rate)


def _mean_rms(audio_path):
    """Return the mean frame-wise RMS of an audio file loaded at its native sample rate."""
    samples, _ = librosa.core.load(str(audio_path), sr=None)
    return librosa.feature.rms(y=samples)[0].mean()


def _find_speech_files(speech_dir, page):
    """Return the narration files of a page in playback order.

    A page is narrated either by a single ``p{page}.wav`` or by several
    ``p{page}_<index>.wav`` files, which are sorted by their numeric index.
    """
    single_file = speech_dir / f"p{page}.wav"
    if single_file.exists():
        return [single_file]
    speech_files = sorted(speech_dir.glob(f"p{page}_*.wav"),
                          key=lambda x: int(x.stem.split("_")[-1]))
    if not speech_files:
        raise FileNotFoundError(f"No speech file found for page {page} in {speech_dir}")
    return speech_files


def _fit_duration(audio_clip, duration):
    """Loop or trim ``audio_clip`` so that it lasts ``duration`` seconds."""
    if audio_clip.duration < duration:
        return audio_loop(audio_clip, duration=duration)
    return audio_clip.subclip(0, duration)


def _background_gain(speech_rms, background_rms, bg_speech_ratio):
    """Return the gain scaling a background track to ``bg_speech_ratio`` times the speech RMS."""
    if background_rms <= 0:
        # A silent track has no level to match; dividing would give inf/nan.
        return 0.0
    return speech_rms / background_rms * bg_speech_ratio


def compose_video(story_dir: Union[str, Path],
                  save_path: Union[str, Path],
                  captions: List,
                  music_path: Union[str, Path],
                  num_pages: int,
                  fps: int = 10,
                  audio_sample_rate: int = 16000,
                  audio_codec: str = "mp3",
                  caption_config: Union[dict, None] = None,
                  max_single_caption_length: int = 30,
                  fade_duration: float = 1.0,
                  slide_duration: float = 0.4,
                  zoom_speed: float = 0.5,
                  move_ratio: float = 0.95,
                  sound_volume: float = 0.2,
                  music_volume: float = 0.2,
                  bg_speech_ratio: float = 0.4):
    """Assemble the narrated slideshow video of a story and write it to disk.

    For every page ``p`` the function reads ``speech/p{p}.wav`` (or
    ``speech/p{p}_<n>.wav``), ``image/p{p}.png`` and, if present,
    ``sound/p{p}.wav`` from ``story_dir``. Each page becomes a still image with
    a random zoom or pan, accompanied by its narration padded with silence.
    Pages are joined with slide transitions, a black caption bar and subtitles
    are added, and background music is mixed in.

    Args:
        story_dir: Directory containing the ``speech``, ``image`` and ``sound``
            sub-directories; ``captions.srt`` is written here as well.
        save_path: Output video path.
        captions: Caption strings, one per narration file in playback order.
        music_path: Background music file.
        num_pages: Number of pages (numbered from 1).
        fps: Frame rate assigned to each page's image clip.
        audio_sample_rate: Sample rate used when reading and writing audio.
        audio_codec: Audio codec passed to ``write_videofile``.
        caption_config: ``TextClip`` keyword arguments, plus an optional
            ``"area_height"`` entry giving the height of the black caption
            bar. The dictionary is not modified.
        max_single_caption_length: Maximum characters per subtitle line.
        fade_duration: Silence before/after each page's narration and length
            of the image/sound fades, in seconds.
        slide_duration: Length of the slide transition between pages.
        zoom_speed: ``speed`` passed to ``add_zoom_effect``.
        move_ratio: ``move_raito`` passed to ``add_move_effect``.
        sound_volume: Extra volume factor for per-page sound effects.
        music_volume: Extra volume factor for the background music.
        bg_speech_ratio: Target ratio between background and speech RMS.

    Raises:
        ValueError: If ``num_pages`` is smaller than 1.
        FileNotFoundError: If a page has no narration file.
    """
    if num_pages < 1:
        raise ValueError("num_pages must be at least 1")

    story_dir = Path(story_dir)
    sound_dir = story_dir / "sound"
    image_dir = story_dir / "image"
    speech_dir = story_dir / "speech"

    # Work on a copy so that removing "area_height" (which is not a text
    # styling option) never alters the caller's dictionary.
    text_style = dict(caption_config or {})
    area_height = text_style.pop("area_height", None)

    fade_silence = _make_silence(fade_duration, audio_sample_rate)
    slide_silence = _make_silence(slide_duration, audio_sample_rate)

    video_clips = []
    timestamps = []
    # Start time of the current page on the final timeline.
    page_start = 0.0
    speech_rms = 0.0

    for page in trange(1, num_pages + 1):
        ##### speech track
        speech_files = _find_speech_files(speech_dir, page)
        utterances = [AudioFileClip(str(speech_file), fps=audio_sample_rate)
                      for speech_file in speech_files]

        # Every page but the first starts with extra silence that is covered
        # by the slide-in transition.
        lead_in = [fade_silence] if page == 1 else [slide_silence, fade_silence]

        # One caption timestamp per utterance. The same bookkeeping is used
        # for single-file and multi-file pages so that stories mixing both
        # kinds stay in sync.
        cursor = page_start + sum(silence.duration for silence in lead_in)
        for utterance in utterances:
            timestamps.append([cursor, cursor + utterance.duration])
            cursor += utterance.duration

        speech_clip = concatenate_audioclips(
            lead_in + utterances + [fade_silence, slide_silence]
        )
        # Consecutive pages overlap by one slide transition.
        page_start += speech_clip.duration - slide_duration

        # The first narration file of the page is the loudness reference.
        speech_rms = _mean_rms(speech_files[0])

        # set image as the main content, align the duration
        image_clip = ImageClip(str(image_dir / f"p{page}.png"))
        image_clip = image_clip.set_duration(speech_clip.duration).set_fps(fps)
        image_clip = image_clip.crossfadein(fade_duration).crossfadeout(fade_duration)

        if random.random() <= 0.5:  # zoom in or zoom out
            zoom_mode = "in" if random.random() <= 0.5 else "out"
            image_clip = add_zoom_effect(image_clip, zoom_speed, zoom_mode)
        else:  # move left or right
            direction = "left" if random.random() <= 0.5 else "right"
            image_clip = add_move_effect(image_clip, direction=direction, move_raito=move_ratio)

        # sound track
        sound_file = sound_dir / f"p{page}.wav"
        if sound_file.exists():
            sound_clip = AudioFileClip(str(sound_file), fps=audio_sample_rate)
            sound_clip = sound_clip.audio_fadein(fade_duration)
            sound_clip = _fit_duration(sound_clip, speech_clip.duration)
            ratio = _background_gain(speech_rms, _mean_rms(sound_file), bg_speech_ratio)
            audio_clip = CompositeAudioClip([
                speech_clip,
                sound_clip.volumex(sound_volume * ratio).audio_fadeout(fade_duration),
            ])
        else:
            audio_clip = speech_clip

        video_clips.append(image_clip.set_audio(audio_clip))

    composite_clip = add_slide_effect(video_clips, slide_duration=slide_duration)
    if area_height is None:
        composite_clip = add_bottom_black_area(composite_clip)
    else:
        composite_clip = add_bottom_black_area(composite_clip, black_area_height=area_height)
    composite_clip = add_caption(
        captions,
        story_dir / "captions.srt",
        timestamps,
        composite_clip,
        max_single_caption_length,
        **text_style
    )

    # add music track, align the duration
    music_clip = AudioFileClip(str(music_path), fps=audio_sample_rate)
    # NOTE(review): the music level is matched against the speech RMS of the
    # last page only (kept from the original behaviour).
    ratio = _background_gain(speech_rms, _mean_rms(music_path), bg_speech_ratio)
    music_clip = _fit_duration(music_clip, composite_clip.duration)
    all_audio_clip = CompositeAudioClip([composite_clip.audio,
                                         music_clip.volumex(music_volume * ratio)])
    composite_clip = composite_clip.set_audio(all_audio_clip)
    composite_clip.write_videofile(str(save_path),
                                   audio_fps=audio_sample_rate,
                                   audio_codec=audio_codec,)
class VideoComposeAgent:
    """Agent that renders the final story video from the generated assets."""

    def adjust_caption_config(self, width, height):
        """Derive caption sizing from the image resolution.

        Args:
            width: Image width in pixels.
            height: Image height in pixels.

        Returns:
            A dict with ``"fontsize"`` (2.5 % of the mean of width and height)
            and ``"area_height"`` (6 % of the height), both truncated to int.
        """
        area_height = int(height * 0.06)
        fontsize = int((width + height) / 2 * 0.025)
        return {
            "fontsize": fontsize,
            "area_height": area_height
        }

    def call(self, pages, config):
        """Compose ``<story_dir>/output.mp4`` for the given pages.

        Args:
            pages: Caption text of every page; its length is used as the
                number of pages.
            config: Settings dict. The keys read here are
                ``image_generation.obj_cfg.{height,width}``, ``caption_config``,
                ``story_dir``, ``audio_sample_rate``, ``audio_codec``,
                ``max_single_caption_length`` and ``slideshow_effect`` (whose
                entries are forwarded as keyword arguments to
                ``compose_video``).
        """
        obj_cfg = config["image_generation"]["obj_cfg"]
        height = obj_cfg["height"]
        width = obj_cfg["width"]
        # Merge into a new dict rather than updating config["caption_config"]
        # in place, so the caller's configuration is left untouched.
        caption_config = {
            **config["caption_config"],
            **self.adjust_caption_config(width, height),
        }
        story_dir = Path(config["story_dir"])
        compose_video(
            story_dir=story_dir,
            save_path=story_dir / "output.mp4",
            captions=pages,
            music_path=story_dir / "music/music.wav",
            num_pages=len(pages),
            audio_sample_rate=config["audio_sample_rate"],
            audio_codec=config["audio_codec"],
            caption_config=caption_config,
            max_single_caption_length=config["max_single_caption_length"],
            **config["slideshow_effect"]
        )