import os
import time
from datetime import datetime

import numpy as np
import torch
import pytz

from common.arguments import parse_args
from common.camera import *
from common.generators import *
from common.loss import *
from common.model import *
from common.utils import Timer, evaluate, add_path
from common.inference_3d import *
from model.block.refine import refine
from model.stmo import Model
import HPE2keyframes as Hk

# from joints_detectors.openpose.main import generate_kpts as open_pose

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"  # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

metadata = {'layout_name': 'coco', 'num_joints': 17,
            'keypoints_symmetry': [[1, 3, 5, 7, 9, 11, 13, 15], [2, 4, 6, 8, 10, 12, 14, 16]]}
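# COCO 17-keypoint layout: 'keypoints_symmetry' lists the indices of the
# left-side and right-side joints, which are used below for test-time flip
# augmentation in the evaluation generator.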

add_path()


# record time
def ckpt_time(ckpt=None):
    if not ckpt:
        return time.time()
    else:
        return time.time() - float(ckpt), time.time()
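# Usage: `t0 = ckpt_time()` starts a timer; `elapsed, t1 = ckpt_time(t0)`
# returns the seconds elapsed since t0 together with a fresh timestamp.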


time0 = ckpt_time()


def get_detector_2d(detector_name):
    def get_alpha_pose():
        from joints_detectors.Alphapose.gene_npz import generate_kpts as alpha_pose
        return alpha_pose

    detector_map = {
        'alpha_pose': get_alpha_pose,
        # 'hr_pose': get_hr_pose,
        # 'open_pose': open_pose
    }

    assert detector_name in detector_map, f'2D detector: {detector_name} not implemented yet!'

    return detector_map[detector_name]()


class Skeleton:
    def parents(self):
        return np.array([-1, 0, 1, 2, 0, 4, 5, 0, 7, 8, 9, 8, 11, 12, 8, 14, 15])

    def joints_right(self):
        return [1, 2, 3, 14, 15, 16]
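# `parents()` encodes the kinematic tree: entry i is the index of joint i's
# parent, with -1 marking the root (pelvis) in this H36M-style 17-joint layout.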


def main(args, progress):
    detector_2d = get_detector_2d(args.detector_2d)
    assert detector_2d, 'detector_2d should be in ({alpha, hr, open}_pose)'

    # load or generate 2D keypoints
    # args.input_npz = './outputs/alpha_pose_skiing_cut/skiing_cut.npz'
    if not args.input_npz:
        video_name = args.viz_video
        keypoints = detector_2d(video_name, progress)
    else:
        npz = np.load(args.input_npz)
        keypoints = npz['kpts']  # (N, 17, 2)

    keypoints_symmetry = metadata['keypoints_symmetry']
    kps_left, kps_right = list(keypoints_symmetry[0]), list(keypoints_symmetry[1])
    joints_left, joints_right = [4, 5, 6, 11, 12, 13], [1, 2, 3, 14, 15, 16]
    # normalize keypoints, assuming fixed camera parameters
    keypoints = normalize_screen_coordinates(keypoints[..., :2], w=1000, h=1002)
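    # The 1000x1002 viewport matches the Human3.6M camera resolution the
    # pretrained model was trained with; pixel coordinates are mapped into the
    # normalized range the network expects.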

    # model_pos = TemporalModel(17, 2, 17, filter_widths=[3, 3, 3, 3, 3], causal=args.causal,
    #                           dropout=args.dropout, channels=args.channels, dense=args.dense)

    model = {}
    model['trans'] = Model(args)

    # if torch.cuda.is_available():
    #     model_pos = model_pos

    ckpt, time1 = ckpt_time(time0)
    print('-------------- load data spends {:.2f} seconds'.format(ckpt))

    # load trained model
    # chk_filename = os.path.join(args.checkpoint, args.resume if args.resume else args.evaluate)
    # print('Loading checkpoint', chk_filename)
    # checkpoint = torch.load(chk_filename, map_location=lambda storage, loc: storage)  # map loc to storage
    # model_pos.load_state_dict(checkpoint['model_pos'])
    model_dict = model['trans'].state_dict()

    no_refine_path = "checkpoint/PSTMOS_no_refine_48_5137_in_the_wild.pth"
    pre_dict = torch.load(no_refine_path, map_location=torch.device('cpu'))
    for key, value in pre_dict.items():
        name = key[7:]  # strip the 'module.' prefix added by DataParallel when the checkpoint was saved
        model_dict[name] = pre_dict[key]
    model['trans'].load_state_dict(model_dict)

    ckpt, time2 = ckpt_time(time1)
    print('-------------- load 3D model spends {:.2f} seconds'.format(ckpt))

    # Receptive field: 243 frames for args.arc [3, 3, 3, 3, 3]
    receptive_field = args.frames
    pad = (receptive_field - 1) // 2  # padding on each side
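    # Padding both sides by (receptive_field - 1) // 2 gives every output frame
    # a full temporal window; e.g. with 243 input frames, pad = 121.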
    causal_shift = 0

    print('Rendering...')
    input_keypoints = keypoints.copy()
    print(input_keypoints.shape)

    # gen = UnchunkedGenerator(None, None, [input_keypoints],
    #                          pad=pad, causal_shift=causal_shift, augment=args.test_time_augmentation,
    #                          kps_left=kps_left, kps_right=kps_right, joints_left=joints_left, joints_right=joints_right)
    # test_data = Fusion(opt=args, train=False, dataset=dataset, root_path=root_path, MAE=opt.MAE)
    # test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=1,
    #                                               shuffle=False, num_workers=0, pin_memory=True)
    # prediction = evaluate(gen, model_pos, return_predictions=True)
    gen = Evaluate_Generator(128, None, None, [input_keypoints], args.stride,
                             pad=pad, causal_shift=causal_shift, augment=args.test_time_augmentation, shuffle=False,
                             kps_left=kps_left, kps_right=kps_right, joints_left=joints_left, joints_right=joints_right)
    prediction = val(args, gen, model, progress)
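    # prediction holds per-frame 3D joint positions in camera space,
    # presumably of shape (num_frames, 17, 3) given the 17-joint layout.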

    # save 3D joint points
    # np.save(f'outputs/test_3d_{args.video_name}_output.npy', prediction, allow_pickle=True)
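    # Hard-coded camera orientation (quaternion) used to rotate the prediction
    # from camera space into world space; it appears to come from the standard
    # Human3.6M demo camera.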
    rot = np.array([0.14070565, -0.15007018, -0.7552408, 0.62232804], dtype=np.float32)
    prediction = camera_to_world(prediction, R=rot, t=0)

    # We don't have the trajectory, but at least we can rebase the height
    prediction[:, :, 2] -= np.min(prediction[:, :, 2])

    output_dir_dict = {}
    npy_filename = f'output_3Dpose_npy/{args.video_name}.npy'
    output_dir_dict['npy'] = npy_filename
    np.save(npy_filename, prediction, allow_pickle=True)

    anim_output = {'Skeleton': prediction}
    input_keypoints = image_coordinates(input_keypoints[..., :2], w=1000, h=1002)

    ckpt, time3 = ckpt_time(time2)
    print('-------------- generate reconstruction 3D data spends {:.2f} seconds'.format(ckpt))

    if not args.viz_output:
        args.viz_output = 'outputs/alpha_result.mp4'

    from common.visualization import render_animation
    render_animation(input_keypoints, anim_output,
                     Skeleton(), 25, args.viz_bitrate, np.array(70., dtype=np.float32), args.viz_output, progress,
                     limit=args.viz_limit, downsample=args.viz_downsample, size=args.viz_size,
                     input_video_path=args.viz_video, viewport=(1000, 1002),
                     input_video_skip=args.viz_skip)

    ckpt, time4 = ckpt_time(time3)
    print('total spend {:.2f} seconds'.format(ckpt))

    return output_dir_dict


def inference_video(video_path, detector_2d, progress):
    """
    Run the full pipeline on one video: image -> 2D keypoints -> 3D pose -> rendered video.
    :param video_path: path to the input video
    :param detector_2d: 2D joint detector to use; can be {alpha_pose, hr_pose}
    :return: dict mapping artifact names to the paths of the generated files
    """
    args = parse_args()

    args.detector_2d = detector_2d
    basename = os.path.basename(video_path)
    args.video_name = basename[:basename.rfind('.')]
    args.viz_video = video_path
    args.viz_output = f'output_videos/{args.video_name}.mp4'

    args.evaluate = 'pretrained_h36m_detectron_coco.bin'

    with Timer(video_path):
        output_dir_dict = main(args, progress)

    output_dir_dict["output_videos"] = args.viz_output
    output_dir_dict["video_name"] = args.video_name
    return output_dir_dict


def gr_video2mc(video_path, progress):
    print("\n>>>>> One video uploaded <<<<<\n")
    china_tz = pytz.timezone('Asia/Shanghai')
    current_time = datetime.now(china_tz)
    formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
    print(f"Start Time: {formatted_time}\n")

    for out_dir in ('output_3Dpose_npy', 'output_alphapose', 'output_miframes', 'output_videos'):
        os.makedirs(out_dir, exist_ok=True)

    FPS_mine_imator = 30
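    # Run the 2D -> 3D pipeline, then convert the saved .npy pose track into a
    # Mine-imator .miframes keyframe file, resampled to 30 FPS (the rate
    # Mine-imator projects are assumed to use here).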
    output_dir_dict = inference_video(video_path, 'alpha_pose', progress)
    Hk.hpe2keyframes(output_dir_dict['npy'], FPS_mine_imator, f"output_miframes/{output_dir_dict['video_name']}.miframes")
    path1 = os.path.abspath(f"output_miframes/{output_dir_dict['video_name']}.miframes")
    path2 = os.path.abspath(f"output_videos/{output_dir_dict['video_name']}.mp4")

    print("\n----- One video processed -----\n")
    china_tz = pytz.timezone('Asia/Shanghai')
    current_time = datetime.now(china_tz)
    formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
    print(f"Finished Time: {formatted_time}\n")

    return path1, path2
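
# A minimal sketch of wiring gr_video2mc into a Gradio app (hypothetical; the
# actual Space may configure this differently, and `progress` is expected to be
# a Gradio-style progress callback):
#
#     import gradio as gr
#
#     def run(video, progress=gr.Progress()):
#         return gr_video2mc(video, progress)
#
#     demo = gr.Interface(fn=run, inputs=gr.Video(), outputs=[gr.File(), gr.Video()])
#     demo.launch()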


if __name__ == '__main__':
    files = os.listdir('./input_videos')
    FPS_mine_imator = 30
    for file in files:
        # no progress callback in CLI mode, so pass None explicitly
        output_dir_dict = inference_video(os.path.join('input_videos', file), 'alpha_pose', None)
        Hk.hpe2keyframes(output_dir_dict['npy'], FPS_mine_imator, f"output_miframes/{output_dir_dict['video_name']}.miframes")