|
import torch |
|
from transformers import ( |
|
Qwen2VLForConditionalGeneration, |
|
AutoProcessor, |
|
AutoModelForCausalLM, |
|
AutoTokenizer |
|
) |
|
from qwen_vl_utils import process_vision_info |
|
from PIL import Image |
|
import cv2 |
|
import numpy as np |
|
import gradio as gr |
|
import spaces |
|
from huggingface_hub import login |
|
import os |
|
|
|
|
|
# Per-request GPU time budget; presumably seconds — TODO confirm.
# NOTE(review): not referenced anywhere in the visible part of this file.
MAX_GPU_TIME_PER_REQUEST = 59

# Cooldown length; presumably seconds — TODO confirm.
# NOTE(review): not referenced anywhere in the visible part of this file.
COOLDOWN_PERIOD = 300
|
|
|
|
|
def init_huggingface_auth():
    """Log in to the Hugging Face Hub with the ``HUGGINGFACE_TOKEN`` env var.

    Prints a confirmation message after ``login`` returns.

    Raises:
        ValueError: If ``HUGGINGFACE_TOKEN`` is unset or empty.
    """
    hf_token = os.environ.get("HUGGINGFACE_TOKEN")

    # Guard clause: without a token there is nothing to log in with.
    if not hf_token:
        raise ValueError("HUGGINGFACE_TOKEN not found in environment variables")

    login(token=hf_token)
    print("Successfully authenticated with Hugging Face")
|
|
|
|
|
def load_models():
    """Authenticate, then load the vision and code models with their processors.

    Both models are loaded in ``float16`` with ``device_map="auto"``.

    Returns:
        tuple: ``(vision_model, vision_processor, code_model, code_tokenizer)``.

    Raises:
        Exception: Any error raised during authentication or loading is
            printed and then re-raised unchanged.
    """
    vision_model_id = "Qwen/Qwen2-VL-2B-Instruct"
    code_model_id = "Qwen/Qwen2.5-Coder-1.5B-Instruct"

    try:
        init_huggingface_auth()

        # ``token=True`` replaces the deprecated ``use_auth_token=True``; it
        # tells the loader to use the locally stored Hub credentials.
        vision_model = Qwen2VLForConditionalGeneration.from_pretrained(
            vision_model_id,
            torch_dtype=torch.float16,
            device_map="auto",
            token=True,
        )
        vision_processor = AutoProcessor.from_pretrained(
            vision_model_id,
            token=True,
        )

        code_model = AutoModelForCausalLM.from_pretrained(
            code_model_id,
            torch_dtype=torch.float16,
            device_map="auto",
            token=True,
        )
        code_tokenizer = AutoTokenizer.from_pretrained(
            code_model_id,
            token=True,
        )

        torch.cuda.empty_cache()

        return vision_model, vision_processor, code_model, code_tokenizer
    except Exception as e:
        print(f"Error loading models: {str(e)}")
        raise
