# Provenance (residue from the web file viewer this source was copied from):
#   yerang's picture
#   Update stf/stf-api-alternative/src/stf_alternative/preprocess.py
#   749e6ac verified
#   raw / history / blame
#   12.4 kB
import gc
import os
from pathlib import Path
import cv2; cv2.setNumThreads(0); cv2.ocl.setUseOpenCL(False)
from PIL import Image
from tqdm import tqdm
from .preprocess_dir.utils import crop_with_fan as cwf
from .preprocess_dir.utils import face_finder as ff
from .util import *
# Template video preprocessing.
# Same functionality as preprocess_template_old (the previous function), with reduced memory usage.
def preprocess_template(
    config_path,
    template_video_path,
    reference_face,
    work_root_path,
    device,
    template_frame_ratio=1.0,
    template_video_ratio=None,
    callback=None,
    verbose=False,
    save_frames=True,
    silent_video_path=None,
    no_infer_frames=None,
):
    """Preprocess a template video and cache the results under the work folder.

    Steps, in order:

    1. If the crop output for this template does not exist yet, load the face
       finder and the FAN model on ``device``, locate the reference face in the
       template video and store the landmark-based crop information.
    2. Optionally dump the template (and silent) video frames to disk.
    3. For ``.mov`` / ``.webm`` templates, write resized copies of the video
       for the requested ratios.

    Parameters
    ----------
    config_path (str) : path of the configuration file
    template_video_path (str) : path of the template video
    reference_face (str) : path of the reference face image
    work_root_path (str) : working folder; preprocessing results are stored here
    device (str) : device to run on, e.g. ``cuda:0``
    template_frame_ratio (float) : resize ratio used when dumping frames;
        1.0 keeps the original size
    template_video_ratio (list[float] | None) : resize ratios for the resized
        video copies; ``None`` (default) means ``[1.0]``
    callback : progress callback, wrapped by ``callback_inter`` into the
        0-2 / 2-20 / 20-100 percent ranges of the three stages
    verbose (bool) : print debug information
    save_frames (bool) : whether to dump the template video frames
    silent_video_path (str | None) : optional second video that gets the same
        frame dump / resized copies as the template
    no_infer_frames (list[tuple[int, int]] | None) : frame ranges not used for
        inference (start inclusive, end exclusive); ``None`` (default) means
        no such ranges

    Returns
    -------
    bool : True when the face finder / FAN models were loaded during this call
        (i.e. the crop stage actually ran), False when the cached crop result
        was reused.
    """
    # Build fresh containers per call; mutable defaults would be shared between
    # calls and `no_infer_frames` is handed to external code.
    if template_video_ratio is None:
        template_video_ratio = [1.0]
    if no_infer_frames is None:
        no_infer_frames = []

    load_gpu = False
    config = read_config(config_path)
    image_size = config.img_size

    callback1 = callback_inter(
        callback, min_per=0, max_per=2, desc="preprocess_template 1", verbose=verbose
    )
    callback2 = callback_inter(
        callback, min_per=2, max_per=20, desc="preprocess_template 2", verbose=verbose
    )
    callback3 = callback_inter(
        callback, min_per=20, max_per=100, desc="preprocess_template 3", verbose=verbose
    )

    preprocess_dir = get_preprocess_dir(work_root_path, config.name)
    Path(preprocess_dir).mkdir(exist_ok=True, parents=True)

    # snow : for debug
    if verbose:
        print("preprocess_dir: ", preprocess_dir, ", work_root_path:", work_root_path)

    def _save_frames(video_path):
        # Dump the frames of `video_path` (scaled by template_frame_ratio) and
        # return the frame directory.
        frames_path = get_frame_dir(
            preprocess_dir, video_path, ratio=template_frame_ratio
        )
        save_template_frames(
            template_video_path=video_path,
            template_frames_path=frames_path,
            ratio=template_frame_ratio,
            save_in_video=False,
            verbose=verbose,
        )
        return frames_path

    def _save_webm_ratio(video_path, video_ratio):
        # Write the `video_ratio`-scaled webm copy of `video_path`.
        out_path = get_template_ratio_file_path(
            preprocess_dir, video_path, ratio=video_ratio
        )
        save_template_webm_ratio(
            template_video_path=video_path,
            ratio=video_ratio,
            out_path=out_path,
            verbose=verbose,
        )

    # Path of the preprocessed (cropped) output.
    crop_mp4 = get_crop_mp4_dir(preprocess_dir, template_video_path)

    if not Path(crop_mp4).exists():
        load_gpu = True
        ff.init_face_finder(device)
        cwf.init_fan(device)
        try:
            if verbose:
                print("템플릿 비디오 처리 ... ")

            # Get the face information of the announcer (reference face).
            df_face, _ = ff.find_face(reference_face)
            callback1(100)  # report progress

            g_anchor_ebd = df_face["ebd"].values[0]

            # Store only the announcer's face position found in the template video.
            df_paths = ff.save_face_info3(
                template_video_path,
                g_anchor_ebd,
                config.move,
                base=preprocess_dir,
                callback=callback2,
                verbose=verbose,
            )

            # Crop the face region based on the FAN landmarks.
            assert len(df_paths) == 1, (
                f"expected exactly one anchor box path, got {len(df_paths)}"
            )
            crop_kwargs = dict(
                image_size=image_size,
                anchor_box_path=df_paths[0],
                mp4_path=template_video_path,
                out_dir=crop_mp4,
                crop_offset_y=config.crop_offset_y,
                crop_margin=config.crop_margin,
                callback=callback3,
                verbose=verbose,
            )
            if config.move:
                if verbose:
                    print("cwf.save_crop_info_move --")
                df_fan_path = cwf.save_crop_info_move(**crop_kwargs)
            else:
                if verbose:
                    print("cwf.save_crop_info2 --")
                df_fan_path = cwf.save_crop_info2(
                    no_infer_frames=no_infer_frames, **crop_kwargs
                )

            # snow : for debug
            if verbose:
                print("df_fan_path: ", df_fan_path)
        finally:
            # Release the models even when a step above fails.
            ff.del_face_finder()
            cwf.del_fan()
    else:
        if verbose:
            print("전처리가 이미 되어있음")
        callback3(100)

    # NOTE(review): the nesting of the silent-video steps below was
    # reconstructed from a copy of this file whose indentation was lost;
    # confirm against the canonical source.

    # 1. save frames for stf
    if save_frames:
        frame_dir = get_frame_dir(
            preprocess_dir, template_video_path, ratio=template_frame_ratio
        )
        if verbose:
            print("frame_dir:", frame_dir)
        _save_frames(template_video_path)
        if silent_video_path is not None:
            _save_frames(silent_video_path)

    if template_video_path.endswith(".mov"):
        # TODO snow : performance needs to be checked.
        # For now the resized file is only stored for mov templates, reportedly
        # because inference is slow otherwise. by hojin
        # 2. save video for encoding
        for video_ratio in template_video_ratio:
            if video_ratio != 1.0:
                out_path = get_template_ratio_file_path(
                    preprocess_dir, template_video_path, ratio=video_ratio
                )
                save_template_frames(
                    template_video_path=template_video_path,
                    template_frames_path="",
                    template_video_path_with_ratio=out_path,
                    ratio=video_ratio,
                    save_in_video=True,
                    verbose=verbose,
                )
                if silent_video_path is not None:
                    _save_webm_ratio(silent_video_path, video_ratio)

    if template_video_path.endswith(".webm"):
        # TODO snow : performance needs to be checked. One webm is created per ratio.
        for video_ratio in template_video_ratio:
            _save_webm_ratio(template_video_path, video_ratio)
            if silent_video_path is not None:
                _save_webm_ratio(silent_video_path, video_ratio)

    gc.collect()
    return load_gpu
