Spaces:
Running
Running
import os | |
import json | |
import gradio as gr | |
import torch | |
import spaces | |
import tempfile | |
from pathlib import Path | |
import subprocess | |
import logging | |
import xml.etree.ElementTree as ET | |
from xml.dom import minidom | |
from transformers import AutoProcessor, AutoModelForImageTextToText | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Configure the root logger so INFO-level (and higher) records are emitted.
logging.basicConfig(level=logging.INFO)
def load_examples(json_path: str) -> dict:
    """Load and parse a JSON file.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The parsed JSON document (the annotation assumes a top-level object).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Read as UTF-8 explicitly instead of relying on the platform's locale
    # encoding, which would garble non-ASCII text on some systems.
    with open(json_path, "r", encoding="utf-8") as f:
        return json.load(f)
def format_duration(seconds: float) -> str:
    """Render a duration given in seconds as a zero-padded ``HH:MM:SS`` string.

    Fractions of a second are truncated. The hours field is not capped, so
    durations of 100 hours or more simply use more than two digits.
    """
    fields = (
        seconds // 3600,         # whole hours
        (seconds % 3600) // 60,  # whole minutes within the hour
        seconds % 60,            # seconds within the minute
    )
    return ":".join(f"{int(field):02d}" for field in fields)
def get_video_duration_seconds(video_path: str) -> float:
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        The value of ``format.duration`` from ffprobe's JSON output, as a float.

    Raises:
        RuntimeError: If the ffprobe executable is not available, if ffprobe
            exits with a non-zero status, or if its output does not contain
            a usable duration.
    """
    cmd = [
        "ffprobe",
        # "error" (rather than "quiet") keeps real failure messages on stderr
        # so they can be included in the exception below.
        "-v", "error",
        "-print_format", "json",
        "-show_format",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError as err:
        raise RuntimeError(
            "ffprobe executable not found; is FFmpeg installed?"
        ) from err

    if result.returncode != 0:
        detail = result.stderr.strip() or f"exit code {result.returncode}"
        raise RuntimeError(f"ffprobe failed for {video_path!r}: {detail}")

    try:
        return float(json.loads(result.stdout)["format"]["duration"])
    except (KeyError, TypeError, ValueError) as err:
        # json.JSONDecodeError is a ValueError subclass, so malformed output
        # is reported here as well.
        raise RuntimeError(
            f"ffprobe did not report a duration for {video_path!r}"
        ) from err
class VideoHighlightDetector:
    """Wrap an image-text-to-text model to describe videos and flag highlights.

    Each public method builds a system + user chat, runs generation and
    returns the assistant's part of the decoded transcript.
    """

    def __init__(
        self,
        model_path: str,
        device: str = "cuda",
        batch_size: int = 8
    ):
        """Load the processor and model and move the model to *device*.

        Args:
            model_path: Identifier or path passed to ``from_pretrained``.
            device: Device the model and the inputs are moved to.
            batch_size: Stored on the instance; no method of this class reads it.
        """
        self.device = device
        self.batch_size = batch_size

        # Initialize model and processor
        self.processor = AutoProcessor.from_pretrained(model_path)
        self.model = AutoModelForImageTextToText.from_pretrained(
            model_path,
            torch_dtype=torch.bfloat16
        ).to(device)

    @staticmethod
    def _extract_reply(decoded: str, marker: str = "Assistant: ") -> str:
        """Return everything after the first *marker* in a decoded transcript.

        Unlike ``decoded.split(marker)[1]``, the reply is not cut short when
        it happens to contain the marker itself.

        Raises:
            ValueError: If *marker* does not occur in *decoded*.
        """
        _, found, reply = decoded.partition(marker)
        if not found:
            raise ValueError(
                f"Model output does not contain the expected {marker!r} marker"
            )
        return reply

    @staticmethod
    def _build_messages(system_text: str, user_text: str, video_path: str = None) -> list:
        """Build a system + user chat; the video, if any, precedes the user text."""
        user_content = []
        if video_path is not None:
            user_content.append({"type": "video", "path": video_path})
        user_content.append({"type": "text", "text": user_text})
        return [
            {"role": "system", "content": [{"type": "text", "text": system_text}]},
            {"role": "user", "content": user_content},
        ]

    def _generate(self, messages: list, max_new_tokens: int, **generation_kwargs) -> str:
        """Apply the chat template, generate, and decode the first output sequence."""
        inputs = self.processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt"
        ).to(self.device)
        outputs = self.model.generate(
            **inputs, max_new_tokens=max_new_tokens, **generation_kwargs
        )
        return self.processor.decode(outputs[0], skip_special_tokens=True)

    def analyze_video_content(self, video_path: str) -> str:
        """Describe what kind of video this is and what happens in it.

        Returns:
            The assistant's reply, lower-cased.

        Raises:
            ValueError: If the decoded output has no "assistant: " marker.
        """
        system_message = "You are a helpful assistant that can understand videos. Describe what type of video this is and what's happening in it."
        messages = self._build_messages(
            system_message,
            "What type of video is this and what's happening in it? Be specific about the content type and general activities you observe.",
            video_path,
        )
        decoded = self._generate(messages, max_new_tokens=512, do_sample=True, temperature=0.7)
        return self._extract_reply(decoded.lower(), "assistant: ")

    def analyze_segment(self, video_path: str) -> str:
        """Analyze a specific video segment and provide a brief description.

        Raises:
            ValueError: If the decoded output has no "Assistant: " marker.
        """
        messages = self._build_messages(
            "Focus only on describing the key dramatic action or notable event occurring in this video segment. Skip general context or scene-setting details unless they are crucial to understanding the main action.",
            # The original prompt had a typo ("WWhat").
            "What is the main action or notable event happening in this segment? Describe it in one brief sentence.",
            video_path,
        )
        decoded = self._generate(messages, max_new_tokens=128, do_sample=True, temperature=0.7)
        return self._extract_reply(decoded)

    def determine_highlights(self, video_description: str, prompt_num: int = 1) -> str:
        """Determine what constitutes highlights based on video description with different prompts.

        Args:
            video_description: Text description of the video, embedded in the prompt.
            prompt_num: Which system/user prompt pair to use (1 or 2).

        Returns:
            The assistant's reply listing candidate highlight moments.

        Raises:
            KeyError: If *prompt_num* is not one of the defined prompt keys.
            ValueError: If the decoded output has no "Assistant: " marker.
        """
        system_prompts = {
            1: "You are a highlight editor. List archetypal dramatic moments that would make compelling highlights if they appear in the video. Each moment should be specific enough to be recognizable but generic enough to potentially exist in other videos of this type.",
            2: "You are a helpful visual-language assistant that can understand videos and edit. You are tasked helping the user to create highlight reels for videos. Highlights should be rare and important events in the video in question."
        }
        user_prompts = {
            1: "List potential highlight moments to look for in this video:",
            2: "List dramatic moments that would make compelling highlights if they appear in the video. Each moment should be specific enough to be recognizable but generic enough to potentially exist in any video of this type:"
        }
        messages = self._build_messages(
            system_prompts[prompt_num],
            f"""Here is a description of a video:\n\n{video_description}\n\n{user_prompts[prompt_num]}""",
        )
        # Use the module logger (instead of print) so output follows the
        # logging configuration.
        logger.info("Using prompt %s for highlight detection", prompt_num)
        logger.info("%s", messages)
        decoded = self._generate(messages, max_new_tokens=256, do_sample=True, temperature=0.7)
        return self._extract_reply(decoded)

    def process_segment(self, video_path: str, highlight_types: str) -> bool:
        """Decide whether a segment matches any of the given highlight criteria.

        Args:
            video_path: Path of the video segment to examine.
            highlight_types: Text listing example highlights, embedded in the prompt.

        Returns:
            True when the lower-cased assistant reply contains "yes".
            NOTE(review): this is a substring test, so a reply such as
            "no, only eyes are visible" also counts as a match.

        Raises:
            ValueError: If the decoded output has no "assistant: " marker.
        """
        messages = self._build_messages(
            "You are a video highlight analyzer. Your role is to identify moments that have high dramatic value, focusing on displays of skill, emotion, personality, or tension. Compare video segments against provided example highlights to find moments with similar emotional impact and visual interest, even if the specific actions differ.",
            f"""Given these highlight examples:\n{highlight_types}\n\nDoes this video contain a moment that matches the core action of one of the highlights? Answer with:\n'yes' or 'no'\nIf yes, justify it""",
            video_path,
        )
        # Greedy decoding keeps the yes/no decision deterministic.
        decoded = self._generate(messages, max_new_tokens=64, do_sample=False)
        response = self._extract_reply(decoded.lower(), "assistant: ")
        return "yes" in response
