yerang's picture
Upload 1110 files
e3af00f verified
raw
history blame
12.3 kB
import gc
import os
from pathlib import Path
import cv2
from PIL import Image
from tqdm import tqdm
from .preprocess_dir.utils import crop_with_fan as cwf
from .preprocess_dir.utils import face_finder as ff
from .util import *
# Template video preprocessing.
# Same functionality as preprocess_template_old (the previous function), with reduced memory usage.
def preprocess_template(
    config_path,
    template_video_path,
    reference_face,
    work_root_path,
    device,
    template_frame_ratio=1.0,
    template_video_ratio=None,
    callback=None,
    verbose=False,
    save_frames=True,
    silent_video_path=None,
    no_infer_frames=None,
):
    """Preprocess a template video.

    Runs face detection and FAN-based cropping when the crop output for the
    template does not exist yet, then optionally dumps the template frames
    and writes resized copies of the template (and silent) video.

    Parameters
    ----------
    config_path (str) : path of the configuration file
    template_video_path (str) : path of the template video
    reference_face (str) : path of the reference face image
    work_root_path (str) : working directory; preprocessing output is stored
        below it
    device (str) : device to use, e.g. "cuda:0"
    template_frame_ratio (float) : resize ratio used when saving the template
        frames. 1.0 keeps the original size
    template_video_ratio (list[float] | None) : resize ratios used when
        saving resized videos. 1.0 keeps the original size. ``None`` (the
        default) means ``[1.0]``
    callback : progress callback, wrapped with ``callback_inter`` into three
        sub-ranges (0-2, 2-20, 20-100)
    verbose (bool) : print debug information
    save_frames (bool) : whether to save the template video frames
    silent_video_path (str | None) : optional second video that gets the same
        frame dump and resized copies as the template
    no_infer_frames (list[tuple[int, int]] | None) : frame ranges that are not
        used for inference; start is inclusive, end is exclusive. ``None``
        (the default) means no such ranges. Only forwarded when
        ``config.move`` is falsy

    Returns
    -------
    bool : True when the face finder / FAN models were loaded on ``device``
        (i.e. the crop output did not exist yet), False otherwise.
    """
    # Build fresh containers per call instead of sharing mutable defaults
    # (no_infer_frames is handed to an external helper that could mutate it).
    if template_video_ratio is None:
        template_video_ratio = [1.0]
    if no_infer_frames is None:
        no_infer_frames = []

    load_gpu = False
    config = read_config(config_path)
    image_size = config.img_size

    callback1 = callback_inter(
        callback, min_per=0, max_per=2, desc="preprocess_template 1", verbose=verbose
    )
    callback2 = callback_inter(
        callback, min_per=2, max_per=20, desc="preprocess_template 2", verbose=verbose
    )
    callback3 = callback_inter(
        callback, min_per=20, max_per=100, desc="preprocess_template 3", verbose=verbose
    )

    preprocess_dir = get_preprocess_dir(work_root_path, config.name)
    Path(preprocess_dir).mkdir(exist_ok=True, parents=True)

    # snow : for debug
    if verbose:
        print("preprocess_dir: ", preprocess_dir, ", work_root_path:", work_root_path)

    # Path of the preprocessing output; its existence is the "already done" marker.
    crop_mp4 = get_crop_mp4_dir(preprocess_dir, template_video_path)

    if not Path(crop_mp4).exists():
        load_gpu = True
        ff.init_face_finder(device)
        cwf.init_fan(device)
        try:
            if verbose:
                print("ํ…œํ”Œ๋ฆฟ ๋น„๋””์˜ค ์ฒ˜๋ฆฌ ... ")

            # Get the announcer's face information from the reference image.
            df_face, _ = ff.find_face(reference_face)
            callback1(100)  # report progress

            g_anchor_ebd = df_face["ebd"].values[0]

            # Store only the announcer's face positions found in the template video.
            df_paths = ff.save_face_info3(
                template_video_path,
                g_anchor_ebd,
                config.move,
                base=preprocess_dir,
                callback=callback2,
                verbose=verbose,
            )

            # Crop the face region based on FAN landmarks.
            assert len(df_paths) == 1, "expected exactly one face-info path"
            if config.move:
                if verbose:
                    print("cwf.save_crop_info_move --")
                df_fan_path = cwf.save_crop_info_move(
                    image_size=image_size,
                    anchor_box_path=df_paths[0],
                    mp4_path=template_video_path,
                    out_dir=crop_mp4,
                    crop_offset_y=config.crop_offset_y,
                    crop_margin=config.crop_margin,
                    callback=callback3,
                    verbose=verbose,
                )
            else:
                if verbose:
                    print("cwf.save_crop_info2 --")
                df_fan_path = cwf.save_crop_info2(
                    image_size=image_size,
                    anchor_box_path=df_paths[0],
                    mp4_path=template_video_path,
                    out_dir=crop_mp4,
                    crop_offset_y=config.crop_offset_y,
                    crop_margin=config.crop_margin,
                    no_infer_frames=no_infer_frames,
                    callback=callback3,
                    verbose=verbose,
                )

            # snow : for debug
            if verbose:
                print("df_fan_path: ", df_fan_path)
        finally:
            # Release the models even when preprocessing fails.
            ff.del_face_finder()
            cwf.del_fan()
    else:
        if verbose:
            print("์ „์ฒ˜๋ฆฌ๊ฐ€ ์ด๋ฏธ ๋˜์–ด์žˆ์Œ")
        # NOTE(review): nesting reconstructed from an unindented copy; assumed
        # to report completion only when preprocessing is skipped - confirm.
        callback3(100)

    # 1. save frames for stf
    if save_frames:
        frame_dir = get_frame_dir(
            preprocess_dir, template_video_path, ratio=template_frame_ratio
        )
        if verbose:
            print("frame_dir:", frame_dir)
        save_template_frames(
            template_video_path=template_video_path,
            template_frames_path=frame_dir,
            ratio=template_frame_ratio,
            save_in_video=False,
            verbose=verbose,
        )
        # NOTE(review): assumed to belong under `save_frames` - confirm.
        if silent_video_path is not None:
            frame_dir = get_frame_dir(
                preprocess_dir, silent_video_path, ratio=template_frame_ratio
            )
            save_template_frames(
                template_video_path=silent_video_path,
                template_frames_path=frame_dir,
                ratio=template_frame_ratio,
                save_in_video=False,
                verbose=verbose,
            )

    if template_video_path.endswith(".mov"):
        # TODO snow : performance needs to be checked.
        # For now the file is only saved for mov input; reportedly because
        # inference is slow otherwise. by hojin
        # 2. save video for encoding
        for video_ratio in template_video_ratio:
            if video_ratio != 1.0:
                out_path = get_template_ratio_file_path(
                    preprocess_dir, template_video_path, ratio=video_ratio
                )
                save_template_frames(
                    template_video_path=template_video_path,
                    template_frames_path="",
                    template_video_path_with_ratio=out_path,
                    ratio=video_ratio,
                    save_in_video=True,
                    verbose=verbose,
                )
                # NOTE(review): assumed to be nested under the ratio check - confirm.
                if silent_video_path is not None:
                    out_path = get_template_ratio_file_path(
                        preprocess_dir, silent_video_path, ratio=video_ratio
                    )
                    save_template_webm_ratio(
                        template_video_path=silent_video_path,
                        ratio=video_ratio,
                        out_path=out_path,
                        verbose=verbose,
                    )

    if template_video_path.endswith(".webm"):
        # TODO snow : performance needs to be checked. One webm is created per ratio.
        for video_ratio in template_video_ratio:
            out_path = get_template_ratio_file_path(
                preprocess_dir, template_video_path, ratio=video_ratio
            )
            save_template_webm_ratio(
                template_video_path=template_video_path,
                ratio=video_ratio,
                out_path=out_path,
                verbose=verbose,
            )
            if silent_video_path is not None:
                out_path = get_template_ratio_file_path(
                    preprocess_dir, silent_video_path, ratio=video_ratio
                )
                save_template_webm_ratio(
                    template_video_path=silent_video_path,
                    ratio=video_ratio,
                    out_path=out_path,
                    verbose=verbose,
                )

    gc.collect()
    return load_gpu