# snow: resizes a webm template by the given ratio and saves the result
def save_template_webm_ratio(template_video_path, ratio, out_path, verbose):
    """Write a copy of a four-channel template video resized by ``ratio``.

    Nothing is done when ``out_path`` already exists. Otherwise every frame of
    ``template_video_path`` is read, resized and sent to the webm writer.

    Parameters
    ----------
    template_video_path (str) : source video; read with the four-channel reader
    ratio (float) : scale factor applied to width and height; each result is
        rounded and then forced down to an even number
    out_path (str) : destination file; its parent directory is created
    verbose (bool) : print debug information
    """

    def inter_alg_(w, h, img):
        # INTER_AREA when shrinking, INTER_CUBIC otherwise.
        if w * h < img.shape[0] * img.shape[1]:
            return cv2.INTER_AREA
        return cv2.INTER_CUBIC

    def resize_(size, img):
        w, h = size
        return cv2.resize(img, (w, h), inter_alg_(w, h, img))

    # Check the cache before opening any ffmpeg reader so a cache hit does not
    # start (and abandon) a decoder.
    if Path(out_path).exists():
        if verbose:
            print(f"ratio 파일이 저장되어 있음, {out_path}")
        return

    out_dir = os.path.dirname(out_path)
    if out_dir:  # os.makedirs("") raises for a bare file name
        os.makedirs(out_dir, exist_ok=True)

    reader, meta = get_four_channel_ffmpeg_reader(template_video_path)
    if verbose:
        print(f"webm ratio template, org:{template_video_path}, ratio:{ratio}")

    size_org = meta["size"]  # used as (width, height) below
    size = [int(round(ratio * v)) // 2 * 2 for v in size_org]

    # NOTE(review): wav_path presumably makes the writer take the audio from
    # the source video; confirm against get_webm_ffmpeg_writer.
    writer = get_webm_ffmpeg_writer(
        out_path, size=size, fps=meta["fps"], wav_path=template_video_path
    )
    try:
        writer.send(None)  # seed the generator
        total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path)
        for raw in tqdm(
            reader, total=total_cnt, desc=f"save webm ratio:{ratio}, size:{size}"
        ):
            frame = np.frombuffer(raw, dtype=np.uint8)
            frame = frame.reshape(size_org[1], size_org[0], 4)
            writer.send(resize_(size, frame))
    finally:
        # Always finalize the writer, including when decoding/resizing fails.
        writer.close()
