kevinwang676's picture
Upload folder using huggingface_hub
6755a2d verified
raw
history blame
17.6 kB
from __future__ import annotations
import os
import logging
from typing import Dict, List, Union
import logging
import cv2
import numpy as np
from moviepy.editor import (
VideoFileClip,
concatenate_videoclips,
vfx,
CompositeVideoClip,
TextClip,
)
from ...data import Clip, ClipSeq
from ..config.CFG import CFG
from .face import face_roles2frames
from .video_process_with_moviepy import (
crop_edge_2_even,
crop_target_bbox,
get_sub_mvpclip_by_time,
)
from .vision_frame import (
Frame,
get_time_center_by_topkrole,
get_width_center_by_topkrole,
)
from ..utils.vision_util import round_up_to_even
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
class VideoClip(Clip):
    """Clip subclass for video media.

    Adds video-specific helpers on top of ``Clip``: reading the matching
    moviepy subclip, trimming head/tail frames, cropping black borders,
    cropping/resizing to a target aspect ratio or size, aligning to a
    target duration, and overlaying debug information.
    """

    def __init__(
        self,
        time_start: float,
        duration: float,
        clipid: int = None,
        media_type: str = None,
        mediaid: str = None,
        media_name: str = None,
        video_path: str = None,
        roles: Dict[str, Dict[str, Dict[str, List[float]]]] = None,
        scenes: List[Dict] = None,
        background: str = None,
        scene_roles: list = None,
        timepoint_type: int = None,
        text: str = None,
        stage: str = None,
        path: str = None,
        duration_num: int = None,
        similar_clipseq: list = None,
        frames: List[Frame] = None,
        offset: float = 0.2,
        dynamic: float = None,
        camera_move: str = None,
        **kwargs,
    ):
        """A clip produced by splitting a source video.

        Args:
            time_start (float): clip start time in the source video (seconds).
            duration (float): clip duration (seconds).
            media_name (str, optional): name of the source media. Defaults to None.
            video_path (str, optional): path of the source video file. Defaults to None.
            roles (dict, optional): per-role detections keyed by role id, e.g.::

                "roles": {
                    "9": {                # role id
                        "bbox": {
                            "839": [      # frame id
                                [
                                    461.8400573730469,
                                    110.67144012451172,
                                    685.0857543945312,
                                    379.3414001464844
                                ]
                            ],
                        },
                        "name": "",
                        "conf_of_talking": -0.88,
                    }
                }

            scenes (List[Dict], optional): scene annotations. Defaults to None.
            background (str, optional): background label. Defaults to None.
            scene_roles (list, optional): roles present in the scene; stored as
                ``self.scenes_roles``. Defaults to None.
            frames (List[Frame], optional): per-frame vision annotations.
                Defaults to None.
            offset (float, optional): after reading the moviepy subclip, trim
                this *total* amount of time — half at the head, half at the
                tail — to avoid stray frames bleeding in from neighbouring
                clips. Defaults to 0.2.
            camera_move (str, optional): camera-movement label. Defaults to None.

            All remaining parameters are forwarded to ``Clip.__init__``.
        """
        self.media_name = media_name
        self.video_path = video_path
        self.roles = roles
        self.scenes = scenes
        self.background = background
        # NOTE(review): parameter is `scene_roles` but the attribute is
        # `scenes_roles`; kept as-is because external code may read it.
        self.scenes_roles = scene_roles
        self.frames = frames
        self.camera_move = camera_move
        self.offset = offset
        super().__init__(
            time_start,
            duration,
            clipid,
            media_type,
            mediaid,
            timepoint_type,
            text,
            stage,
            path,
            duration_num,
            similar_clipseq,
            dynamic,
            **kwargs,
        )
        self.preprocess()

    def preprocess(self):
        """Run all post-init preprocessing steps."""
        self.preprocess_clip()
        self.preprocess_frames()

    def preprocess_clip(self):
        """Clip-level preprocessing: spread shared parameters onto frames."""
        self.spread_parameters()

    def preprocess_frames(self):
        """Let every attached frame run its own preprocessing."""
        if self.frames is not None:
            for frame in self.frames:
                frame.preprocess()

    def spread_parameters(self):
        """Copy clip-level geometry/fps attributes onto every frame.

        Only attributes that exist on this clip and are not None are copied.
        """
        target_keys = [
            "width",
            "height",
            "content_width",
            "content_height",
            "fps",
            "frame_num",
        ]
        if self.frames is not None:
            for k in target_keys:
                if k in self.__dict__ and self.__dict__[k] is not None:
                    for frame in self.frames:
                        setattr(frame, k, self.__dict__[k])

    @property
    def time_end(self) -> float:
        """End time of the clip in the source video (seconds)."""
        return self.time_start + self.duration

    def get_mvp_clip(
        self,
    ):
        """Return the moviepy ``VideoFileClip`` slice matching this clip."""
        # NOTE(review): reads `self.media_path`, presumably set by the `Clip`
        # base class, while this class also stores `self.video_path` — confirm
        # which attribute is the intended source path.
        return VideoFileClip(self.media_path).subclip(self.time_start, self.time_end)

    def get_offset_mvp_clip(self, clip=None, offset: float = None):
        """Trim ``offset`` seconds in total (half head, half tail) from ``clip``.

        Args:
            clip (moviepy.VideoClip, optional): clip to trim; read from the
                source video when None. Defaults to None.
            offset (float, optional): total trim duration; falls back to
                ``self.offset`` when None. Defaults to None.

        Returns:
            moviepy.VideoClip: the trimmed clip.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        if offset is None:
            offset = self.offset
        duration = clip.duration
        time_start = max(0, offset / 2)
        time_end = min(duration, duration - offset / 2)
        clip = clip.subclip(time_start, time_end)
        return clip

    def get_clean_mvp_clip(self, clip=None):
        """Return a fully cleaned moviepy clip.

        Cleaning pipeline: head/tail trim -> black-border crop -> optional
        aspect-ratio crop -> optional resize -> crop width/height to even.

        Args:
            clip (moviepy.VideoClip, optional): clip to clean; read from the
                source video when None. Defaults to None.

        Returns:
            moviepy.VideoClip: the cleaned clip.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        # trim head/tail to avoid stray frames
        clip = self.get_offset_mvp_clip(clip=clip, offset=self.offset)
        # keep only the actual content, dropping black borders
        clip = self.get_content_clip(clip=clip)
        if self.target_width_height_ratio is not None:
            clip = self.get_target_width_height_ratio_clip(clip=clip)
        if self.target_width is not None and self.target_height is not None:
            clip = self.get_target_width_height_clip(clip=clip)
            logger.debug(
                "after get_target_width_height_clip: width={}, height={}, duration={}".format(
                    clip.w, clip.h, clip.duration
                )
            )
        # some codecs require even dimensions
        clip = crop_edge_2_even(clip=clip)
        return clip

    def get_content_clip(self, clip=None):
        """Crop ``clip`` to the actual video content using ``content_box``.

        The area outside the content box is usually black borders (possibly
        containing subtitles). No-op when ``content_box`` is None.

        Args:
            clip (moviepy.VideoClip, optional): clip that may carry black
                borders; read from the source video when None. Defaults to None.

        Returns:
            moviepy.VideoClip: clip cropped to the real content area.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        if self.content_box is not None:
            clip = crop_target_bbox(clip, self.content_box)
        return clip

    def get_target_width_height_ratio_clip(self, clip=None):
        """Crop ``clip`` to the target width/height ratio.

        Current strategy:
        1. the output height equals the input height;
        2. when face/role boxes exist, center the crop on the width center of
           the top-k roles; otherwise use the image center.

        Args:
            clip (moviepy.VideoClip, optional): content clip at its original
                resolution; read from the source video when None. Defaults to None.

        Returns:
            moviepy.VideoClip: clip matching the target aspect ratio.
        """
        if clip is None:
            clip = self.get_mvp_clip()
        target_height = clip.h
        target_width = round_up_to_even(target_height * self.target_width_height_ratio)
        target_width = min(target_width, clip.w)
        # TODO: decide in which cases the center crop should apply
        if self.roles is None or len(self.roles) == 0:
            target_center_x = clip.w / 2
        else:
            # role boxes are in original-frame coordinates; shift them by the
            # content-box origin when black borders were cropped away
            coord_offset = (
                [self.content_box[0], self.content_box[1]]
                if self.content_box is not None
                else [0, 0]
            )
            target_center_x = get_width_center_by_topkrole(
                objs=self.roles, coord_offset=coord_offset
            )
        # clamp so the crop window stays inside the image
        target_center_x = min(
            max(target_width / 2, target_center_x),
            clip.w - target_width / 2,
        )
        target_coord = [
            target_center_x - target_width / 2,
            0,
            target_center_x + target_width / 2,
            target_height,
        ]
        clip = clip.crop(*target_coord)
        return clip

    def get_target_width_height_clip(self, clip=None):
        """Resize ``clip`` to the target width and height.

        Args:
            clip (moviepy.VideoClip, optional): clip to resize; read from the
                source video when None. Defaults to None.

        Returns:
            moviepy.VideoClip: clip resized to (target_width, target_height).
        """
        if clip is None:
            clip = self.get_mvp_clip()
        if self.target_width and self.target_height:
            logger.debug(
                "get_target_width_height_clip: clip.w={}, clip.h={}, target_width={}, target_height={}".format(
                    clip.w, clip.h, self.target_width, self.target_height
                )
            )
            clip = clip.resize(newsize=(self.target_width, self.target_height))
        return clip

    def get_subclip_by_time(
        self, final_duration: float, clip=None, method: str = "speed"
    ):
        """Align this clip to ``final_duration`` seconds.

        Args:
            final_duration (float): target duration in seconds.
            clip (moviepy.VideoClip, optional): clip to align; a cleaned clip
                is read when None. Defaults to None.
            method (str, optional): how to change the video length.
                Defaults to "speed".
                speed: change length by resampling (speed up / slow down);
                cut: change length by cutting around a time center.

        Returns:
            moviepy.VideoClip: the aligned clip (with debug overlay when
            running in DEBUG mode).
        """
        if clip is None:
            clip = self.get_clean_mvp_clip()
        if self.roles is None or len(self.roles) == 0:
            center_ratio = 0.5
        else:
            # center the cut on the most prominent role's frame
            # NOTE(review): assumes frame_end > frame_start — verify upstream
            frame_idx = get_time_center_by_topkrole(self.roles)
            center_ratio = (frame_idx - self.frame_start) / (
                self.frame_end - self.frame_start
            )
        clip = get_sub_mvpclip_by_time(
            clip=clip,
            final_duration=final_duration,
            method=method,
            center_ratio=center_ratio,
        )
        if CFG.RunningMode == "DEBUG":
            clip = self.vis(clip=clip)
        return clip

    def vis(self, clip=None):
        """Overlay the clip's meta info in the top-left corner for debugging.

        Args:
            clip (moviepy.VideoClip, optional): clip to annotate; a cleaned
                clip is read when None. Defaults to None.

        Returns:
            moviepy.VideoClip: the annotated clip.
        """
        if clip is None:
            clip = self.get_clean_mvp_clip()
        clip = self.vis_meta_info(clip=clip)
        if CFG.VisFrame:
            clip = self.vis_frames(clip=clip)
        return clip

    def vis_meta_info(self, clip):
        """Compose a text overlay with video name, clip id, start and duration."""
        txt = "videoname={}\nclipid={}\nt_start={}\nduration={}\n".format(
            os.path.basename(self.media_name),
            self.clipid,
            self.time_start,
            self.duration,
        )
        txt_clip = TextClip(
            txt,
            fontsize=CFG.DebugFontSize,
            color=CFG.DebugTextColorsCls.color,
            font=CFG.Font,
            stroke_width=CFG.DebugTextStrokeWidth,
        )
        txt_clip = txt_clip.set_duration(clip.duration)
        txt_clip = txt_clip.set_position(("left", "top"))
        clip = CompositeVideoClip([clip, txt_clip])
        return clip

    def vis_frames(self, clip):
        """Overlay every attached frame's debug info onto ``clip``."""
        if self.frames is None:
            return clip
        for frame in self.frames:
            clip = self.vis_frame(clip=clip, frame=frame)
        return clip

    def vis_frame(self, clip, frame):
        """Overlay one frame's shot-size label at the frame's timestamp."""
        txt = ""
        txt += "\n{}".format(frame.shot_size)
        txt_clip = TextClip(
            txt,
            fontsize=CFG.DebugFontSize,
            color=CFG.DebugFrameTextColor,
            font=CFG.Font,
            stroke_width=CFG.DebugTextStrokeWidth,
        )
        txt_clip = txt_clip.set_duration(CFG.DebugFrameTextDuration)
        txt_clip = txt_clip.set_position(("right", "top"))
        # TODO: vis objs
        clip = CompositeVideoClip(
            [clip, txt_clip.set_start(frame.timestamp - self.time_start)]
        )
        return clip

    @classmethod
    def from_data(cls, data: Dict):
        """Build a clip from a plain dict; uses ``cls`` so subclasses work."""
        return cls(**data)

    @property
    def roles_name(self) -> List:
        """Unique role names present in this clip (may be empty)."""
        if self.roles is None:
            return []
        # roles is a dict keyed by role id; the name lives on each value
        roles_name = [r["name"] for r in self.roles.values() if "name" in r]
        return list(set(roles_name))

    def has_roles_name(self, role_names: Union[str, List[str]], mode: str) -> bool:
        """Whether the given role names appear in this clip.

        Args:
            role_names (Union[str, List[str]]): name(s) to look for.
            mode (str): "all" requires every name to be present,
                "least" requires at least one.

        Returns:
            bool: True when the requirement is met.

        Raises:
            ValueError: when ``mode`` is neither "all" nor "least".
        """
        if not isinstance(role_names, list):
            role_names = [role_names]
        role_set = set(role_names) & set(self.roles_name)
        if mode == "all":
            return len(role_set) == len(role_names)
        elif mode == "least":
            return len(role_set) > 0
        else:
            raise ValueError(f"mode only support least or all, but given {mode}")