# snow: function that resizes a webm template for a given ratio and saves it
def save_template_webm_ratio(template_video_path, ratio, out_path, verbose):
    """Write a resized copy of a four-channel template video to ``out_path``.

    Parameters
    ----------
    template_video_path (str) : source video; it is also passed to the writer
        as ``wav_path``
    ratio (float) : resize ratio applied to width and height; the results are
        rounded and forced down to even numbers
    out_path (str) : destination file. When it already exists nothing is
        done, and the source video is not opened at all
    verbose (bool) : print debug information

    Returns
    -------
    None
    """

    def inter_alg_(w, h, img):
        # INTER_AREA when shrinking, INTER_CUBIC otherwise.
        if w * h < img.shape[0] * img.shape[1]:
            return cv2.INTER_AREA
        return cv2.INTER_CUBIC

    def resize_(size, img):
        w, h = size
        return cv2.resize(img, (w, h), inter_alg_(w, h, img))

    # A bare filename has an empty dirname, which os.makedirs rejects.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Check the cache before opening the reader so that nothing is left open
    # on the early return.
    if Path(out_path).exists():
        if verbose:
            print(f"ratio ํŒŒ์ผ์ด ์ €์žฅ๋˜์–ด ์žˆ์Œ, {out_path}")
        return

    reader, meta = get_four_channel_ffmpeg_reader(template_video_path)
    if verbose:
        print(f"webm ratio template, org:{template_video_path}, ratio:{ratio}")

    size_org = meta["size"]
    size = [int(round(ratio * v)) // 2 * 2 for v in size_org]

    writer = get_webm_ffmpeg_writer(
        out_path, size=size, fps=meta["fps"], wav_path=template_video_path
    )
    try:
        writer.send(None)  # seed the generator
        total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path)
        for raw in tqdm(
            reader, total=total_cnt, desc=f"save webm ratio:{ratio}, size:{size}"
        ):
            frame = np.frombuffer(raw, dtype=np.uint8)
            frame = frame.reshape(size_org[1], size_org[0], 4)
            writer.send(resize_(size, frame))  # push one resized frame
    finally:
        # Close the writer even if reading or resizing fails.
        writer.close()