|
|
|
# Load everything once at import time; the processing functions in this file
# read these four names as module-level globals.
vision_model, vision_processor, code_model, code_tokenizer = load_models()
|
|
|
# Instruction text that process_image_for_vision prepends to the text part of
# every vision-model request. This is runtime prompt text: edit with care.
VISION_SYSTEM_PROMPT = """Extract code from images/videos:
1. Output exact code snippets only
2. Keep original formatting/indentation
focus on code-relevant frames only
[code]
If multiple code sections are visible, separate them with ---
Note: In video, irrelevant frames may occur (e.g., other windows tabs, eterniq website, etc.) in video. Please focus on code-specific frames as we have to extract that content only.
"""
|
|
|
# System message that process_for_code sends to the code model.
# This is runtime prompt text: edit with care.
CODE_SYSTEM_PROMPT = """Debug code as an expert:
- Analyze OCR-extracted code + user's issue
- Find bugs/issues
- Provide fixes
- Explain corrections

Output:
Fixed Code:
[corrected code]

Original Issue:
[brief analysis]
Note: Please provide the output in a well-structured Markdown format. Remove all unnecessary information and exclude any additional code formatting such as triple backticks or language identifiers.
"""
|
def process_video_for_code(video_path, transcribed_text, max_frames=16, frame_interval=30):
    """Sample frames from a video, run the vision model on each, then ask for a fix.

    Args:
        video_path: Path of the video, passed to ``cv2.VideoCapture``.
        transcribed_text: User-supplied description forwarded to
            ``process_image_for_vision`` for every sampled frame.
        max_frames: Maximum number of frames to sample.
        frame_interval: Keep one frame out of every ``frame_interval`` frames
            read, starting with the first one.

    Returns:
        tuple[str, str]: The per-frame vision outputs joined by blank lines,
        and the response of ``process_for_code`` for that joined text. If no
        frame could be read, two fixed placeholder messages are returned.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = 0

    # try/finally guarantees the capture handle is released even when colour
    # conversion or PIL conversion raises part-way through the video.
    try:
        while len(frames) < max_frames:
            # grab() advances to the next frame without decoding it, so the
            # frames we skip do not pay the decode cost.
            if not cap.grab():
                break

            if frame_count % frame_interval == 0:
                ret, frame = cap.retrieve()
                if not ret:
                    break
                # OpenCV delivers BGR; PIL expects RGB.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(rgb_frame))

            frame_count += 1
    finally:
        cap.release()

    if not frames:
        return "No frames could be extracted from the video.", "No code could be analyzed."

    vision_descriptions = [
        process_image_for_vision(frame, transcribed_text) for frame in frames
    ]
    combined_vision_description = "\n\n".join(vision_descriptions)

    fixed_code_response = process_for_code(combined_vision_description)

    return combined_vision_description, fixed_code_response
|
|
|
def process_image_for_vision(image, transcribed_text):
    """Run the vision model on one image and return its decoded text output.

    Args:
        image: Image placed in the ``"image"`` entry of the chat message.
        transcribed_text: User description appended to the text prompt.

    Returns:
        str: Decoded output for the single prompt, with the prompt tokens
        removed and special tokens skipped.
    """
    prompt = f"{VISION_SYSTEM_PROMPT}\n\nDescribe the code and any errors you see in this image. User's description: {transcribed_text}"
    vision_messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": prompt},
            ],
        }
    ]

    chat_text = vision_processor.apply_chat_template(
        vision_messages, tokenize=False, add_generation_prompt=True
    )
    image_inputs, video_inputs = process_vision_info(vision_messages)

    model_inputs = vision_processor(
        text=[chat_text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    model_inputs = model_inputs.to(vision_model.device)

    with torch.no_grad():
        generated_ids = vision_model.generate(**model_inputs, max_new_tokens=512)

    # Drop the first len(prompt) tokens of each output sequence so only the
    # tokens that follow the prompt are decoded.
    completion_ids = [
        sequence[len(prompt_ids):]
        for prompt_ids, sequence in zip(model_inputs.input_ids, generated_ids)
    ]

    decoded = vision_processor.batch_decode(
        completion_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    return decoded[0]
|
|
|
def process_for_code(vision_description):
    """Ask the code model to analyse and fix the code in ``vision_description``.

    Args:
        vision_description: Text inserted into the user message sent to the
            code model.

    Returns:
        str: Decoded output for the single prompt, with the prompt tokens
        removed and special tokens skipped.
    """
    user_prompt = f"Here's a description of code with errors:\n\n{vision_description}\n\nPlease analyze and fix the code."
    chat = [
        {"role": "system", "content": CODE_SYSTEM_PROMPT},
        {"role": "user", "content": user_prompt},
    ]

    prompt_text = code_tokenizer.apply_chat_template(
        chat, tokenize=False, add_generation_prompt=True
    )
    encoded = code_tokenizer([prompt_text], return_tensors="pt").to(code_model.device)

    generation_options = {
        "max_new_tokens": 1024,
        "temperature": 0.7,
        "top_p": 0.95,
    }
    with torch.no_grad():
        generated_ids = code_model.generate(**encoded, **generation_options)

    # Drop the first len(prompt) tokens of each output sequence so only the
    # tokens that follow the prompt are decoded.
    completion_ids = [
        sequence[len(prompt_ids):]
        for prompt_ids, sequence in zip(encoded.input_ids, generated_ids)
    ]

    decoded = code_tokenizer.batch_decode(completion_ids, skip_special_tokens=True)
    return decoded[0]
|
|
|
@spaces.GPU
def process_content(video, transcribed_text):
    """Gradio handler: analyse an uploaded video of code and return a fix.

    Args:
        video: Value from the ``gr.File`` input. Either an object exposing a
            ``.name`` path attribute or a plain path string is accepted;
            ``None`` means nothing was uploaded.
        transcribed_text: User-supplied description passed through to
            ``process_video_for_code``.

    Returns:
        tuple[str, str]: ``(vision_output, code_output)``. On any failure the
        first element is a human-readable error message and the second is an
        empty string, so both output components always receive a value.
    """
    try:
        if video is None:
            return "Please upload a video file of code with errors.", ""

        torch.cuda.empty_cache()

        if torch.cuda.is_available():
            # This is the device's total memory, not the currently free amount.
            total_memory = torch.cuda.get_device_properties(0).total_memory
            if total_memory < 1e9:
                raise RuntimeError("Insufficient GPU memory available")

        # NOTE(review): depending on the Gradio version/config, gr.File hands
        # over either a file-like object with ``.name`` or a path string;
        # accept both instead of assuming ``.name`` exists.
        video_path = getattr(video, "name", video)

        vision_output, code_output = process_video_for_code(
            video_path,
            transcribed_text,
            max_frames=8,
        )
        return vision_output, code_output

    except Exception as e:
        # Quota errors are recognised by message instead of by the
        # ``spaces.zero.gradio.HTMLError`` class: that attribute path may not
        # resolve in every environment, and the old dedicated handler returned
        # None (breaking the two-output contract) for any HTMLError whose
        # message lacked the quota text.
        if "exceeded your GPU quota" in str(e):
            return (
                "GPU quota exceeded. Please try again later or consider upgrading to a paid plan.",
                "",
            )
        return f"Error processing content: {str(e)}", ""

    finally:
        # Runs on every exit path, including the early "no video" return.
        torch.cuda.empty_cache()
|
|
|
|
|
# Gradio UI: two inputs (uploaded file, free-text transcription) mapped onto
# the two values returned by process_content.
iface = gr.Interface(
    fn=process_content,
    inputs=[
        gr.File(label="Upload Video of Code with Errors"),
        gr.Textbox(label="Transcribed Audio")
    ],
    outputs=[
        gr.Textbox(label="Vision Model Output (Code Description)"),
        gr.Code(label="Fixed Code", language="python")
    ],
    title="Vision Code Debugger",
    description="Upload a video of code with errors and provide transcribed audio, and the AI will analyze and fix the issues.",
    allow_flagging="never",
    # NOTE(review): no ``examples`` are passed to this Interface — confirm
    # whether ``cache_examples=True`` has any effect here.
    cache_examples=True
)
|
|
|
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    iface.launch(show_error=True)