# coding: utf-8

"""
Utility functions and classes to handle feature extraction and model loading
"""

import os
import os.path as osp
from collections import OrderedDict

import psutil
import torch
from rich.console import Console

from ..modules.spade_generator import SPADEDecoder
from ..modules.warping_network import WarpingNetwork
from ..modules.motion_extractor import MotionExtractor
from ..modules.appearance_feature_extractor import AppearanceFeatureExtractor
from ..modules.stitching_retargeting_network import StitchingRetargetingNetwork

console = Console()


def show_memory_usage():
    """Display the current memory usage in the terminal using rich."""
    mem_info = psutil.virtual_memory()
    total_mem = mem_info.total / (1024 ** 3)  # convert bytes to GB
    used_mem = mem_info.used / (1024 ** 3)
    available_mem = mem_info.available / (1024 ** 3)
    console.log(f"[bold green]Memory Usage:[/bold green] [bold red]{used_mem:.2f} GB[/bold red] used of [bold blue]{total_mem:.2f} GB[/bold blue]")
    console.log(f"[bold green]Available Memory:[/bold green] [bold yellow]{available_mem:.2f} GB[/bold yellow]")


def suffix(filename):
    """a.jpg -> jpg"""
    pos = filename.rfind(".")
    if pos == -1:
        return ""
    return filename[pos + 1:]


def prefix(filename):
    """a.jpg -> a"""
    pos = filename.rfind(".")
    if pos == -1:
        return filename
    return filename[:pos]


def basename(filename):
    """a/b/c.jpg -> c"""
    return prefix(osp.basename(filename))


def is_video(file_path):
    """Treat common video extensions, as well as a directory of frames, as video input."""
    return file_path.lower().endswith((".mp4", ".mov", ".avi", ".webm")) or osp.isdir(file_path)


def is_template(file_path):
    """A template is a pickled (.pkl) file."""
    return file_path.endswith(".pkl")


def mkdir(d, log=False):
    """Create `d` if needed and return it, so it can be used inline (one-line code)."""
    if not osp.exists(d):
        os.makedirs(d, exist_ok=True)
        if log:
            console.log(f"Make dir: {d}")
    return d


def squeeze_tensor_to_numpy(tensor):
    """Drop the leading batch dimension and return the tensor as a NumPy array."""
    return tensor.data.squeeze(0).cpu().numpy()


def dct2cpu(dct: dict, device='cpu'):
    """Convert every value of `dct` to a tensor on `device` (CPU by default), in place."""
    for key in dct:
        # as_tensor avoids the copy-construct warning when a value is already a tensor
        dct[key] = torch.as_tensor(dct[key]).to(device)
    return dct


def concat_feat(kp_source: torch.Tensor, kp_driving: torch.Tensor) -> torch.Tensor:
    """
    kp_source: (bs, k, 3)
    kp_driving: (bs, k, 3)
    Return: (bs, 2k*3)
    """
    bs_src = kp_source.shape[0]
    bs_dri = kp_driving.shape[0]
    assert bs_src == bs_dri, 'batch size must be equal'

    feat = torch.cat([kp_source.view(bs_src, -1), kp_driving.view(bs_dri, -1)], dim=1)
    return feat


def remove_ddp_duplicate_key(state_dict):
    """Strip the 'module.' prefix that DistributedDataParallel prepends to parameter names."""
    state_dict_new = OrderedDict()
    for key in state_dict.keys():
        state_dict_new[key.replace('module.', '')] = state_dict[key]
    return state_dict_new
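# A minimal shape-check sketch for concat_feat. The keypoint count k=21 is an
# assumption for illustration only (it is not read from any config in this file);
# the function itself only guarantees the (bs, 2*k*3) output shape.
def _demo_concat_feat():
    kp_source = torch.randn(2, 21, 3)   # (bs=2, k=21, 3)
    kp_driving = torch.randn(2, 21, 3)
    feat = concat_feat(kp_source, kp_driving)
    assert feat.shape == (2, 2 * 21 * 3)  # (bs, 2k*3) == (2, 126)
    return feat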
def load_model(ckpt_path, model_config, device, model_type):
    """Build the requested module, load its weights, and return it in eval mode.

    NOTE: all models are instantiated on CPU here regardless of `device`;
    callers are expected to move them to the target device afterwards.
    """
    model_params = model_config['model_params'][f'{model_type}_params']

    if model_type == 'appearance_feature_extractor':
        model = AppearanceFeatureExtractor(**model_params).to('cpu')
    elif model_type == 'motion_extractor':
        model = MotionExtractor(**model_params).to('cpu')
    elif model_type == 'warping_module':
        model = WarpingNetwork(**model_params).to('cpu')
    elif model_type == 'spade_generator':
        model = SPADEDecoder(**model_params).to('cpu')
    elif model_type == 'stitching_retargeting_module':
        # Special handling: this checkpoint bundles three networks
        # (stitching, lip retargeting, eye retargeting) in a single file.
        config = model_config['model_params']['stitching_retargeting_module_params']
        checkpoint = torch.load(ckpt_path, map_location='cpu')

        stitcher = StitchingRetargetingNetwork(**config.get('stitching'))
        stitcher.load_state_dict(remove_ddp_duplicate_key(checkpoint['retarget_shoulder']))
        stitcher = stitcher.to('cpu')
        stitcher.eval()

        retargetor_lip = StitchingRetargetingNetwork(**config.get('lip'))
        retargetor_lip.load_state_dict(remove_ddp_duplicate_key(checkpoint['retarget_mouth']))
        retargetor_lip = retargetor_lip.to('cpu')
        retargetor_lip.eval()

        retargetor_eye = StitchingRetargetingNetwork(**config.get('eye'))
        retargetor_eye.load_state_dict(remove_ddp_duplicate_key(checkpoint['retarget_eye']))
        retargetor_eye = retargetor_eye.to('cpu')
        retargetor_eye.eval()

        return {
            'stitching': stitcher,
            'lip': retargetor_lip,
            'eye': retargetor_eye
        }
    else:
        raise ValueError(f"Unknown model type: {model_type}")

    model.load_state_dict(torch.load(ckpt_path, map_location='cpu'))
    model.eval()
    return model


# Get the coefficients of Eqn. 7
def calculate_transformation(config, s_kp_info, t_0_kp_info, t_i_kp_info, R_s, R_t_0, R_t_i):
    if config.relative:
        # Relative mode: apply the driving motion (frame i relative to frame 0)
        # on top of the source rotation and expression.
        new_rotation = (R_t_i @ R_t_0.permute(0, 2, 1)) @ R_s
        new_expression = s_kp_info['exp'] + (t_i_kp_info['exp'] - t_0_kp_info['exp'])
    else:
        # Absolute mode: take the driving rotation and expression directly.
        new_rotation = R_t_i
        new_expression = t_i_kp_info['exp']
    new_translation = s_kp_info['t'] + (t_i_kp_info['t'] - t_0_kp_info['t'])
    new_translation[..., 2].fill_(0)  # keep the z-axis unchanged
    new_scale = s_kp_info['scale'] * (t_i_kp_info['scale'] / t_0_kp_info['scale'])
    return new_rotation, new_expression, new_translation, new_scale


def load_description(fp):
    """Read a UTF-8 text file and return its contents."""
    with open(fp, 'r', encoding='utf-8') as f:
        content = f.read()
    return content
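# A hedged usage sketch for load_model. The YAML config path, the weight path,
# and the use of PyYAML below are assumptions for illustration; the surrounding
# pipeline may build `model_config` differently. Invoke via the package import
# path, since the relative imports above prevent executing this file directly.
def _demo_load_model():
    import yaml  # assumed available; any loader that yields the config dict works

    with open('configs/models.yaml', 'r') as f:  # hypothetical path
        model_config = yaml.safe_load(f)

    show_memory_usage()  # RAM before the weights are loaded
    motion_extractor = load_model(
        'pretrained_weights/motion_extractor.pth',  # hypothetical path
        model_config,
        device='cpu',
        model_type='motion_extractor',
    )
    show_memory_usage()  # and after, to show the model's footprint
    return motion_extractor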