import gc import os from pathlib import Path import cv2 from PIL import Image from tqdm import tqdm from .preprocess_dir.utils import crop_with_fan as cwf from .preprocess_dir.utils import face_finder as ff from .util import * # template video 전처리 # preprocess_template_old(기존함수) 와 기능은 동일하고, 메모리 사용량 줄임 def preprocess_template( config_path, template_video_path, reference_face, work_root_path, device, template_frame_ratio=1.0, template_video_ratio=[1.0], callback=None, verbose=False, save_frames=True, silent_video_path=None, no_infer_frames=[], ): """template video 전처리 Parameters ---------- config_path (str) : 설정파일 경로 template_video_path (str) : 템플릿 영상 경로 reference_face : (str) : 참고할 얼굴 이미지 경로 work_root_path (str) : 작업폴더 경로. 전처리 정보가 저장됨. device (str) : device 정보. ex) cuda:0 template_frame_ratio (float) : 템플릿 비디오 resize 비율. 1.0: 영상 그대로 사용 template_video_ratio (list[float]) : 템플릿 비디오 resize 비율. 1.0: 영상 그대로 사용 save_frames (bool) : 템플릿 비디오 프레임 저장여부 no_infer_frames (list[tuple[int,int]]) : 추론에 사용되지 않는frame 구간. 시작은 포함, 끝은 포함되지 않음. """ load_gpu = False config = read_config(config_path) image_size = config.img_size callback1 = callback_inter( callback, min_per=0, max_per=2, desc="preprocess_template 1", verbose=verbose ) callback2 = callback_inter( callback, min_per=2, max_per=20, desc="preprocess_template 2", verbose=verbose ) callback3 = callback_inter( callback, min_per=20, max_per=100, desc="preprocess_template 3", verbose=verbose ) preprocess_dir = get_preprocess_dir(work_root_path, config.name) Path(preprocess_dir).mkdir(exist_ok=True, parents=True) # snow : for debug if verbose: print("preprocess_dir: ", preprocess_dir, ", work_root_path:", work_root_path) # 전처리 파일 경로 crop_mp4 = get_crop_mp4_dir(preprocess_dir, template_video_path) if not Path(crop_mp4).exists(): load_gpu = True ff.init_face_finder(device) cwf.init_fan(device) if verbose: print("템플릿 비디오 처리 ... ") # 아나운서 얼굴 정보를 구한다. df_face, imgs = ff.find_face(reference_face) callback1(100) # 진행율을 알려준다. g_anchor_ebd = df_face["ebd"].values[0] # 템플릿 동영상에서 아나운서 얼굴 위치만 저장해 놓는다 df_paths = ff.save_face_info3( template_video_path, g_anchor_ebd, config.move, base=preprocess_dir, callback=callback2, verbose=verbose, ) ### 얼굴 영역을 FAN 랜드마크 기반으로 크롭해 놓는다 assert len(df_paths) == 1 if config.move: if verbose: print("cwf.save_crop_info_move --") df_fan_path = cwf.save_crop_info_move( image_size=image_size, anchor_box_path=df_paths[0], mp4_path=template_video_path, out_dir=crop_mp4, crop_offset_y=config.crop_offset_y, crop_margin=config.crop_margin, callback=callback3, verbose=verbose, ) else: if verbose: print("cwf.save_crop_info2 --") df_fan_path = cwf.save_crop_info2( image_size=image_size, anchor_box_path=df_paths[0], mp4_path=template_video_path, out_dir=crop_mp4, crop_offset_y=config.crop_offset_y, crop_margin=config.crop_margin, no_infer_frames=no_infer_frames, callback=callback3, verbose=verbose, ) # snow : for debug if verbose: print("df_fan_path: ", df_fan_path) ff.del_face_finder() cwf.del_fan() else: if verbose: print("전처리가 이미 되어있음") callback3(100) # 1. save frames for stf if save_frames: frame_dir = get_frame_dir( preprocess_dir, template_video_path, ratio=template_frame_ratio ) if verbose: print("frame_dir:", frame_dir) save_template_frames( template_video_path=template_video_path, template_frames_path=frame_dir, ratio=template_frame_ratio, save_in_video=False, verbose=verbose, ) if silent_video_path is not None: frame_dir = get_frame_dir( preprocess_dir, silent_video_path, ratio=template_frame_ratio ) save_template_frames( template_video_path=silent_video_path, template_frames_path=frame_dir, ratio=template_frame_ratio, save_in_video=False, verbose=verbose, ) if template_video_path.endswith(".mov"): # TODO snow : 성능 확인 필요. # 지금은 mov 인 경우만 파일을 저장한다. 추론할 때 느려서 라고 한다. by hojin # 2. save video for encoding for video_ratio in template_video_ratio: if video_ratio != 1.0: out_path = get_template_ratio_file_path( preprocess_dir, template_video_path, ratio=video_ratio ) save_template_frames( template_video_path=template_video_path, template_frames_path="", template_video_path_with_ratio=out_path, ratio=video_ratio, save_in_video=True, verbose=verbose, ) if silent_video_path is not None: out_path = get_template_ratio_file_path( preprocess_dir, silent_video_path, ratio=video_ratio ) save_template_webm_ratio( template_video_path=silent_video_path, ratio=video_ratio, out_path=out_path, verbose=verbose, ) if template_video_path.endswith(".webm"): # TODO snow : 성능 확인 필요. ratio 개수만큼 webm 을 만든다. for video_ratio in template_video_ratio: out_path = get_template_ratio_file_path( preprocess_dir, template_video_path, ratio=video_ratio ) save_template_webm_ratio( template_video_path=template_video_path, ratio=video_ratio, out_path=out_path, verbose=verbose, ) if silent_video_path is not None: out_path = get_template_ratio_file_path( preprocess_dir, silent_video_path, ratio=video_ratio ) save_template_webm_ratio( template_video_path=silent_video_path, ratio=video_ratio, out_path=out_path, verbose=verbose, ) gc.collect() return load_gpu # snow: webm 템플릿을 ratio 별로 resize 하여 저장하는 함수 def save_template_webm_ratio(template_video_path, ratio, out_path, verbose): def resize_(size, img): w, h = size img = cv2.resize(img, (w, h), inter_alg_(w, h, img)) return img def inter_alg_(w, h, img): if w * h < img.shape[0] * img.shape[1]: return cv2.INTER_AREA else: return cv2.INTER_CUBIC os.makedirs(os.path.dirname(out_path), exist_ok=True) reader, meta = get_four_channel_ffmpeg_reader(template_video_path) if Path(out_path).exists(): if verbose: print(f"ratio 파일이 저장되어 있음, {out_path}") return if verbose: print(f"webm ratio template, org:{template_video_path}, ratio:{ratio}") size_org = meta["size"] size = list(int(round(ratio * v)) // 2 * 2 for v in size_org) writer = get_webm_ffmpeg_writer( out_path, size=size, fps=meta["fps"], wav_path=template_video_path ) writer.send(None) # seed the generator total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path) for idx, f in tqdm( enumerate(reader), total=total_cnt, desc=f"save webm ratio:{ratio}, size:{size}" ): f = np.frombuffer(f, dtype=np.uint8) f = f.reshape(size_org[1], size_org[0], 4) f = resize_(size, f) writer.send(f) # seed the generator writer.close() # hojin # png frame 추출 + crop def save_template_frames( template_video_path, template_frames_path, template_video_path_with_ratio=None, ratio=1.0, save_in_video=False, verbose=False, ): def inter_alg_(w, h, img): if w * h < img.shape[0] * img.shape[1]: return cv2.INTER_AREA else: return cv2.INTER_CUBIC def resize_(size, img): w, h = size img = cv2.resize(img, (w, h), inter_alg_(w, h, img)) return img # hojin: 템플릿을 프레임별로 저장해두기 -> write_video_in_thread에서 reader 사용하지 않기 위함 if save_in_video == False: if Path(template_frames_path).exists(): if verbose: print("프레임이 모두 저장되어 있음") return else: if Path(template_video_path_with_ratio).exists(): if verbose: print("비디오가 생성되어 있음") return os.makedirs(os.path.dirname(template_video_path_with_ratio), exist_ok=True) if template_video_path.endswith(".mov") or template_video_path.endswith(".webm"): reader, meta = get_four_channel_ffmpeg_reader(template_video_path) else: # mp4 reader, meta = get_three_channel_ffmpeg_reader(template_video_path) size_org = meta["size"] size = list(int(round(ratio * v)) // 2 * 2 for v in size_org) fps = meta["fps"] if verbose: print(meta) total_cnt, _ = imageio_ffmpeg.count_frames_and_secs(template_video_path) if save_in_video is False: Path(template_frames_path).mkdir(exist_ok=True, parents=True) # hojin: 추출한 프레임을 내보내기를 위해서 다시 mov로 만들어놓기 (ratio<1.0) writer = None if verbose: print("template_frames_path: ", template_frames_path) for idx, f in tqdm( enumerate(reader), total=total_cnt, desc=f"save frames f{ratio}" if save_in_video is False else f"save video f{ratio}", ): name = f"""{idx:05d}.webp""" f = np.frombuffer(f, dtype=np.uint8) f = f.reshape( size_org[1], size_org[0], 3 if template_video_path.endswith(".mp4") else 4 ) f = resize_(size, f) if save_in_video is False: f = np.ascontiguousarray(f) f = Image.fromarray( f, mode="RGB" if template_video_path.endswith(".mp4") else "RGBA" ) f.save( str(Path(template_frames_path) / str(name)), format="png", lossless=True ) # cv2.imwrite(str(Path(template_frames_path) / str(name)), f[:, :, [2, 1, 0, 3]], [int(cv2.IMWRITE_PNG_COMPRESSION), 3]) if writer is None and save_in_video is True: if ratio != 1.0: writer = imageio_ffmpeg.write_frames( template_video_path_with_ratio, size=size, fps=fps, quality=10, pix_fmt_in="rgba", pix_fmt_out="rgba", codec="png", macro_block_size=1, ) writer.send(None) if writer: writer.send(f) if writer: writer.close() # hojin end