def create_xspf_playlist(video_path: str, segments: list, descriptions: list) -> str:
    """Create XSPF playlist from segments with descriptions.

    Args:
        video_path: Path of the source video; only its base name is written
            to the playlist.
        segments: ``(start_time, end_time)`` pairs in seconds. The times are
            truncated to whole seconds for the VLC start/stop options.
        descriptions: One description per segment, used as both the track
            title and annotation. Pairing stops at the shorter of the two lists.

    Returns:
        The playlist as an XML string.
    """
    # Local import: keeps this block self-contained without editing the
    # file-level import list.
    from xml.sax.saxutils import escape

    # Only the file name (not the directory) goes into the playlist. Escape it,
    # since names may contain XML-special characters such as "&".
    video_filename = escape(os.path.basename(video_path))

    # Create the XML structure as a list of lines
    xml_content = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<playlist version="1" xmlns="http://xspf.org/ns/0/" xmlns:vlc="http://www.videolan.org/vlc/playlist/0/">',
        f' <title>{video_filename} - Highlights</title>',
        ' <trackList>'
    ]
    for idx, ((start_time, end_time), description) in enumerate(zip(segments, descriptions)):
        # Descriptions are free text (model output); without escaping, a "<"
        # or "&" would make the playlist malformed XML.
        safe_description = escape(str(description))
        xml_content.extend([
            ' <track>',
            f' <location>file:///{video_filename}</location>',
            f' <title>{safe_description}</title>',
            f' <annotation>{safe_description}</annotation>',
            ' <extension application="http://www.videolan.org/vlc/playlist/0">',
            f' <vlc:id>{idx}</vlc:id>',
            f' <vlc:option>start-time={int(start_time)}</vlc:option>',
            f' <vlc:option>stop-time={int(end_time)}</vlc:option>',
            ' </extension>',
            ' </track>'
        ])
    xml_content.extend([
        ' </trackList>',
        '</playlist>'
    ])
    return '\n'.join(xml_content)