# hojin
# PNG frame extraction + crop
def save_template_frames(
    template_video_path,
    template_frames_path,
    template_video_path_with_ratio=None,
    ratio=1.0,
    save_in_video=False,
    verbose=False,
):
    """Dump the frames of a template video, or re-encode it at another size.

    Two modes, selected by ``save_in_video``:

    * ``save_in_video=False``: every frame is resized by ``ratio`` and saved
      into ``template_frames_path`` as ``00000.webp``, ``00001.webp``, ...
      Skipped entirely when that directory already exists.
    * ``save_in_video=True``: the resized frames are written to
      ``template_video_path_with_ratio`` with the ``png`` codec. Skipped when
      that file already exists. Nothing is written for ``ratio == 1.0``.

    Parameters
    ----------
    template_video_path (str) : source video; ``.mov`` / ``.webm`` are read as
        four-channel frames, anything else as three-channel frames
    template_frames_path (str) : output directory for the frame dump
    template_video_path_with_ratio (str | None) : output file for the
        re-encoded video; required when ``save_in_video`` is true
    ratio (float) : scale factor applied to width and height; each result is
        rounded and then forced down to an even number
    save_in_video (bool) : choose between frame dump and video re-encode
    verbose (bool) : print debug information
    """

    def inter_alg_(w, h, img):
        # INTER_AREA when shrinking, INTER_CUBIC otherwise.
        if w * h < img.shape[0] * img.shape[1]:
            return cv2.INTER_AREA
        return cv2.INTER_CUBIC

    def resize_(size, img):
        w, h = size
        return cv2.resize(img, (w, h), inter_alg_(w, h, img))

    # hojin: store the template frame by frame so that write_video_in_thread
    # does not need a reader.
    if not save_in_video:
        if Path(template_frames_path).exists():
            if verbose:
                print("프레임이 모두 저장되어 있음")
            return
    else:
        if Path(template_video_path_with_ratio).exists():
            if verbose:
                print("비디오가 생성되어 있음")
            return
        os.makedirs(os.path.dirname(template_video_path_with_ratio), exist_ok=True)
        if ratio == 1.0:
            # No writer is ever created for ratio 1.0, so decoding and resizing
            # every frame would produce no output at all.
            return

    # Derive the channel count once so that the reader, the reshape, the PIL
    # mode and the ffmpeg input pixel format can never disagree.
    four_channel = template_video_path.endswith((".mov", ".webm"))
    if four_channel:
        reader, meta = get_four_channel_ffmpeg_reader(template_video_path)
    else:  # mp4
        reader, meta = get_three_channel_ffmpeg_reader(template_video_path)
    channels = 4 if four_channel else 3
    pil_mode = "RGBA" if four_channel else "RGB"
    pix_fmt_in = "rgba" if four_channel else "rgb24"

    size_org = meta["size"]  # used as (width, height) below
    size = [int(round(ratio * v)) // 2 * 2 for v in size_org]
    fps = meta["fps"]
    if verbose:
        print(meta)

    total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path)

    if not save_in_video:
        Path(template_frames_path).mkdir(exist_ok=True, parents=True)

    if verbose:
        print("template_frames_path: ", template_frames_path)

    # hojin: the extracted frames are re-encoded into a mov for export (ratio < 1.0)
    writer = None
    progress_desc = (
        f"save video f{ratio}" if save_in_video else f"save frames f{ratio}"
    )
    try:
        for idx, raw in tqdm(enumerate(reader), total=total_cnt, desc=progress_desc):
            frame = np.frombuffer(raw, dtype=np.uint8)
            frame = frame.reshape(size_org[1], size_org[0], channels)
            frame = resize_(size, frame)

            if not save_in_video:
                image = Image.fromarray(np.ascontiguousarray(frame), mode=pil_mode)
                # NOTE(review): the file name ends in .webp but the data is
                # written in PNG format; left unchanged because readers may
                # depend on these names - confirm before changing either side.
                image.save(
                    str(Path(template_frames_path) / f"{idx:05d}.webp"),
                    format="png",
                    lossless=True,
                )
                continue

            if writer is None:
                # Created lazily on the first frame.
                writer = imageio_ffmpeg.write_frames(
                    template_video_path_with_ratio,
                    size=size,
                    fps=fps,
                    quality=10,
                    pix_fmt_in=pix_fmt_in,
                    pix_fmt_out="rgba",
                    codec="png",
                    macro_block_size=1,
                )
                writer.send(None)  # seed the generator
            writer.send(frame)
    finally:
        # Finalize the output even when decoding/resizing fails midway.
        if writer is not None:
            writer.close()
# hojin end