# hojin
# png frame extraction + crop
def save_template_frames(
    template_video_path,
    template_frames_path,
    template_video_path_with_ratio=None,
    ratio=1.0,
    save_in_video=False,
    verbose=False,
):
    """Dump the frames of a template video, or write a resized copy of it.

    hojin: frames are stored one by one so that write_video_in_thread does not
    need a reader.

    Parameters
    ----------
    template_video_path (str) : source video. ``.mov`` / ``.webm`` are read as
        four-channel, everything else as three-channel
    template_frames_path (str) : directory receiving the frames when
        ``save_in_video`` is falsy. If it already exists nothing is done
    template_video_path_with_ratio (str | None) : output video when
        ``save_in_video`` is truthy. If it already exists nothing is done
    ratio (float) : resize ratio applied to width and height; the results are
        rounded and forced down to even numbers
    save_in_video (bool) : falsy -> save individual frames; truthy -> write a
        resized video (only when ``ratio != 1.0``)
    verbose (bool) : print debug information

    Returns
    -------
    None
    """

    def inter_alg_(w, h, img):
        # INTER_AREA when shrinking, INTER_CUBIC otherwise.
        if w * h < img.shape[0] * img.shape[1]:
            return cv2.INTER_AREA
        return cv2.INTER_CUBIC

    def resize_(size, img):
        w, h = size
        return cv2.resize(img, (w, h), inter_alg_(w, h, img))

    if not save_in_video:
        if Path(template_frames_path).exists():
            if verbose:
                print("ํ”„๋ ˆ์ž„์ด ๋ชจ๋‘ ์ €์žฅ๋˜์–ด ์žˆ์Œ")
            return
    else:
        if Path(template_video_path_with_ratio).exists():
            if verbose:
                print("๋น„๋””์˜ค๊ฐ€ ์ƒ์„ฑ๋˜์–ด ์žˆ์Œ")
            return
        # A bare filename has an empty dirname, which os.makedirs rejects.
        out_dir = os.path.dirname(template_video_path_with_ratio)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        if ratio == 1.0:
            # A writer is only ever created for ratio != 1.0, so decoding the
            # whole video here would produce nothing.
            return

    # Decide the channel layout once so that the reader, the reshape and the
    # image mode always agree.
    has_alpha = template_video_path.endswith((".mov", ".webm"))
    if has_alpha:
        reader, meta = get_four_channel_ffmpeg_reader(template_video_path)
    else:  # mp4
        reader, meta = get_three_channel_ffmpeg_reader(template_video_path)
    channels = 4 if has_alpha else 3
    image_mode = "RGBA" if has_alpha else "RGB"

    size_org = meta["size"]
    size = [int(round(ratio * v)) // 2 * 2 for v in size_org]
    fps = meta["fps"]
    if verbose:
        print(meta)

    total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path)

    if not save_in_video:
        Path(template_frames_path).mkdir(exist_ok=True, parents=True)

    if verbose:
        print("template_frames_path: ", template_frames_path)

    desc = f"save video f{ratio}" if save_in_video else f"save frames f{ratio}"

    # hojin: re-encode the extracted frames as a mov for export (ratio<1.0)
    writer = None
    try:
        for idx, raw in tqdm(enumerate(reader), total=total_cnt, desc=desc):
            frame = np.frombuffer(raw, dtype=np.uint8)
            frame = frame.reshape(size_org[1], size_org[0], channels)
            frame = resize_(size, frame)

            if not save_in_video:
                # The file name ends in .webp but the content is written as PNG.
                image = Image.fromarray(np.ascontiguousarray(frame), mode=image_mode)
                image.save(
                    str(Path(template_frames_path) / f"{idx:05d}.webp"),
                    format="png",
                    lossless=True,
                )
                continue

            if writer is None:
                # Created lazily on the first frame, so an empty video yields
                # no output file.
                # NOTE(review): the writer is configured for RGBA input -
                # confirm it is only used with four-channel sources.
                writer = imageio_ffmpeg.write_frames(
                    template_video_path_with_ratio,
                    size=size,
                    fps=fps,
                    quality=10,
                    pix_fmt_in="rgba",
                    pix_fmt_out="rgba",
                    codec="png",
                    macro_block_size=1,
                )
                writer.send(None)  # seed the generator
            writer.send(frame)
    finally:
        # Close the writer even if decoding or resizing fails.
        if writer is not None:
            writer.close()
# hojin end