class VideoClipSeq(ClipSeq):
    """Sequence of ``VideoClip`` objects with video-specific helpers
    (moviepy reading, cleaning, duration alignment, etc.)."""

    def __init__(self, clipseq: List[VideoClip]) -> None:
        super().__init__(clipseq)

    def __getitem__(self, i: int) -> ClipSeq:
        """Index/slice while keeping the result a ``VideoClipSeq`` when possible."""
        item = super().__getitem__(i)
        if isinstance(item, Clip):
            return item
        if isinstance(item, list):
            return VideoClipSeq(item)
        return VideoClipSeq(item.clipseq)

    def get_mvp_clip(self, method="rough"):
        """Read the moviepy clip covering this sequence.

        Args:
            method (str, optional): "rough" or "fine". Defaults to "rough".
                rough: read one span from the first clip's start to the last
                    clip's end — suitable when all clips come from the same
                    file and are contiguous;
                fine: read each clip separately and concatenate — suitable
                    for non-contiguous times or multiple source videos.

        Raises:
            ValueError: unsupported ``method``.

        Returns:
            moviepy.VideoClip: the clip for the whole sequence.
        """
        if method == "rough":
            head = self.clipseq[0]
            tail = self.clipseq[-1]
            return VideoFileClip(head.video_path).subclip(
                head.time_start, tail.time_end
            )
        if method == "fine":
            pieces = [c.get_mvp_clip() for c in self.clipseq]
            return concatenate_videoclips(pieces)
        raise ValueError(
            "only support method=[rough, fine], but given {}".format(method)
        )

    def get_clean_mvp_clipseq(
        self,
    ):
        """Return a list of cleaned moviepy clips, one per clip in the sequence.

        Returns:
            list: cleaned moviepy.VideoClip objects.
        """
        return [c.get_clean_mvp_clip() for c in self.clipseq]

    def get_time_center(
        self,
    ):
        # placeholder: not implemented yet
        pass

    def get_subclip_by_time(
        self,
        clipseq,
        final_duration: float,
        method: str = None,
    ):
        """Align the sequence to ``final_duration``; each clip keeps its share
        of the total duration proportionally.

        Args:
            clipseq (list): moviepy clips matching ``self.clipseq`` one-to-one.
            final_duration (float): target duration in seconds.
            method (str, optional): how to change each clip's length
                (see ``VideoClip.get_subclip_by_time``). Defaults to None.

        Returns:
            moviepy.VideoClip: concatenated, speed-adjusted clip of exactly
            ``final_duration`` seconds.
        """
        total_duration = np.sum([c.duration for c in clipseq])
        aligned = []
        for idx, mvp_clip in enumerate(clipseq):
            share = mvp_clip.duration / total_duration * final_duration
            aligned.append(
                self.clipseq[idx].get_subclip_by_time(
                    final_duration=share,
                    method=method,
                    clip=mvp_clip,
                )
            )
        merged = concatenate_videoclips(aligned)
        # final speed pass irons out rounding drift in the per-clip shares
        return merged.fx(vfx.speedx, final_duration=final_duration)

    def get_target_mvp_clip(
        self,
        final_duration: float,
        method: str = None,
    ):
        """Return a moviepy clip meeting every target:
        1. no black borders, target aspect ratio, target width/height;
        2. the requested duration.

        Args:
            final_duration (float): target duration in seconds.
            method (str, optional): duration-change method. Defaults to None.

        Returns:
            moviepy.VideoClip: the fully processed clip.
        """
        cleaned = self.get_clean_mvp_clipseq()
        return self.get_subclip_by_time(
            clipseq=cleaned, final_duration=final_duration, method=method
        )

    @classmethod
    def from_data(cls, clipseq: List[Dict]) -> VideoClipSeq:
        """Build a ``VideoClipSeq`` from a list of plain dicts."""
        return VideoClipSeq([VideoClip.from_data(d) for d in clipseq])