import os
import gc
import subprocess
import sys

import torch
import cv2
import gradio as gr
import numpy as np
import matplotlib.cm as cm
import matplotlib
import spaces
from huggingface_hub import hf_hub_download

from video_depth_anything.video_depth import VideoDepthAnything
from utils.dc_utils import save_video

# Examples for the Gradio Demo.
# Every example video is run with the same settings, in the order the demo
# inputs expect them: max_len, target_fps, max_res, stitch, grayscale,
# convert_from_color, blur.
_EXAMPLE_DIR = 'assets/example_videos'
_EXAMPLE_FILES = (
    'octopus_01.mp4',
    'chicken_01.mp4',
    'gorilla_01.mp4',
    'davis_rollercoaster.mp4',
    'Tokyo-Walk_rgb.mp4',
    '4158877-uhd_3840_2160_30fps_rgb.mp4',
    '4511004-uhd_3840_2160_24fps_rgb.mp4',
    '1753029-hd_1920_1080_30fps.mp4',
    'davis_burnout.mp4',
    'example_5473765-l.mp4',
    'Istanbul-26920.mp4',
    'obj_1.mp4',
    'sheep_cut1.mp4',
)
_EXAMPLE_SETTINGS = (-1, -1, 1280, True, True, True, 0.3)
examples = [
    [f'{_EXAMPLE_DIR}/{file_name}', *_EXAMPLE_SETTINGS]
    for file_name in _EXAMPLE_FILES
]

# Use GPU if available; otherwise, use CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Model configuration for different encoder variants.
model_configs = {
    'vits': {
        'encoder': 'vits',
        'features': 64,
        'out_channels': [48, 96, 192, 384],
    },
    'vitl': {
        'encoder': 'vitl',
        'features': 256,
        'out_channels': [256, 512, 1024, 1024],
    },
}

# Human-readable size name used in the Hugging Face repository id.
encoder2name = {
    'vits': 'Small',
    'vitl': 'Large',
}

encoder = 'vitl'
model_name = encoder2name[encoder]

# Initialize the model.
# Build the network for the selected encoder, then load the pretrained
# checkpoint downloaded from the Hugging Face Hub and switch to eval mode.
video_depth_anything = VideoDepthAnything(**model_configs[encoder])
filepath = hf_hub_download(
    repo_id=f"depth-anything/Video-Depth-Anything-{model_name}",
    filename=f"video_depth_anything_{encoder}.pth",
    repo_type="model"
)
video_depth_anything.load_state_dict(torch.load(filepath, map_location='cpu'))
video_depth_anything = video_depth_anything.to(DEVICE).eval()

title = "# Video Depth Anything + RGBD sbs output"
description = (
    "**Video Depth Anything** + RGBD sbs output for viewing with Looking Glass Factory displays. "
    "Please refer to our [paper](https://arxiv.org/abs/2501.12375), "
    "[project page](https://videodepthanything.github.io/), and "
    "[github](https://github.com/DepthAnything/Video-Depth-Anything) for more details."
)


def _frame_stride(original_fps, target_fps, skip_frames=0):
    """Return N such that every N-th decoded frame is kept (always >= 1).

    When ``target_fps`` is positive and lower than ``original_fps`` the stride
    is the rounded fps ratio; otherwise it is ``skip_frames + 1``.
    """
    if 0 < target_fps < original_fps:
        return max(int(round(original_fps / target_fps)), 1)
    return max(skip_frames + 1, 1)