def create_ui(examples_path: str, model_path: str):
    """Build the Gradio app that turns an uploaded video into a highlight playlist.

    Args:
        examples_path: Path of a JSON file loaded at construction time.
        model_path: Model identifier or path handed to ``VideoHighlightDetector``.

    Returns:
        The ``gr.Blocks`` application (not yet launched).
    """
    # The loaded data is not used below. The call is kept so that a missing
    # or malformed examples file still fails at startup, as before.
    examples_data = load_examples(examples_path)  # noqa: F841

    max_duration_seconds = 1800  # 30 minutes
    segment_length = 10.0        # seconds of video examined per model call

    def ui_state(message, description="", criteria="", *, playlist=None, show_details=False):
        """Build one update for (playlist, status, description, criteria, accordion)."""
        return [
            playlist,
            message,
            description,
            criteria,
            gr.update(visible=show_details)
        ]

    with gr.Blocks() as app:
        with gr.Row():
            gr.HTML("""
                <div style="display: flex; align-items: center; gap: 10px;">
                    <img src="https://upload.wikimedia.org/wikipedia/commons/3/38/VLC_icon.png" style="width: 40px; height: 40px;"/>
                    <h1 style="margin: 0;">VLC Highlight Generator</h1>
                </div>
            """)
        gr.Markdown("Upload a video and get a list of highlights!")

        with gr.Row():
            with gr.Column(scale=1):
                input_video = gr.Video(
                    label="Upload your video (max 30 minutes)",
                    interactive=True
                )
                process_btn = gr.Button("Process Video", variant="primary")
            with gr.Column(scale=1):
                output_playlist = gr.File(
                    label="Highlight Playlist (XSPF)",
                    visible=False,
                    interactive=False,
                )
                status = gr.Markdown()
                analysis_accordion = gr.Accordion(
                    "Analysis Details",
                    open=True,
                    visible=False
                )
                with analysis_accordion:
                    video_description = gr.Markdown("")
                    highlight_types = gr.Markdown("")

        def on_process(video):
            """Run the highlight pipeline for *video*, yielding UI updates as it goes.

            This is a generator, so every update, including validation and
            error messages, must be yielded: a value given to ``return``
            inside a generator is discarded and never reaches the UI.
            """
            if not video:
                yield ui_state("Please upload a video")
                return

            detector = None
            try:
                duration = get_video_duration_seconds(video)
                if duration > max_duration_seconds:
                    yield ui_state("Video must be shorter than 30 minutes")
                    return

                yield ui_state("Initializing video highlight detector...")
                detector = VideoHighlightDetector(model_path=model_path, batch_size=16)

                yield ui_state("Analyzing video content...")
                # Analyze video content
                video_desc = detector.analyze_video_content(video)
                formatted_desc = f"### Video Summary:\n{video_desc}"

                yield ui_state("Determining highlight types...", formatted_desc, show_details=True)
                highlights1 = detector.determine_highlights(video_desc, prompt_num=1)
                highlights2 = detector.determine_highlights(video_desc, prompt_num=2)
                formatted_highlights = f"### Highlight Criteria:\nSet 1:\n{highlights1}\n\nSet 2:\n{highlights2}"

                # Process video in segments
                kept_segments1 = []
                kept_segments2 = []
                segment_descriptions1 = []
                segment_descriptions2 = []
                segment_starts = range(0, int(duration), int(segment_length))
                # Count the segments actually iterated. int(duration / length)
                # is 0 for clips shorter than one segment, which made the
                # progress computation divide by zero.
                total_segments = len(segment_starts)

                for segments_processed, start_time in enumerate(segment_starts):
                    end_time = min(start_time + segment_length, duration)
                    progress = int((segments_processed / total_segments) * 100)
                    yield ui_state(
                        f"Processing segments... {progress}% complete",
                        formatted_desc,
                        formatted_highlights,
                        show_details=True,
                    )

                    # Create temporary segment
                    with tempfile.NamedTemporaryFile(suffix='.mp4') as temp_segment:
                        cmd = [
                            "ffmpeg",
                            "-y",
                            # -ss before -i makes ffmpeg seek in the input
                            # rather than decode and discard everything before
                            # start_time for every segment.
                            "-ss", str(start_time),
                            "-i", video,
                            "-t", str(segment_length),
                            "-c:v", "libx264",
                            "-preset", "ultrafast",
                            temp_segment.name
                        ]
                        subprocess.run(cmd, check=True)

                        # Process with both highlight sets. The description is
                        # generated at most once per segment and shared.
                        description = None
                        for highlights, kept, descriptions in (
                            (highlights1, kept_segments1, segment_descriptions1),
                            (highlights2, kept_segments2, segment_descriptions2),
                        ):
                            if detector.process_segment(temp_segment.name, highlights):
                                if description is None:
                                    description = detector.analyze_segment(temp_segment.name)
                                kept.append((start_time, end_time))
                                descriptions.append(description)

                # Calculate percentages of video kept for each highlight set
                duration1 = sum(end - start for start, end in kept_segments1)
                duration2 = sum(end - start for start, end in kept_segments2)
                # Guard against a reported duration of 0.
                percent1 = (duration1 / duration) * 100 if duration > 0 else 0.0
                percent2 = (duration2 / duration) * 100 if duration > 0 else 0.0
                logger.info("Highlight set 1: %.1f%% of video", percent1)
                logger.info("Highlight set 2: %.1f%% of video", percent2)

                # Choose the set with lower percentage unless it's zero
                if 0 < percent2 <= percent1 or percent1 == 0:
                    final_segments = kept_segments2
                    segment_descriptions = segment_descriptions2
                    selected_set = "2"
                    percent_used = percent2
                else:
                    final_segments = kept_segments1
                    segment_descriptions = segment_descriptions1
                    selected_set = "1"
                    percent_used = percent1

                if final_segments:
                    # Create XSPF playlist
                    playlist_content = create_xspf_playlist(video, final_segments, segment_descriptions)
                    # Save playlist to a temporary file. delete=False keeps it
                    # on disk after the handle closes so it can be downloaded.
                    # The XML declares UTF-8, so write it as UTF-8.
                    with tempfile.NamedTemporaryFile(
                        mode='w', suffix='.xspf', delete=False, encoding='utf-8'
                    ) as f:
                        f.write(playlist_content)
                        playlist_path = f.name
                    completion_message = f"Processing complete! Using highlight set {selected_set} ({percent_used:.1f}% of video). You can download the playlist."
                    yield ui_state(
                        completion_message,
                        formatted_desc,
                        formatted_highlights,
                        playlist=gr.update(value=playlist_path, visible=True),
                        show_details=True,
                    )
                else:
                    yield ui_state(
                        "No highlights detected in the video.",
                        formatted_desc,
                        formatted_highlights,
                        show_details=True,
                    )
            except Exception as e:
                logger.exception("Error processing video")
                yield ui_state(f"Error processing video: {str(e)}")
            finally:
                # Drop the model reference first; empty_cache() can only
                # release memory that is no longer held by live tensors.
                detector = None
                torch.cuda.empty_cache()

        process_btn.click(
            on_process,
            inputs=[input_video],
            outputs=[
                output_playlist,
                status,
                video_description,
                highlight_types,
                analysis_accordion
            ],
            queue=True,
        )
    return app
if __name__ == "__main__":
    # Script entry point: build the UI for the examples file and model below,
    # then start the Gradio server.
    examples_file = "video_spec.json"
    model_id = "HuggingFaceTB/SmolVLM2-2.2B-Instruct"
    app = create_ui(examples_file, model_id)
    app.launch()