Spaces:
Sleeping
Sleeping
import gc | |
import os | |
from pathlib import Path | |
import cv2 | |
from PIL import Image | |
from tqdm import tqdm | |
from .preprocess_dir.utils import crop_with_fan as cwf | |
from .preprocess_dir.utils import face_finder as ff | |
from .util import * | |
# template video ์ ์ฒ๋ฆฌ | |
# preprocess_template_old(๊ธฐ์กดํจ์) ์ ๊ธฐ๋ฅ์ ๋์ผํ๊ณ , ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋ ์ค์ | |
def preprocess_template( | |
config_path, | |
template_video_path, | |
reference_face, | |
work_root_path, | |
device, | |
template_frame_ratio=1.0, | |
template_video_ratio=[1.0], | |
callback=None, | |
verbose=False, | |
save_frames=True, | |
silent_video_path=None, | |
no_infer_frames=[], | |
): | |
"""template video ์ ์ฒ๋ฆฌ | |
Parameters | |
---------- | |
config_path (str) : ์ค์ ํ์ผ ๊ฒฝ๋ก | |
template_video_path (str) : ํ ํ๋ฆฟ ์์ ๊ฒฝ๋ก | |
reference_face : (str) : ์ฐธ๊ณ ํ ์ผ๊ตด ์ด๋ฏธ์ง ๊ฒฝ๋ก | |
work_root_path (str) : ์์ ํด๋ ๊ฒฝ๋ก. ์ ์ฒ๋ฆฌ ์ ๋ณด๊ฐ ์ ์ฅ๋จ. | |
device (str) : device ์ ๋ณด. ex) cuda:0 | |
template_frame_ratio (float) : ํ ํ๋ฆฟ ๋น๋์ค resize ๋น์จ. 1.0: ์์ ๊ทธ๋๋ก ์ฌ์ฉ | |
template_video_ratio (list[float]) : ํ ํ๋ฆฟ ๋น๋์ค resize ๋น์จ. 1.0: ์์ ๊ทธ๋๋ก ์ฌ์ฉ | |
save_frames (bool) : ํ ํ๋ฆฟ ๋น๋์ค ํ๋ ์ ์ ์ฅ์ฌ๋ถ | |
no_infer_frames (list[tuple[int,int]]) : ์ถ๋ก ์ ์ฌ์ฉ๋์ง ์๋frame ๊ตฌ๊ฐ. ์์์ ํฌํจ, ๋์ ํฌํจ๋์ง ์์. | |
""" | |
load_gpu = False | |
config = read_config(config_path) | |
image_size = config.img_size | |
callback1 = callback_inter( | |
callback, min_per=0, max_per=2, desc="preprocess_template 1", verbose=verbose | |
) | |
callback2 = callback_inter( | |
callback, min_per=2, max_per=20, desc="preprocess_template 2", verbose=verbose | |
) | |
callback3 = callback_inter( | |
callback, min_per=20, max_per=100, desc="preprocess_template 3", verbose=verbose | |
) | |
preprocess_dir = get_preprocess_dir(work_root_path, config.name) | |
Path(preprocess_dir).mkdir(exist_ok=True, parents=True) | |
# snow : for debug | |
if verbose: | |
print("preprocess_dir: ", preprocess_dir, ", work_root_path:", work_root_path) | |
# ์ ์ฒ๋ฆฌ ํ์ผ ๊ฒฝ๋ก | |
crop_mp4 = get_crop_mp4_dir(preprocess_dir, template_video_path) | |
if not Path(crop_mp4).exists(): | |
load_gpu = True | |
ff.init_face_finder(device) | |
cwf.init_fan(device) | |
if verbose: | |
print("ํ ํ๋ฆฟ ๋น๋์ค ์ฒ๋ฆฌ ... ") | |
# ์๋์ด์ ์ผ๊ตด ์ ๋ณด๋ฅผ ๊ตฌํ๋ค. | |
df_face, imgs = ff.find_face(reference_face) | |
callback1(100) # ์งํ์จ์ ์๋ ค์ค๋ค. | |
g_anchor_ebd = df_face["ebd"].values[0] | |
# ํ ํ๋ฆฟ ๋์์์์ ์๋์ด์ ์ผ๊ตด ์์น๋ง ์ ์ฅํด ๋๋๋ค | |
df_paths = ff.save_face_info3( | |
template_video_path, | |
g_anchor_ebd, | |
config.move, | |
base=preprocess_dir, | |
callback=callback2, | |
verbose=verbose, | |
) | |
### ์ผ๊ตด ์์ญ์ FAN ๋๋๋งํฌ ๊ธฐ๋ฐ์ผ๋ก ํฌ๋กญํด ๋๋๋ค | |
assert len(df_paths) == 1 | |
if config.move: | |
if verbose: | |
print("cwf.save_crop_info_move --") | |
df_fan_path = cwf.save_crop_info_move( | |
image_size=image_size, | |
anchor_box_path=df_paths[0], | |
mp4_path=template_video_path, | |
out_dir=crop_mp4, | |
crop_offset_y=config.crop_offset_y, | |
crop_margin=config.crop_margin, | |
callback=callback3, | |
verbose=verbose, | |
) | |
else: | |
if verbose: | |
print("cwf.save_crop_info2 --") | |
df_fan_path = cwf.save_crop_info2( | |
image_size=image_size, | |
anchor_box_path=df_paths[0], | |
mp4_path=template_video_path, | |
out_dir=crop_mp4, | |
crop_offset_y=config.crop_offset_y, | |
crop_margin=config.crop_margin, | |
no_infer_frames=no_infer_frames, | |
callback=callback3, | |
verbose=verbose, | |
) | |
# snow : for debug | |
if verbose: | |
print("df_fan_path: ", df_fan_path) | |
ff.del_face_finder() | |
cwf.del_fan() | |
else: | |
if verbose: | |
print("์ ์ฒ๋ฆฌ๊ฐ ์ด๋ฏธ ๋์ด์์") | |
callback3(100) | |
# 1. save frames for stf | |
if save_frames: | |
frame_dir = get_frame_dir( | |
preprocess_dir, template_video_path, ratio=template_frame_ratio | |
) | |
if verbose: | |
print("frame_dir:", frame_dir) | |
save_template_frames( | |
template_video_path=template_video_path, | |
template_frames_path=frame_dir, | |
ratio=template_frame_ratio, | |
save_in_video=False, | |
verbose=verbose, | |
) | |
if silent_video_path is not None: | |
frame_dir = get_frame_dir( | |
preprocess_dir, silent_video_path, ratio=template_frame_ratio | |
) | |
save_template_frames( | |
template_video_path=silent_video_path, | |
template_frames_path=frame_dir, | |
ratio=template_frame_ratio, | |
save_in_video=False, | |
verbose=verbose, | |
) | |
if template_video_path.endswith(".mov"): | |
# TODO snow : ์ฑ๋ฅ ํ์ธ ํ์. | |
# ์ง๊ธ์ mov ์ธ ๊ฒฝ์ฐ๋ง ํ์ผ์ ์ ์ฅํ๋ค. ์ถ๋ก ํ ๋ ๋๋ ค์ ๋ผ๊ณ ํ๋ค. by hojin | |
# 2. save video for encoding | |
for video_ratio in template_video_ratio: | |
if video_ratio != 1.0: | |
out_path = get_template_ratio_file_path( | |
preprocess_dir, template_video_path, ratio=video_ratio | |
) | |
save_template_frames( | |
template_video_path=template_video_path, | |
template_frames_path="", | |
template_video_path_with_ratio=out_path, | |
ratio=video_ratio, | |
save_in_video=True, | |
verbose=verbose, | |
) | |
if silent_video_path is not None: | |
out_path = get_template_ratio_file_path( | |
preprocess_dir, silent_video_path, ratio=video_ratio | |
) | |
save_template_webm_ratio( | |
template_video_path=silent_video_path, | |
ratio=video_ratio, | |
out_path=out_path, | |
verbose=verbose, | |
) | |
if template_video_path.endswith(".webm"): | |
# TODO snow : ์ฑ๋ฅ ํ์ธ ํ์. ratio ๊ฐ์๋งํผ webm ์ ๋ง๋ ๋ค. | |
for video_ratio in template_video_ratio: | |
out_path = get_template_ratio_file_path( | |
preprocess_dir, template_video_path, ratio=video_ratio | |
) | |
save_template_webm_ratio( | |
template_video_path=template_video_path, | |
ratio=video_ratio, | |
out_path=out_path, | |
verbose=verbose, | |
) | |
if silent_video_path is not None: | |
out_path = get_template_ratio_file_path( | |
preprocess_dir, silent_video_path, ratio=video_ratio | |
) | |
save_template_webm_ratio( | |
template_video_path=silent_video_path, | |
ratio=video_ratio, | |
out_path=out_path, | |
verbose=verbose, | |
) | |
gc.collect() | |
return load_gpu | |
# snow: webm ํ ํ๋ฆฟ์ ratio ๋ณ๋ก resize ํ์ฌ ์ ์ฅํ๋ ํจ์ | |
def save_template_webm_ratio(template_video_path, ratio, out_path, verbose): | |
def resize_(size, img): | |
w, h = size | |
img = cv2.resize(img, (w, h), inter_alg_(w, h, img)) | |
return img | |
def inter_alg_(w, h, img): | |
if w * h < img.shape[0] * img.shape[1]: | |
return cv2.INTER_AREA | |
else: | |
return cv2.INTER_CUBIC | |
os.makedirs(os.path.dirname(out_path), exist_ok=True) | |
reader, meta = get_four_channel_ffmpeg_reader(template_video_path) | |
if Path(out_path).exists(): | |
if verbose: | |
print(f"ratio ํ์ผ์ด ์ ์ฅ๋์ด ์์, {out_path}") | |
return | |
if verbose: | |
print(f"webm ratio template, org:{template_video_path}, ratio:{ratio}") | |
size_org = meta["size"] | |
size = list(int(round(ratio * v)) // 2 * 2 for v in size_org) | |
writer = get_webm_ffmpeg_writer( | |
out_path, size=size, fps=meta["fps"], wav_path=template_video_path | |
) | |
writer.send(None) # seed the generator | |
total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path) | |
for idx, f in tqdm( | |
enumerate(reader), total=total_cnt, desc=f"save webm ratio:{ratio}, size:{size}" | |
): | |
f = np.frombuffer(f, dtype=np.uint8) | |
f = f.reshape(size_org[1], size_org[0], 4) | |
f = resize_(size, f) | |
writer.send(f) # seed the generator | |
writer.close() | |
# hojin | |
# png frame ์ถ์ถ + crop | |
def save_template_frames( | |
template_video_path, | |
template_frames_path, | |
template_video_path_with_ratio=None, | |
ratio=1.0, | |
save_in_video=False, | |
verbose=False, | |
): | |
def inter_alg_(w, h, img): | |
if w * h < img.shape[0] * img.shape[1]: | |
return cv2.INTER_AREA | |
else: | |
return cv2.INTER_CUBIC | |
def resize_(size, img): | |
w, h = size | |
img = cv2.resize(img, (w, h), inter_alg_(w, h, img)) | |
return img | |
# hojin: ํ ํ๋ฆฟ์ ํ๋ ์๋ณ๋ก ์ ์ฅํด๋๊ธฐ -> write_video_in_thread์์ reader ์ฌ์ฉํ์ง ์๊ธฐ ์ํจ | |
if save_in_video == False: | |
if Path(template_frames_path).exists(): | |
if verbose: | |
print("ํ๋ ์์ด ๋ชจ๋ ์ ์ฅ๋์ด ์์") | |
return | |
else: | |
if Path(template_video_path_with_ratio).exists(): | |
if verbose: | |
print("๋น๋์ค๊ฐ ์์ฑ๋์ด ์์") | |
return | |
os.makedirs(os.path.dirname(template_video_path_with_ratio), exist_ok=True) | |
if template_video_path.endswith(".mov") or template_video_path.endswith(".webm"): | |
reader, meta = get_four_channel_ffmpeg_reader(template_video_path) | |
else: # mp4 | |
reader, meta = get_three_channel_ffmpeg_reader(template_video_path) | |
size_org = meta["size"] | |
size = list(int(round(ratio * v)) // 2 * 2 for v in size_org) | |
fps = meta["fps"] | |
if verbose: | |
print(meta) | |
total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path) | |
if save_in_video is False: | |
Path(template_frames_path).mkdir(exist_ok=True, parents=True) | |
# hojin: ์ถ์ถํ ํ๋ ์์ ๋ด๋ณด๋ด๊ธฐ๋ฅผ ์ํด์ ๋ค์ mov๋ก ๋ง๋ค์ด๋๊ธฐ (ratio<1.0) | |
writer = None | |
if verbose: | |
print("template_frames_path: ", template_frames_path) | |
for idx, f in tqdm( | |
enumerate(reader), | |
total=total_cnt, | |
desc=f"save frames f{ratio}" | |
if save_in_video is False | |
else f"save video f{ratio}", | |
): | |
name = f"""{idx:05d}.webp""" | |
f = np.frombuffer(f, dtype=np.uint8) | |
f = f.reshape( | |
size_org[1], size_org[0], 3 if template_video_path.endswith(".mp4") else 4 | |
) | |
f = resize_(size, f) | |
if save_in_video is False: | |
f = np.ascontiguousarray(f) | |
f = Image.fromarray( | |
f, mode="RGB" if template_video_path.endswith(".mp4") else "RGBA" | |
) | |
f.save( | |
str(Path(template_frames_path) / str(name)), format="png", lossless=True | |
) | |
# cv2.imwrite(str(Path(template_frames_path) / str(name)), f[:, :, [2, 1, 0, 3]], [int(cv2.IMWRITE_PNG_COMPRESSION), 3]) | |
if writer is None and save_in_video is True: | |
if ratio != 1.0: | |
writer = imageio_ffmpeg.write_frames( | |
template_video_path_with_ratio, | |
size=size, | |
fps=fps, | |
quality=10, | |
pix_fmt_in="rgba", | |
pix_fmt_out="rgba", | |
codec="png", | |
macro_block_size=1, | |
) | |
writer.send(None) | |
if writer: | |
writer.send(f) | |
if writer: | |
writer.close() | |
# hojin end | |