Spaces:
No application file
No application file
from __future__ import annotations | |
import os | |
import logging | |
from typing import Dict, List, Union | |
import logging | |
import cv2 | |
import numpy as np | |
from moviepy.editor import ( | |
VideoFileClip, | |
concatenate_videoclips, | |
vfx, | |
CompositeVideoClip, | |
TextClip, | |
) | |
from ...data import Clip, ClipSeq | |
from ..config.CFG import CFG | |
from .face import face_roles2frames | |
from .video_process_with_moviepy import ( | |
crop_edge_2_even, | |
crop_target_bbox, | |
get_sub_mvpclip_by_time, | |
) | |
from .vision_frame import ( | |
Frame, | |
get_time_center_by_topkrole, | |
get_width_center_by_topkrole, | |
) | |
from ..utils.vision_util import round_up_to_even | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
class VideoClip(Clip):
    """A ``Clip`` describing one segment of a video file.

    Custom processing methods live here (e.g. debug rendering, trimming,
    cropping and resizing of the corresponding moviepy clip).

    Several methods read attributes that this class does not assign itself
    (``media_path``, ``content_box``, ``target_width``, ``target_height``,
    ``target_width_height_ratio``, ``frame_start``, ``frame_end``).
    NOTE(review): presumably these are provided by ``Clip.__init__`` from
    ``**kwargs`` -- confirm against the base class.
    """

    def __init__(
        self,
        time_start: float,
        duration: float,
        clipid: int = None,
        media_type: str = None,
        mediaid: str = None,
        media_name: str = None,
        video_path: str = None,
        roles: Dict[str, Dict[str, Dict[str, List[float]]]] = None,
        scenes: List[Dict] = None,
        background: str = None,
        scene_roles: list = None,
        timepoint_type: int = None,
        text: str = None,
        stage: str = None,
        path: str = None,
        duration_num: int = None,
        similar_clipseq: list = None,
        frames: List[Frame] = None,
        offset: float = 0.2,
        dynamic: float = None,
        camera_move: str = None,
        **kwargs,
    ):
        """Build a clip obtained by splitting a video into segments.

        ``time_start``, ``duration``, ``clipid``, ``media_type``, ``mediaid``,
        ``timepoint_type``, ``text``, ``stage``, ``path``, ``duration_num``,
        ``similar_clipseq``, ``dynamic`` and ``**kwargs`` are forwarded to
        ``Clip.__init__`` unchanged; the remaining arguments are stored on
        the instance.

        Args:
            time_start (float): start of the clip inside the media file.
            duration (float): length of the clip.
            media_name (str, optional): name of the source media; its
                basename is shown in the debug overlay. Defaults to None.
            video_path (str, optional): path of the source video file.
                Defaults to None.
            roles (dict, optional): detected roles, keyed by role id.
                Defaults to None. Example::

                    {
                        "9": {                      # role id
                            "bbox": {
                                "839": [            # frame id
                                    [461.84, 110.67, 685.09, 379.34]
                                ],
                            },
                            "name": "",
                            "conf_of_talking": -0.88,
                        }
                    }
            scenes (list of dict, optional): stored as-is. Defaults to None.
            background (str, optional): stored as-is. Defaults to None.
            scene_roles (list, optional): stored as ``self.scenes_roles``.
                Defaults to None.
            frames (list of Frame, optional): frames belonging to this clip;
                each one is preprocessed during construction.
                Defaults to None.
            offset (float, optional): total duration trimmed from the
                moviepy clip after reading it, half at the head and half at
                the tail. This helps to get rid of stray frames that belong
                to neighbouring shots. Defaults to 0.2.
            camera_move (str, optional): stored as-is. Defaults to None.
        """
        self.media_name = media_name
        self.video_path = video_path
        self.roles = roles
        self.scenes = scenes
        self.background = background
        # The attribute is spelled ``scenes_roles`` (unlike the parameter);
        # the spelling is kept because other code may read it.
        self.scenes_roles = scene_roles
        self.frames = frames
        self.camera_move = camera_move
        self.offset = offset
        super().__init__(
            time_start,
            duration,
            clipid,
            media_type,
            mediaid,
            timepoint_type,
            text,
            stage,
            path,
            duration_num,
            similar_clipseq,
            dynamic,
            **kwargs,
        )
        self.preprocess()

    def preprocess(self):
        """Run clip-level preprocessing, then frame-level preprocessing."""
        self.preprocess_clip()
        self.preprocess_frames()

    def preprocess_clip(self):
        """Propagate clip-level video properties to the frames."""
        self.spread_parameters()

    def preprocess_frames(self):
        """Call ``preprocess()`` on every frame, if there are frames."""
        if self.frames is None:
            return
        for frame in self.frames:
            frame.preprocess()

    def spread_parameters(self):
        """Copy selected instance attributes of the clip onto each frame.

        Only attributes that exist in the instance ``__dict__`` and are not
        None are copied.
        """
        if self.frames is None:
            return
        target_keys = (
            "width",
            "height",
            "content_width",
            "content_height",
            "fps",
            "frame_num",
        )
        for key in target_keys:
            value = self.__dict__.get(key)
            if value is None:
                continue
            for frame in self.frames:
                setattr(frame, key, value)

    @property
    def time_end(self):
        """End of the clip: ``time_start + duration``.

        Exposed as a property because every reader in this module accesses
        it as an attribute (``clip.time_end``).
        """
        return self.time_start + self.duration

    def get_mvp_clip(
        self,
    ):
        """Return the moviepy clip that corresponds to this Clip."""
        # NOTE(review): this reads ``self.media_path`` while __init__ stores
        # ``video_path`` -- confirm that the base class provides media_path.
        return VideoFileClip(self.media_path).subclip(self.time_start, self.time_end)

    def get_offset_mvp_clip(self, clip=None, offset: float = None):
        """Trim ``offset / 2`` from the head and from the tail of ``clip``.

        Args:
            clip (moviepy clip, optional): clip to trim. Defaults to None,
                in which case ``get_mvp_clip()`` is used.
            offset (float, optional): total duration to remove. Defaults to
                None, in which case ``self.offset`` is used.

        Returns:
            moviepy clip: the trimmed clip, or ``clip`` unchanged when no
            offset is configured or trimming would leave nothing.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        if offset is None:
            offset = self.offset
        if offset is None:
            return clip
        duration = clip.duration
        time_start = max(0, offset / 2)
        time_end = min(duration, duration - offset / 2)
        if time_end <= time_start:
            # The clip is not longer than the offset: trimming would give an
            # empty or inverted window, so keep the clip as it is.
            logger.warning(
                "offset %s is not smaller than clip duration %s, skip trimming",
                offset,
                duration,
            )
            return clip
        return clip.subclip(time_start, time_end)

    def get_clean_mvp_clip(self, clip=None):
        """Return the cleaned-up moviepy clip.

        Steps, in order: trim by ``self.offset``, crop to the content box,
        crop to the target width/height ratio (if set), resize to the target
        width and height (if both set), crop width and height to even.

        Args:
            clip (moviepy clip, optional): clip to process. Defaults to
                None, in which case ``get_mvp_clip()`` is used.

        Returns:
            moviepy clip: the processed clip.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        # offset
        clip = self.get_offset_mvp_clip(clip=clip, offset=self.offset)
        # content
        clip = self.get_content_clip(clip=clip)
        if self.target_width_height_ratio is not None:
            clip = self.get_target_width_height_ratio_clip(clip=clip)
        if self.target_width is not None and self.target_height is not None:
            clip = self.get_target_width_height_clip(clip=clip)
            logger.debug(
                "after get_target_width_height_clip: width={}, height={}, duration={}".format(
                    clip.w, clip.h, clip.duration
                )
            )
        # crop width and height to even
        clip = crop_edge_2_even(clip=clip)
        return clip

    def get_content_clip(self, clip=None):
        """Crop ``clip`` to ``self.content_box``, the real picture content.

        The area outside the content is usually a black border (which may
        contain subtitles).

        Args:
            clip (moviepy clip, optional): clip that may have black borders.
                Defaults to None, in which case ``get_mvp_clip()`` is used.

        Returns:
            moviepy clip: the cropped clip, or ``clip`` unchanged when
            ``self.content_box`` is None.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        # keep the content part, drop the black borders
        if self.content_box is not None:
            clip = crop_target_bbox(clip, self.content_box)
        return clip

    def get_target_width_height_ratio_clip(self, clip=None):
        """Crop ``clip`` horizontally to ``self.target_width_height_ratio``.

        1. The height of the result equals the height of the input.
        2. When roles are available, the horizontal centre returned by
           ``get_width_center_by_topkrole`` is used as crop centre.
        3. Otherwise the centre of the picture is used.

        The centre is clamped so that the crop window stays inside the clip.

        Args:
            clip (moviepy clip, optional): clip at its current resolution.
                Defaults to None, in which case ``get_mvp_clip()`` is used.

        Returns:
            moviepy clip: the cropped clip.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        target_height = clip.h
        target_width = round_up_to_even(target_height * self.target_width_height_ratio)
        target_width = min(target_width, clip.w)
        half_width = target_width / 2
        # TODO: the range in which centre cropping should be used is undecided
        if not self.roles:
            target_center_x = clip.w / 2
        else:
            # Without a content box nothing was cropped, so there is no
            # coordinate shift to pass on.
            if self.content_box is None:
                coord_offset = [0, 0]
            else:
                coord_offset = [self.content_box[0], self.content_box[1]]
            target_center_x = get_width_center_by_topkrole(
                objs=self.roles, coord_offset=coord_offset
            )
        target_center_x = min(max(half_width, target_center_x), clip.w - half_width)
        return clip.crop(
            x1=target_center_x - half_width,
            y1=0,
            x2=target_center_x + half_width,
            y2=target_height,
        )

    def get_target_width_height_clip(self, clip=None):
        """Resize ``clip`` to ``(self.target_width, self.target_height)``.

        Args:
            clip (moviepy clip, optional): clip to resize. Defaults to None,
                in which case ``get_mvp_clip()`` is used.

        Returns:
            moviepy clip: the resized clip, or ``clip`` unchanged when the
            target width or height is not set.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        if self.target_width and self.target_height:
            logger.debug(
                "get_target_width_height_clip: clip.w={}, clip.h={}, target_width={}, target_height={}".format(
                    clip.w, clip.h, self.target_width, self.target_height
                )
            )
            clip = clip.resize(newsize=(self.target_width, self.target_height))
        return clip

    def get_subclip_by_time(
        self, final_duration: float, clip=None, method: str = "speed"
    ):
        """Align the clip to ``final_duration``.

        The work is delegated to ``get_sub_mvpclip_by_time``. ``center_ratio``
        is 0.5 when there are no roles (or the frame range is empty);
        otherwise it is the relative position, inside
        ``[frame_start, frame_end]``, of the frame returned by
        ``get_time_center_by_topkrole``. In DEBUG running mode the result is
        additionally passed through ``vis``.

        Args:
            final_duration (float): target duration.
            clip (moviepy clip, optional): clip to align. Defaults to None,
                in which case ``get_clean_mvp_clip()`` is used.
            method (str, optional): how to change the length, forwarded to
                ``get_sub_mvpclip_by_time``. Defaults to "speed".

        Returns:
            moviepy clip: the aligned clip.
        """
        if clip is None:
            clip = self.get_clean_mvp_clip()
        center_ratio = 0.5
        if self.roles:
            frame_span = self.frame_end - self.frame_start
            # An empty frame range cannot locate a centre; keep the middle.
            if frame_span != 0:
                frame_idx = get_time_center_by_topkrole(self.roles)
                center_ratio = (frame_idx - self.frame_start) / frame_span
        clip = get_sub_mvpclip_by_time(
            clip=clip,
            final_duration=final_duration,
            method=method,
            center_ratio=center_ratio,
        )
        if CFG.RunningMode == "DEBUG":
            clip = self.vis(clip=clip)
        return clip

    def vis(self, clip=None):
        """Render debug information about this clip onto ``clip``.

        The clip meta information is drawn in the top-left corner; per-frame
        information is added when ``CFG.VisFrame`` is set.

        Args:
            clip (moviepy clip, optional): clip to draw on. Defaults to
                None, in which case ``get_clean_mvp_clip()`` is used.

        Returns:
            moviepy clip: the clip with the overlays.
        """
        if clip is None:
            clip = self.get_clean_mvp_clip()
        clip = self.vis_meta_info(clip=clip)
        if CFG.VisFrame:
            clip = self.vis_frames(clip=clip)
        return clip

    def vis_meta_info(self, clip):
        """Overlay media name, clip id, start time and duration at top-left."""
        # media_name defaults to None, which os.path.basename rejects.
        if self.media_name is None:
            video_name = None
        else:
            video_name = os.path.basename(self.media_name)
        txt = "videoname={}\nclipid={}\nt_start={}\nduration={}\n".format(
            video_name,
            self.clipid,
            self.time_start,
            self.duration,
        )
        txt_clip = TextClip(
            txt,
            fontsize=CFG.DebugFontSize,
            color=CFG.DebugTextColorsCls.color,
            font=CFG.Font,
            stroke_width=CFG.DebugTextStrokeWidth,
        )
        txt_clip = txt_clip.set_duration(clip.duration)
        txt_clip = txt_clip.set_position(("left", "top"))
        return CompositeVideoClip([clip, txt_clip])

    def vis_frames(self, clip):
        """Overlay the debug text of every frame in ``self.frames``."""
        if self.frames is None:
            return clip
        for frame in self.frames:
            clip = self.vis_frame(clip=clip, frame=frame)
        return clip

    def vis_frame(self, clip, frame):
        """Overlay ``frame.shot_size`` at top-right, starting at the frame time.

        The text starts at ``frame.timestamp - self.time_start`` and lasts
        ``CFG.DebugFrameTextDuration``.
        """
        txt = "\n{}".format(frame.shot_size)
        txt_clip = TextClip(
            txt,
            fontsize=CFG.DebugFontSize,
            color=CFG.DebugFrameTextColor,
            font=CFG.Font,
            stroke_width=CFG.DebugTextStrokeWidth,
        )
        txt_clip = txt_clip.set_duration(CFG.DebugFrameTextDuration)
        txt_clip = txt_clip.set_position(("right", "top"))
        # TODO: vis objs
        return CompositeVideoClip(
            [clip, txt_clip.set_start(frame.timestamp - self.time_start)]
        )

    @classmethod
    def from_data(cls, data: Dict):
        """Alternate constructor: build a clip from a dict of init arguments."""
        return cls(**data)

    @property
    def roles_name(self) -> List:
        """Unique role names found in ``self.roles`` (order not defined).

        ``roles`` is documented as a dict keyed by role id, so the role
        entries are its values; any other iterable is treated as a sequence
        of role entries.
        """
        if self.roles is None:
            return []
        if isinstance(self.roles, dict):
            role_infos = self.roles.values()
        else:
            role_infos = self.roles
        return list({info["name"] for info in role_infos if "name" in info})

    def has_roles_name(self, role_names: Union[str, List[str]], mode: str) -> bool:
        """Check whether the clip contains the given role names.

        Args:
            role_names (str or list of str): name(s) to look for.
            mode (str): "all" requires every name to be present, "least"
                requires at least one.

        Raises:
            ValueError: if ``mode`` is neither "all" nor "least".

        Returns:
            bool: result of the check.
        """
        if not isinstance(role_names, list):
            role_names = [role_names]
        wanted = set(role_names)
        matched = wanted & set(self.roles_name)
        if mode == "all":
            # Compare against the de-duplicated names so that repeated
            # entries in role_names cannot cause a false negative.
            return len(matched) == len(wanted)
        if mode == "least":
            return len(matched) > 0
        raise ValueError(f"mode only support least or all, but given {mode}")
class VideoClipSeq(ClipSeq):
    """A ``ClipSeq`` whose items are ``VideoClip`` objects.

    Custom processing methods live here (e.g. reading, cleaning and
    concatenating the moviepy clips of the sequence).
    """

    def __init__(self, clipseq: List[VideoClip]) -> None:
        """Wrap a list of ``VideoClip`` objects."""
        super().__init__(clipseq)

    def __getitem__(self, i: int) -> Union[Clip, VideoClipSeq]:
        """Index the sequence.

        A single ``Clip`` returned by the base class is passed through; a
        list or a ``ClipSeq`` is re-wrapped as ``VideoClipSeq``.
        """
        item = super().__getitem__(i)
        if isinstance(item, Clip):
            return item
        if isinstance(item, list):
            return VideoClipSeq(item)
        return VideoClipSeq(item.clipseq)

    def get_mvp_clip(self, method="rough"):
        """Return the moviepy clip that corresponds to the whole sequence.

        Args:
            method (str, optional): "rough" or "fine". Defaults to "rough".
                rough: take the start of the first clip and the end of the
                    last clip and read that range from the video file of the
                    first clip; suitable when the whole sequence comes from
                    one video file and is contiguous.
                fine: read every clip separately and concatenate them;
                    suitable when the sequence is not contiguous in time or
                    spans several videos.

        Raises:
            ValueError: unsupported ``method``.

        Returns:
            moviepy clip: the clip of the sequence.
        """
        if method == "rough":
            first, last = self.clipseq[0], self.clipseq[-1]
            # End time is computed here rather than read from
            # ``last.time_end`` so that it is always a number.
            time_end = last.time_start + last.duration
            clip = VideoFileClip(first.video_path).subclip(first.time_start, time_end)
        elif method == "fine":
            clip = concatenate_videoclips([c.get_mvp_clip() for c in self.clipseq])
        else:
            raise ValueError(
                "only support method=[rough, fine], but given {}".format(method)
            )
        return clip

    def get_clean_mvp_clipseq(
        self,
    ):
        """Return the cleaned-up moviepy clip of every item.

        Returns:
            list: one ``get_clean_mvp_clip()`` result per clip, in order.
        """
        return [c.get_clean_mvp_clip() for c in self.clipseq]

    def get_time_center(
        self,
    ):
        """Placeholder, not implemented; returns None."""
        pass

    def get_subclip_by_time(
        self,
        clipseq,
        final_duration: float,
        method: str = None,
    ):
        """Align the sequence to ``final_duration``.

        Every clip receives a share of ``final_duration`` proportional to
        its own duration. The aligned clips are concatenated and the result
        is finally passed through ``vfx.speedx`` with ``final_duration``.

        Args:
            clipseq (list): moviepy clips, one per item of ``self.clipseq``
                and in the same order.
            final_duration (float): target duration.
            method (str, optional): how to change the length, forwarded to
                ``VideoClip.get_subclip_by_time``. Defaults to None.
                speed: change length by sampling
                cut: change length by cutting the middle part
                None: decide from the difference between clip duration and
                    target duration.

        Raises:
            ValueError: if the total duration of ``clipseq`` is zero, which
                makes a proportional split impossible.

        Returns:
            moviepy clip: the aligned, concatenated clip.
        """
        clipseq_duration = np.sum([c.duration for c in clipseq])
        if clipseq_duration == 0:
            raise ValueError(
                "total duration of clipseq is 0, cannot align to final_duration"
            )
        final_duration_per_clip = [
            c.duration / clipseq_duration * final_duration for c in clipseq
        ]
        aligned_clips = [
            self.clipseq[i].get_subclip_by_time(
                final_duration=final_duration_per_clip[i],
                method=method,
                clip=mvp_clip,
            )
            for i, mvp_clip in enumerate(clipseq)
        ]
        clip = concatenate_videoclips(aligned_clips)
        clip = clip.fx(vfx.speedx, final_duration=final_duration)
        return clip

    def get_target_mvp_clip(
        self,
        final_duration: float,
        method: str = None,
    ):
        """Return the moviepy clip that meets all targets.

        Current targets:
        1. cleaned clips (see ``get_clean_mvp_clipseq``): black borders
           removed, target width/height ratio, target width and height;
        2. the requested duration.

        Args:
            final_duration (float): target duration.
            method (str, optional): how to change the length.
                Defaults to None.

        Returns:
            moviepy clip: the resulting clip.
        """
        clean_clips = self.get_clean_mvp_clipseq()
        return self.get_subclip_by_time(
            clipseq=clean_clips, final_duration=final_duration, method=method
        )

    @classmethod
    def from_data(cls, clipseq: List[Dict]) -> VideoClipSeq:
        """Alternate constructor: build a sequence from a list of dicts.

        Each dict holds the keyword arguments of one ``VideoClip``.
        """
        return cls([VideoClip(**clip) for clip in clipseq])