def get_video_info(video_path, max_len=-1, target_fps=-1):
    """Extract video information without loading all frames into memory.

    Args:
        video_path: Path of the video file to inspect.
        max_len: Maximum number of frames that will be processed; values
            <= 0 mean "no limit".
        target_fps: Requested output frame rate; values <= 0 mean "keep the
            source rate".

    Returns:
        A dict with the keys ``width``, ``height``, ``fps``, ``original_fps``,
        ``frame_count`` and ``total_frames``. ``fps`` is the rate at which the
        frames yielded by ``frame_generator`` should be played back
        (``original_fps`` divided by the sampling stride), so that output
        videos keep the source's real-time speed. ``frame_count`` is the
        number of frames expected after sub-sampling and the ``max_len`` cap;
        it is derived from container metadata and may be inexact.

    Raises:
        ValueError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video file: {video_path}")

    try:
        # Get video properties
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        cap.release()

    stride = _frame_stride(original_fps, target_fps)

    # Frames that survive sub-sampling (ceil division), then the max_len cap.
    sampled_frames = (max(total_frames, 0) + stride - 1) // stride
    if 0 < max_len < sampled_frames:
        frame_count = int(max_len)
    else:
        frame_count = sampled_frames

    # Only every `stride`-th frame is kept, so the true playback rate is the
    # source rate divided by the stride. Writing at the raw `target_fps`
    # would speed up or slow down the result whenever the ratio is not exact
    # or the target is not below the source rate.
    if original_fps > 0:
        fps = original_fps / stride
    elif target_fps > 0:
        # Source rate unknown; the requested rate is the best available guess.
        fps = target_fps
    else:
        fps = original_fps

    return {
        'width': original_width,
        'height': original_height,
        'fps': fps,
        'original_fps': original_fps,
        'frame_count': frame_count,
        'total_frames': total_frames
    }


def process_frame(frame, max_res):
    """Downscale a frame so that neither side exceeds ``max_res``.

    The aspect ratio is preserved and frames are never upscaled. A
    ``max_res`` <= 0 disables resizing and returns the frame unchanged.
    """
    if max_res > 0:
        h, w = frame.shape[:2]
        scale = min(max_res / w, max_res / h)
        if scale < 1:
            # Clamp to 1 px so extreme aspect ratios cannot produce a zero size.
            new_w, new_h = max(int(w * scale), 1), max(int(h * scale), 1)
            frame = cv2.resize(frame, (new_w, new_h))
    return frame


def frame_generator(video_path, max_len=-1, target_fps=-1, max_res=-1, skip_frames=0):
    """Generate frames from a video file one at a time.

    Args:
        video_path: Path of the video file to decode.
        max_len: Stop after yielding this many frames; values <= 0 mean
            "no limit".
        target_fps: If positive and lower than the source rate, frames are
            sub-sampled with a stride equal to the rounded fps ratio.
        max_res: Maximum side length passed to ``process_frame``; values
            <= 0 disable resizing.
        skip_frames: Number of frames to drop between kept frames when
            ``target_fps`` does not trigger sub-sampling.

    Yields:
        RGB frames (converted from OpenCV's BGR), resized by ``process_frame``.

    Raises:
        ValueError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video file: {video_path}")

    # try/finally so the capture is released even when the consumer stops
    # iterating early (e.g. zip() against a shorter sequence).
    try:
        stride = _frame_stride(cap.get(cv2.CAP_PROP_FPS), target_fps, skip_frames)
        yielded = 0
        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret or (max_len > 0 and yielded >= max_len):
                break

            # Process frame if we're not skipping it
            if frame_idx % stride == 0:
                # Convert from BGR to RGB
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Resize if necessary
                yield process_frame(frame, max_res)
                yielded += 1

            frame_idx += 1
    finally:
        cap.release()


def _infer_batch(frames, input_size):
    """Run depth inference on one batch of RGB frames and return the depth maps.

    NOTE(review): ``infer_frames_depth`` is assumed to return one NumPy depth
    map per input frame — confirm against the model implementation.
    """
    with torch.no_grad():
        batch_depths = video_depth_anything.infer_frames_depth(
            frames,
            input_size=input_size,
            device=DEVICE
        )
    depths = list(batch_depths)

    # Free up memory between batches
    torch.cuda.empty_cache()
    gc.collect()
    return depths


def _render_depth(depth, d_min, d_max, grayscale, convert_from_color, blur):
    """Turn a raw depth map into a 3-channel uint8 image for video output.

    The depth is normalized to 0..255 using the global ``d_min``/``d_max``.
    Depending on the flags it is rendered as raw grayscale, as the "inferno"
    colormap converted to grayscale, or as the "inferno" colormap itself, and
    optionally Gaussian-blurred.
    """
    span = d_max - d_min
    if span > 0:
        depth_norm = ((depth - d_min) / span * 255).astype(np.uint8)
    else:
        # Constant depth everywhere: avoid dividing by zero.
        depth_norm = np.zeros(depth.shape, dtype=np.uint8)

    if grayscale and not convert_from_color:
        depth_vis = np.stack([depth_norm] * 3, axis=-1)
    else:
        cmap = matplotlib.colormaps.get_cmap("inferno")
        depth_vis = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8)
        if grayscale:
            depth_gray = cv2.cvtColor(depth_vis, cv2.COLOR_RGB2GRAY)
            depth_vis = np.stack([depth_gray] * 3, axis=-1)

    # Apply blur if requested
    if blur > 0:
        kernel_size = int(blur * 20) * 2 + 1  # Ensures an odd kernel size
        depth_vis = cv2.GaussianBlur(depth_vis, (kernel_size, kernel_size), 0)
    return depth_vis


