Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import gc | |
import torch | |
import cv2 | |
import gradio as gr | |
import numpy as np | |
import matplotlib.cm as cm | |
import matplotlib | |
import subprocess | |
import sys | |
import spaces | |
from video_depth_anything.video_depth import VideoDepthAnything | |
from utils.dc_utils import save_video | |
from huggingface_hub import hf_hub_download | |
# Examples for the Gradio Demo. | |
examples = [ | |
['assets/example_videos/octopus_01.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/chicken_01.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/gorilla_01.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/davis_rollercoaster.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/Tokyo-Walk_rgb.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/4158877-uhd_3840_2160_30fps_rgb.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/4511004-uhd_3840_2160_24fps_rgb.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/1753029-hd_1920_1080_30fps.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/davis_burnout.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/example_5473765-l.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/Istanbul-26920.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/obj_1.mp4', -1, -1, 1280, True, True, True, 0.3], | |
['assets/example_videos/sheep_cut1.mp4', -1, -1, 1280, True, True, True, 0.3], | |
] | |
# Use GPU if available; otherwise, use CPU. | |
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' | |
# Model configuration for different encoder variants. | |
model_configs = { | |
'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]}, | |
'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]}, | |
} | |
encoder2name = { | |
'vits': 'Small', | |
'vitl': 'Large', | |
} | |
encoder = 'vitl' | |
model_name = encoder2name[encoder] | |
# Initialize the model. | |
video_depth_anything = VideoDepthAnything(**model_configs[encoder]) | |
filepath = hf_hub_download( | |
repo_id=f"depth-anything/Video-Depth-Anything-{model_name}", | |
filename=f"video_depth_anything_{encoder}.pth", | |
repo_type="model" | |
) | |
video_depth_anything.load_state_dict(torch.load(filepath, map_location='cpu')) | |
video_depth_anything = video_depth_anything.to(DEVICE).eval() | |
title = "# Video Depth Anything + RGBD sbs output" | |
description = """**Video Depth Anything** + RGBD sbs output for viewing with Looking Glass Factory displays. | |
Please refer to our [paper](https://arxiv.org/abs/2501.12375), [project page](https://videodepthanything.github.io/), and [github](https://github.com/DepthAnything/Video-Depth-Anything) for more details.""" | |
def get_video_info(video_path, max_len=-1, target_fps=-1): | |
"""Extract video information without loading all frames into memory.""" | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
raise ValueError(f"Could not open video file: {video_path}") | |
# Get video properties | |
original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
original_fps = cap.get(cv2.CAP_PROP_FPS) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
# Adjust based on max_len parameter | |
if max_len > 0 and max_len < total_frames: | |
frame_count = max_len | |
else: | |
frame_count = total_frames | |
# Use target_fps if specified | |
if target_fps > 0: | |
fps = target_fps | |
else: | |
fps = original_fps | |
cap.release() | |
return { | |
'width': original_width, | |
'height': original_height, | |
'fps': fps, | |
'original_fps': original_fps, | |
'frame_count': frame_count, | |
'total_frames': total_frames | |
} | |
def process_frame(frame, max_res): | |
"""Process a single frame to the desired resolution.""" | |
if max_res > 0: | |
h, w = frame.shape[:2] | |
scale = min(max_res / w, max_res / h) | |
if scale < 1: | |
new_w, new_h = int(w * scale), int(h * scale) | |
frame = cv2.resize(frame, (new_w, new_h)) | |
return frame | |
def frame_generator(video_path, max_len=-1, target_fps=-1, max_res=-1, skip_frames=0): | |
"""Generate frames from a video file one at a time.""" | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
raise ValueError(f"Could not open video file: {video_path}") | |
original_fps = cap.get(cv2.CAP_PROP_FPS) | |
frame_count = 0 | |
# Calculate frame skip if target_fps is specified | |
if target_fps > 0 and target_fps < original_fps: | |
skip = int(round(original_fps / target_fps)) - 1 | |
else: | |
skip = skip_frames | |
frame_idx = 0 | |
while True: | |
ret, frame = cap.read() | |
if not ret or (max_len > 0 and frame_count >= max_len): | |
break | |
# Process frame if we're not skipping it | |
if frame_idx % (skip + 1) == 0: | |
# Convert from BGR to RGB | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
# Resize if necessary | |
processed_frame = process_frame(frame, max_res) | |
yield processed_frame | |
frame_count += 1 | |
frame_idx += 1 | |
cap.release() | |
def infer_video_depth( | |
input_video: str, | |
max_len: int = -1, | |
target_fps: int = -1, | |
max_res: int = 1280, | |
stitch: bool = True, | |
grayscale: bool = True, | |
convert_from_color: bool = True, | |
blur: float = 0.3, | |
output_dir: str = './outputs', | |
input_size: int = 518, | |
): | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
video_name = os.path.basename(input_video) | |
processed_video_path = os.path.join(output_dir, os.path.splitext(video_name)[0] + '_src.mp4') | |
depth_vis_path = os.path.join(output_dir, os.path.splitext(video_name)[0] + '_vis.mp4') | |
# Get video info first | |
video_info = get_video_info(input_video, max_len, target_fps) | |
fps = video_info['fps'] | |
frame_count = video_info['frame_count'] | |
# Set up VideoWriters | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
# Setup for processing batches of frames | |
batch_size = 8 # Process frames in small batches to balance efficiency and memory usage | |
processed_frames = [] | |
depth_frames = [] | |
stitched_frames = [] | |
# Initialize min/max depth values for depth normalization | |
d_min, d_max = float('inf'), float('-inf') | |
depth_values = [] | |
# First pass: Process frames for depth inference and collect min/max depth values | |
print(f"Processing video: {input_video}, {frame_count} frames at {fps} fps") | |
# Process frames in batches for depth inference | |
frame_gen = frame_generator(input_video, max_len, target_fps, max_res) | |
batch_count = 0 | |
for i, frame in enumerate(frame_gen): | |
if i % 10 == 0: | |
print(f"Processing frame {i+1}/{frame_count}") | |
processed_frames.append(frame) | |
# When we have a full batch or reached the end, process it | |
if len(processed_frames) == batch_size or i == frame_count - 1: | |
# Process the batch for depth | |
with torch.no_grad(): | |
batch_depths = video_depth_anything.infer_frames_depth( | |
processed_frames, | |
input_size=input_size, | |
device=DEVICE | |
) | |
# Collect depth statistics and frames | |
for depth in batch_depths: | |
d_min = min(d_min, depth.min()) | |
d_max = max(d_max, depth.max()) | |
depth_values.append(depth) | |
# Clear batch for next iteration | |
processed_frames = [] | |
batch_count += 1 | |
# Free up memory | |
torch.cuda.empty_cache() | |
gc.collect() | |
# Save the processed video | |
height, width = depth_values[0].shape[:2] if depth_values else (0, 0) | |
video_writer = cv2.VideoWriter(processed_video_path, fourcc, fps, (width, height)) | |
# Reprocess frames to save original and depth videos | |
frame_gen = frame_generator(input_video, max_len, target_fps, max_res) | |
depth_writer = cv2.VideoWriter(depth_vis_path, fourcc, fps, (width, height)) | |
for i, (frame, depth) in enumerate(zip(frame_gen, depth_values)): | |
# Save original frame (convert RGB to BGR for OpenCV) | |
video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)) | |
# Normalize and visualize depth | |
depth_norm = ((depth - d_min) / (d_max - d_min) * 255).astype(np.uint8) | |
if grayscale: | |
if convert_from_color: | |
cmap = matplotlib.colormaps.get_cmap("inferno") | |
depth_color = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8) | |
depth_gray = cv2.cvtColor(depth_color, cv2.COLOR_RGB2GRAY) | |
depth_vis = np.stack([depth_gray] * 3, axis=-1) | |
else: | |
depth_vis = np.stack([depth_norm] * 3, axis=-1) | |
else: | |
cmap = matplotlib.colormaps.get_cmap("inferno") | |
depth_vis = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8) | |
# Apply blur if requested | |
if blur > 0: | |
kernel_size = int(blur * 20) * 2 + 1 # Ensures an odd kernel size | |
depth_vis = cv2.GaussianBlur(depth_vis, (kernel_size, kernel_size), 0) | |
# Save depth visualization (convert RGB to BGR for OpenCV) | |
depth_writer.write(cv2.cvtColor(depth_vis, cv2.COLOR_RGB2BGR)) | |
video_writer.release() | |
depth_writer.release() | |
# Process stitched video if requested | |
stitched_video_path = None | |
if stitch: | |
# For stitching: read the original video in full resolution | |
video_info_full = get_video_info(input_video, max_len, target_fps) | |
original_frame_gen = frame_generator(input_video, max_len, target_fps, -1) # No resizing | |
# Create a new writer for the stitched video | |
base_name = os.path.splitext(video_name)[0] | |
short_name = base_name[:20] | |
stitched_video_path = os.path.join(output_dir, short_name + '_RGBD.mp4') | |
# Get dimensions of the first frame to setup the video writer | |
first_frame = next(frame_generator(input_video, 1, -1, -1)) | |
H_full, W_full = first_frame.shape[:2] | |
# Set up the stitched video writer | |
stitched_writer = cv2.VideoWriter( | |
stitched_video_path, | |
fourcc, | |
fps, | |
(W_full * 2, H_full) # Width is doubled for side-by-side | |
) | |
# Reset frame generator | |
original_frame_gen = frame_generator(input_video, max_len, target_fps, -1) | |
# Process each frame | |
for i, (rgb_full, depth) in enumerate(zip(original_frame_gen, depth_values)): | |
if i % 10 == 0: | |
print(f"Stitching frame {i+1}/{frame_count}") | |
# Normalize and visualize depth | |
depth_norm = ((depth - d_min) / (d_max - d_min) * 255).astype(np.uint8) | |
# Generate depth visualization | |
if grayscale: | |
if convert_from_color: | |
cmap = matplotlib.colormaps.get_cmap("inferno") | |
depth_color = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8) | |
depth_gray = cv2.cvtColor(depth_color, cv2.COLOR_RGB2GRAY) | |
depth_vis = np.stack([depth_gray] * 3, axis=-1) | |
else: | |
depth_vis = np.stack([depth_norm] * 3, axis=-1) | |
else: | |
cmap = matplotlib.colormaps.get_cmap("inferno") | |
depth_vis = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8) | |
# Apply blur if requested | |
if blur > 0: | |
kernel_size = int(blur * 20) * 2 + 1 | |
depth_vis = cv2.GaussianBlur(depth_vis, (kernel_size, kernel_size), 0) | |
# Resize depth to match original frame | |
H_full, W_full = rgb_full.shape[:2] | |
depth_vis_resized = cv2.resize(depth_vis, (W_full, H_full)) | |
# Concatenate RGB and depth | |
stitched = cv2.hconcat([rgb_full, depth_vis_resized]) | |
# Write to video (convert RGB to BGR for OpenCV) | |
stitched_writer.write(cv2.cvtColor(stitched, cv2.COLOR_RGB2BGR)) | |
stitched_writer.release() | |
# Merge audio from the input video into the stitched video | |
temp_audio_path = stitched_video_path.replace('_RGBD.mp4', '_RGBD_audio.mp4') | |
cmd = [ | |
"ffmpeg", | |
"-y", | |
"-i", stitched_video_path, | |
"-i", input_video, | |
"-c:v", "copy", | |
"-c:a", "aac", | |
"-map", "0:v:0", | |
"-map", "1:a:0?", | |
"-shortest", | |
temp_audio_path | |
] | |
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
os.replace(temp_audio_path, stitched_video_path) | |
# Clean up | |
gc.collect() | |
torch.cuda.empty_cache() | |
return [processed_video_path, depth_vis_path, stitched_video_path] | |
def construct_demo(): | |
with gr.Blocks(analytics_enabled=False) as demo: | |
gr.Markdown(title) | |
gr.Markdown(description) | |
gr.Markdown("### If you find this work useful, please help ⭐ the [Github Repo](https://github.com/DepthAnything/Video-Depth-Anything). Thanks for your attention!") | |
with gr.Row(equal_height=True): | |
with gr.Column(scale=1): | |
# Video input component for file upload. | |
input_video = gr.Video(label="Input Video") | |
with gr.Column(scale=2): | |
with gr.Row(equal_height=True): | |
processed_video = gr.Video(label="Preprocessed Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5) | |
depth_vis_video = gr.Video(label="Generated Depth Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5) | |
stitched_video = gr.Video(label="Stitched RGBD Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5) | |
with gr.Row(equal_height=True): | |
with gr.Column(scale=1): | |
with gr.Accordion("Advanced Settings", open=False): | |
max_len = gr.Slider(label="Max process length", minimum=-1, maximum=1000, value=-1, step=1) | |
target_fps = gr.Slider(label="Target FPS", minimum=-1, maximum=30, value=-1, step=1) | |
max_res = gr.Slider(label="Max side resolution", minimum=480, maximum=1920, value=1280, step=1) | |
stitch_option = gr.Checkbox(label="Stitch RGB & Depth Videos", value=True) | |
grayscale_option = gr.Checkbox(label="Output Depth as Grayscale", value=True) | |
convert_from_color_option = gr.Checkbox(label="Convert Grayscale from Color", value=True) | |
blur_slider = gr.Slider(minimum=0, maximum=1, step=0.01, label="Depth Blur (can reduce edge artifacts on display)", value=0.3) | |
generate_btn = gr.Button("Generate") | |
with gr.Column(scale=2): | |
pass | |
gr.Examples( | |
examples=examples, | |
inputs=[input_video, max_len, target_fps, max_res, stitch_option, grayscale_option, convert_from_color_option, blur_slider], | |
outputs=[processed_video, depth_vis_video, stitched_video], | |
fn=infer_video_depth, | |
cache_examples=False, | |
cache_mode="lazy", | |
) | |
generate_btn.click( | |
fn=infer_video_depth, | |
inputs=[input_video, max_len, target_fps, max_res, stitch_option, grayscale_option, convert_from_color_option, blur_slider], | |
outputs=[processed_video, depth_vis_video, stitched_video], | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = construct_demo() | |
demo.queue(max_size=2).launch() |