Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import gc | |
import torch | |
import cv2 | |
import gradio as gr | |
import numpy as np | |
import matplotlib.cm as cm | |
import matplotlib | |
import subprocess | |
import sys | |
import spaces | |
from video_depth_anything.video_depth import VideoDepthAnything | |
from utils.dc_utils import save_video | |
from huggingface_hub import hf_hub_download | |
# Examples for the Gradio Demo. | |
examples = [ | |
['assets/example_videos/octopus_01.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/chicken_01.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/gorilla_01.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/davis_rollercoaster.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/Tokyo-Walk_rgb.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/4158877-uhd_3840_2160_30fps_rgb.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/4511004-uhd_3840_2160_24fps_rgb.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/1753029-hd_1920_1080_30fps.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/davis_burnout.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/example_5473765-l.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/Istanbul-26920.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/obj_1.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/sheep_cut1.mp4', -1, -1, 1280, True, True, True, 0.3], | |
] | |
# Use GPU if available; otherwise, use CPU. | |
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' | |
# Model configuration for different encoder variants. | |
model_configs = { | |
'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]}, | |
'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]}, | |
} | |
encoder2name = { | |
'vits': 'Small', | |
'vitl': 'Large', | |
} | |
encoder = 'vitl' | |
model_name = encoder2name[encoder] | |
# Initialize the model. | |
video_depth_anything = VideoDepthAnything(**model_configs[encoder]) | |
filepath = hf_hub_download( | |
repo_id=f"depth-anything/Video-Depth-Anything-{model_name}", | |
filename=f"video_depth_anything_{encoder}.pth", | |
repo_type="model" | |
) | |
video_depth_anything.load_state_dict(torch.load(filepath, map_location='cpu')) | |
video_depth_anything = video_depth_anything.to(DEVICE).eval() | |
title = "# Video Depth Anything + RGBD sbs output" | |
description = """**Video Depth Anything** + RGBD sbs output for viewing with Looking Glass Factory displays. | |
Please refer to our [paper](https://arxiv.org/abs/2501.12375), [project page](https://videodepthanything.github.io/), and [github](https://github.com/DepthAnything/Video-Depth-Anything) for more details.""" | |
def get_video_info(video_path, max_len=-1, target_fps=-1): | |
"""Extract video information without loading all frames into memory.""" | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
raise ValueError(f"Could not open video file: {video_path}") | |
# Get video properties | |
original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
original_fps = cap.get(cv2.CAP_PROP_FPS) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
# Adjust based on max_len parameter | |
if max_len > 0 and max_len < total_frames: | |
frame_count = max_len | |
else: | |
frame_count = total_frames | |
# Use target_fps if specified | |
if target_fps > 0: | |
fps = target_fps | |
else: | |
fps = original_fps | |
cap.release() | |
return { | |
'width': original_width, | |
'height': original_height, | |
'fps': fps, | |
'original_fps': original_fps, | |
'frame_count': frame_count, | |
'total_frames': total_frames | |
} | |
def process_frame(frame, max_res): | |
"""Process a single frame to the desired resolution.""" | |
if max_res > 0: | |
h, w = frame.shape[:2] | |
scale = min(max_res / w, max_res / h) | |
if scale < 1: | |
new_w, new_h = int(w * scale), int(h * scale) | |
frame = cv2.resize(frame, (new_w, new_h)) | |
return frame | |
def read_video_frames_chunked(video_path, max_len=-1, target_fps=-1, max_res=-1, chunk_size=32): | |
"""Read video frames in chunks to avoid loading the entire video into memory.""" | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
raise ValueError(f"Could not open video file: {video_path}") | |
original_fps = cap.get(cv2.CAP_PROP_FPS) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
# Determine actual number of frames to process | |
if max_len > 0 and max_len < total_frames: | |
frame_count = max_len | |
else: | |
frame_count = total_frames | |
# Use target_fps if specified | |
if target_fps > 0: | |
fps = target_fps | |
# Calculate frame skip if downsampling fps | |
if target_fps < original_fps: | |
skip = int(round(original_fps / target_fps)) - 1 | |
else: | |
skip = 0 | |
else: | |
fps = original_fps | |
skip = 0 | |
frame_idx = 0 | |
processed_count = 0 | |
while processed_count < frame_count: | |
frames_chunk = [] | |
# Read frames up to chunk size or remaining frames | |
chunk_limit = min(chunk_size, frame_count - processed_count) | |
while len(frames_chunk) < chunk_limit: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
# Process frame if we're not skipping it | |
if frame_idx % (skip + 1) == 0: | |
# Convert from BGR to RGB | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
# Resize if necessary | |
frame = process_frame(frame, max_res) | |
frames_chunk.append(frame) | |
processed_count += 1 | |
if processed_count >= frame_count: | |
break | |
frame_idx += 1 | |
if frames_chunk: | |
yield frames_chunk, fps | |
if processed_count >= frame_count or len(frames_chunk) < chunk_limit: | |
break | |
cap.release() | |
def infer_video_depth( | |
input_video: str, | |
max_len: int = -1, | |
target_fps: int = -1, | |
max_res: int = 1280, | |
stitch: bool = True, | |
grayscale: bool = True, | |
convert_from_color: bool = True, | |
blur: float = 0.3, | |
output_dir: str = './outputs', | |
input_size: int = 518, | |
): | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
video_name = os.path.basename(input_video) | |
processed_video_path = os.path.join(output_dir, os.path.splitext(video_name)[0] + '_src.mp4') | |
depth_vis_path = os.path.join(output_dir, os.path.splitext(video_name)[0] + '_vis.mp4') | |
# Get video info first | |
video_info = get_video_info(input_video, max_len, target_fps) | |
fps = video_info['fps'] | |
frame_count = video_info['frame_count'] | |
print(f"Processing video: {input_video}, {frame_count} frames at {fps} fps") | |
# Process the video in chunks to manage memory | |
chunk_size = 32 # Adjust based on available memory | |
# We'll collect depths as we go to calculate global min/max | |
all_depths = [] | |
all_processed_frames = [] | |
# First pass to collect frames and depths | |
frame_idx = 0 | |
for frames_chunk, fps in read_video_frames_chunked(input_video, max_len, target_fps, max_res, chunk_size): | |
print(f"Processing chunk: frames {frame_idx+1}-{frame_idx+len(frames_chunk)}/{frame_count}") | |
# Process this chunk of frames | |
depths, _ = video_depth_anything.infer_video_depth(frames_chunk, fps, input_size=input_size, device=DEVICE) | |
# Store results (we'll need both for the output videos) | |
all_processed_frames.extend(frames_chunk) | |
all_depths.extend(depths) | |
frame_idx += len(frames_chunk) | |
# Free memory | |
gc.collect() | |
torch.cuda.empty_cache() | |
# Calculate global min/max for depth normalization | |
depths_array = np.array(all_depths) | |
d_min, d_max = depths_array.min(), depths_array.max() | |
# Save the preprocessed video and depth visualization | |
save_video(all_processed_frames, processed_video_path, fps=fps) | |
save_video(all_depths, depth_vis_path, fps=fps, is_depths=True) | |
# Free some memory before stitching | |
del all_processed_frames | |
gc.collect() | |
# Process stitched video if requested | |
stitched_video_path = None | |
if stitch: | |
# Use only the first 20 characters of the base name for the output filename | |
base_name = os.path.splitext(video_name)[0] | |
short_name = base_name[:20] | |
stitched_video_path = os.path.join(output_dir, short_name + '_RGBD.mp4') | |
# For stitching: read the original video in full resolution and stitch frames one by one | |
stitched_frames = [] | |
# Process in chunks for memory efficiency | |
frame_idx = 0 | |
for frames_chunk, _ in read_video_frames_chunked(input_video, max_len, target_fps, -1, chunk_size): # No max_res for original resolution | |
print(f"Stitching chunk: frames {frame_idx+1}-{frame_idx+len(frames_chunk)}/{frame_count}") | |
# Process each frame in the chunk | |
for i, rgb_full in enumerate(frames_chunk): | |
depth_idx = frame_idx + i | |
if depth_idx >= len(all_depths): | |
break | |
depth_frame = all_depths[depth_idx] | |
# Normalize the depth frame | |
depth_norm = ((depth_frame - d_min) / (d_max - d_min) * 255).astype(np.uint8) | |
# Generate depth visualization | |
if grayscale: | |
if convert_from_color: | |
# Convert from color to grayscale | |
cmap = matplotlib.colormaps.get_cmap("inferno") | |
depth_color = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8) | |
depth_gray = cv2.cvtColor(depth_color, cv2.COLOR_RGB2GRAY) | |
depth_vis = np.stack([depth_gray] * 3, axis=-1) | |
else: | |
# Directly use grayscale | |
depth_vis = np.stack([depth_norm] * 3, axis=-1) | |
else: | |
# Use color visualization | |
cmap = matplotlib.colormaps.get_cmap("inferno") | |
depth_vis = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8) | |
# Apply blur if requested | |
if blur > 0: | |
kernel_size = int(blur * 20) * 2 + 1 # Ensures odd kernel size | |
depth_vis = cv2.GaussianBlur(depth_vis, (kernel_size, kernel_size), 0) | |
# Resize depth visualization to match original resolution | |
H_full, W_full = rgb_full.shape[:2] | |
depth_vis_resized = cv2.resize(depth_vis, (W_full, H_full)) | |
# Concatenate RGB and depth | |
stitched = cv2.hconcat([rgb_full, depth_vis_resized]) | |
stitched_frames.append(stitched) | |
frame_idx += len(frames_chunk) | |
# Free memory after processing each chunk | |
gc.collect() | |
# Save the stitched video | |
save_video(stitched_frames, stitched_video_path, fps=fps) | |
# Merge audio from the input video | |
temp_audio_path = stitched_video_path.replace('_RGBD.mp4', '_RGBD_audio.mp4') | |
cmd = [ | |
"ffmpeg", | |
"-y", | |
"-i", stitched_video_path, | |
"-i", input_video, | |
"-c:v", "copy", | |
"-c:a", "aac", | |
"-map", "0:v:0", | |
"-map", "1:a:0?", | |
"-shortest", | |
temp_audio_path | |
] | |
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
os.replace(temp_audio_path, stitched_video_path) | |
# Free memory | |
del stitched_frames | |
# Clean up | |
del all_depths | |
gc.collect() | |
torch.cuda.empty_cache() | |
return [processed_video_path, depth_vis_path, stitched_video_path] | |
def construct_demo(): | |
with gr.Blocks(analytics_enabled=False) as demo: | |
gr.Markdown(title) | |
gr.Markdown(description) | |
gr.Markdown("### If you find this work useful, please help ⭐ the [Github Repo](https://github.com/DepthAnything/Video-Depth-Anything). Thanks for your attention!") | |
with gr.Row(equal_height=True): | |
with gr.Column(scale=1): | |
# Video input component for file upload. | |
input_video = gr.Video(label="Input Video") | |
with gr.Column(scale=2): | |
with gr.Row(equal_height=True): | |
processed_video = gr.Video(label="Preprocessed Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5) | |
depth_vis_video = gr.Video(label="Generated Depth Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5) | |
stitched_video = gr.Video(label="Stitched RGBD Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5) | |
with gr.Row(equal_height=True): | |
with gr.Column(scale=1): | |
with gr.Accordion("Advanced Settings", open=False): | |
max_len = gr.Slider(label="Max process length", minimum=-1, maximum=1000, value=-1, step=1) | |
target_fps = gr.Slider(label="Target FPS", minimum=-1, maximum=30, value=-1, step=1) | |
max_res = gr.Slider(label="Max side resolution", minimum=480, maximum=1920, value=1280, step=1) | |
stitch_option = gr.Checkbox(label="Stitch RGB & Depth Videos", value=True) | |
grayscale_option = gr.Checkbox(label="Output Depth as Grayscale", value=True) | |
convert_from_color_option = gr.Checkbox(label="Convert Grayscale from Color", value=True) | |
blur_slider = gr.Slider(minimum=0, maximum=1, step=0.01, label="Depth Blur (can reduce edge artifacts on display)", value=0.3) | |
generate_btn = gr.Button("Generate") | |
with gr.Column(scale=2): | |
pass | |
gr.Examples( | |
examples=examples, | |
inputs=[input_video, max_len, target_fps, max_res, stitch_option, grayscale_option, convert_from_color_option, blur_slider], | |
outputs=[processed_video, depth_vis_video, stitched_video], | |
fn=infer_video_depth, | |
cache_examples=False, | |
cache_mode="lazy", | |
) | |
generate_btn.click( | |
fn=infer_video_depth, | |
inputs=[input_video, max_len, target_fps, max_res, stitch_option, grayscale_option, convert_from_color_option, blur_slider], | |
outputs=[processed_video, depth_vis_video, stitched_video], | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = construct_demo() | |
demo.queue(max_size=2).launch() |