def _mux_audio(video_path, audio_source):
    """Copy the first audio track of ``audio_source`` into ``video_path``.

    Best effort: if ffmpeg is missing or fails, the silent video at
    ``video_path`` is left untouched instead of raising.
    """
    root, ext = os.path.splitext(video_path)
    temp_audio_path = f"{root}_audio{ext}"
    cmd = [
        "ffmpeg",
        "-y",
        "-i", video_path,
        "-i", audio_source,
        "-c:v", "copy",
        "-c:a", "aac",
        "-map", "0:v:0",
        "-map", "1:a:0?",  # trailing '?' makes the audio stream optional
        "-shortest",
        temp_audio_path
    ]
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as exc:
        print(f"Skipping audio merge; could not run ffmpeg: {exc}")
        return

    if result.returncode == 0 and os.path.exists(temp_audio_path):
        os.replace(temp_audio_path, video_path)
        return

    print(f"Skipping audio merge; ffmpeg exited with code {result.returncode}")
    stderr_tail = result.stderr.decode(errors='replace')[-500:]
    if stderr_tail:
        print(stderr_tail)
    # Do not leave a partial output behind.
    if os.path.exists(temp_audio_path):
        os.remove(temp_audio_path)


@spaces.GPU(enable_queue=True)
def infer_video_depth(
    input_video: str,
    max_len: int = -1,
    target_fps: int = -1,
    max_res: int = 1280,
    stitch: bool = True,
    grayscale: bool = True,
    convert_from_color: bool = True,
    blur: float = 0.3,
    output_dir: str = './outputs',
    input_size: int = 518,
):
    """Estimate depth for a video and write the result videos to ``output_dir``.

    Args:
        input_video: Path of the input video.
        max_len: Maximum number of frames to process; <= 0 means all frames.
        target_fps: Requested frame rate; <= 0 keeps the source rate.
        max_res: Maximum side length of the frames fed to the model.
        stitch: Whether to also write a side-by-side RGB + depth video.
        grayscale: Render depth as grayscale instead of the "inferno" colormap.
        convert_from_color: With ``grayscale``, derive the gray values from
            the colormapped image rather than from the normalized depth.
        blur: Strength of the Gaussian blur applied to the depth image
            (0 disables it).
        output_dir: Directory that receives the output files (created if
            missing).
        input_size: Inference size forwarded to the model.

    Returns:
        ``[processed_video_path, depth_vis_path, stitched_video_path]``; the
        last entry is ``None`` when ``stitch`` is false.

    Raises:
        ValueError: If the video cannot be opened or yields no frames.
    """
    os.makedirs(output_dir, exist_ok=True)

    video_name = os.path.basename(input_video)
    base_name = os.path.splitext(video_name)[0]
    processed_video_path = os.path.join(output_dir, base_name + '_src.mp4')
    depth_vis_path = os.path.join(output_dir, base_name + '_vis.mp4')

    # Get video info first
    video_info = get_video_info(input_video, max_len, target_fps)
    fps = video_info['fps']
    frame_count = video_info['frame_count']

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    # Process frames in small batches to balance efficiency and memory usage
    batch_size = 8

    # First pass: run depth inference batch by batch, keeping only depth maps.
    print(f"Processing video: {input_video}, {frame_count} frames at {fps} fps")
    depth_values = []
    pending = []
    for i, frame in enumerate(frame_generator(input_video, max_len, target_fps, max_res)):
        if i % 10 == 0:
            print(f"Processing frame {i+1}/{frame_count}")
        pending.append(frame)
        if len(pending) == batch_size:
            depth_values.extend(_infer_batch(pending, input_size))
            pending = []

    # Flush the last partial batch. This must not depend on the metadata
    # frame count, which can disagree with the number of frames decoded.
    if pending:
        depth_values.extend(_infer_batch(pending, input_size))

    if not depth_values:
        raise ValueError(f"No frames could be read from video file: {input_video}")

    # Global range so the normalization is consistent across the whole video.
    d_min = min(depth.min() for depth in depth_values)
    d_max = max(depth.max() for depth in depth_values)

    # Second pass: frames are decoded again (instead of being kept in memory)
    # and written next to their depth visualization.
    # NOTE(review): assumes each depth map has the same height/width as the
    # processed frame it was computed from — confirm.
    height, width = depth_values[0].shape[:2]
    video_writer = cv2.VideoWriter(processed_video_path, fourcc, fps, (width, height))
    depth_writer = cv2.VideoWriter(depth_vis_path, fourcc, fps, (width, height))
    try:
        frame_gen = frame_generator(input_video, max_len, target_fps, max_res)
        for frame, depth in zip(frame_gen, depth_values):
            # Save original frame (convert RGB to BGR for OpenCV)
            video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

            depth_vis = _render_depth(depth, d_min, d_max, grayscale, convert_from_color, blur)
            # Save depth visualization (convert RGB to BGR for OpenCV)
            depth_writer.write(cv2.cvtColor(depth_vis, cv2.COLOR_RGB2BGR))
    finally:
        video_writer.release()
        depth_writer.release()

    # Process stitched video if requested
    stitched_video_path = None
    if stitch:
        # File name is truncated to 20 characters before the suffix is added.
        short_name = base_name[:20]
        stitched_video_path = os.path.join(output_dir, short_name + '_RGBD.mp4')

        # For stitching: read the original video in full resolution (no resizing)
        original_frame_gen = frame_generator(input_video, max_len, target_fps, -1)

        stitched_writer = None
        try:
            for i, (rgb_full, depth) in enumerate(zip(original_frame_gen, depth_values)):
                if i % 10 == 0:
                    print(f"Stitching frame {i+1}/{frame_count}")

                full_h, full_w = rgb_full.shape[:2]
                if stitched_writer is None:
                    # Created on the first frame so its size comes from real
                    # data; width is doubled for side-by-side output.
                    stitched_writer = cv2.VideoWriter(
                        stitched_video_path,
                        fourcc,
                        fps,
                        (full_w * 2, full_h)
                    )

                depth_vis = _render_depth(depth, d_min, d_max, grayscale, convert_from_color, blur)

                # Resize depth to match original frame
                depth_vis_resized = cv2.resize(depth_vis, (full_w, full_h))

                # Concatenate RGB and depth
                stitched = cv2.hconcat([rgb_full, depth_vis_resized])

                # Write to video (convert RGB to BGR for OpenCV)
                stitched_writer.write(cv2.cvtColor(stitched, cv2.COLOR_RGB2BGR))
        finally:
            if stitched_writer is not None:
                stitched_writer.release()

        # Merge audio from the input video into the stitched video
        _mux_audio(stitched_video_path, input_video)

    # Clean up
    gc.collect()
    torch.cuda.empty_cache()

    return [processed_video_path, depth_vis_path, stitched_video_path]


def construct_demo():
    """Build and return the Gradio Blocks UI for the depth demo."""
    # NOTE(review): the nesting of the layout containers below was
    # reconstructed from a whitespace-collapsed source — confirm it matches
    # the intended layout.
    with gr.Blocks(analytics_enabled=False) as demo:
        gr.Markdown(title)
        gr.Markdown(description)
        gr.Markdown("### If you find this work useful, please help ⭐ the [Github Repo](https://github.com/DepthAnything/Video-Depth-Anything). Thanks for your attention!")

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                # Video input component for file upload.
                input_video = gr.Video(label="Input Video")
            with gr.Column(scale=2):
                with gr.Row(equal_height=True):
                    processed_video = gr.Video(label="Preprocessed Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5)
                    depth_vis_video = gr.Video(label="Generated Depth Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5)
                    stitched_video = gr.Video(label="Stitched RGBD Video", interactive=False, autoplay=True, loop=True, show_share_button=True, scale=5)

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                with gr.Accordion("Advanced Settings", open=False):
                    max_len = gr.Slider(label="Max process length", minimum=-1, maximum=1000, value=-1, step=1)
                    target_fps = gr.Slider(label="Target FPS", minimum=-1, maximum=30, value=-1, step=1)
                    max_res = gr.Slider(label="Max side resolution", minimum=480, maximum=1920, value=1280, step=1)
                    stitch_option = gr.Checkbox(label="Stitch RGB & Depth Videos", value=True)
                    grayscale_option = gr.Checkbox(label="Output Depth as Grayscale", value=True)
                    convert_from_color_option = gr.Checkbox(label="Convert Grayscale from Color", value=True)
                    blur_slider = gr.Slider(minimum=0, maximum=1, step=0.01, label="Depth Blur (can reduce edge artifacts on display)", value=0.3)
                generate_btn = gr.Button("Generate")
            with gr.Column(scale=2):
                pass

        # Inputs and outputs shared by the example gallery and the button.
        demo_inputs = [
            input_video, max_len, target_fps, max_res,
            stitch_option, grayscale_option, convert_from_color_option, blur_slider,
        ]
        demo_outputs = [processed_video, depth_vis_video, stitched_video]

        gr.Examples(
            examples=examples,
            inputs=demo_inputs,
            outputs=demo_outputs,
            fn=infer_video_depth,
            cache_examples=False,
            cache_mode="lazy",
        )

        generate_btn.click(
            fn=infer_video_depth,
            inputs=demo_inputs,
            outputs=demo_outputs,
        )

    return demo


if __name__ == "__main__":
    demo = construct_demo()
    demo.queue(max_size=2).launch()