|
import torch |
|
from transformers import ( |
|
Qwen2VLForConditionalGeneration, |
|
AutoProcessor, |
|
AutoModelForCausalLM, |
|
AutoTokenizer |
|
) |
|
from qwen_vl_utils import process_vision_info |
|
from PIL import Image |
|
import cv2 |
|
import numpy as np |
|
import gradio as gr |
|
import spaces |
|
from huggingface_hub import login |
|
import os |
|
|
|
|
|
# NOTE(review): neither constant is referenced anywhere in the visible code.
# The names suggest a per-request GPU time budget and a cooldown between
# requests, presumably both in seconds — confirm the units and either wire
# them in or remove them.
MAX_GPU_TIME_PER_REQUEST = 60
COOLDOWN_PERIOD = 300
|
|
|
|
|
def init_huggingface_auth():
    """Log in to the Hugging Face Hub with a token taken from the environment.

    ``HUGGINGFACE_TOKEN`` is read first; if it is unset or blank, ``HF_TOKEN``
    (the variable ``huggingface_hub`` itself recognises) is used instead.
    Surrounding whitespace is stripped so that a secret pasted with a trailing
    newline still authenticates instead of being sent to the Hub verbatim.

    Raises:
        ValueError: If neither variable holds a non-blank token.
    """
    token = ""
    for var_name in ("HUGGINGFACE_TOKEN", "HF_TOKEN"):
        token = (os.getenv(var_name) or "").strip()
        if token:
            break

    if not token:
        raise ValueError("HUGGINGFACE_TOKEN not found in environment variables")

    login(token=token)
    print("Successfully authenticated with Hugging Face")
|
|
|
|
|
def load_models():
    """Authenticate with the Hub and load both models and their preprocessors.

    Loads the Qwen2-VL vision-language model with its processor and the
    Qwen2.5-Coder model with its tokenizer. Both models are loaded in float16
    with ``device_map="auto"``.

    Returns:
        tuple: ``(vision_model, vision_processor, code_model, code_tokenizer)``.

    Raises:
        Exception: Any failure during authentication or loading is printed
            and then re-raised unchanged.
    """
    vision_checkpoint = "Qwen/Qwen2-VL-2B-Instruct"
    code_checkpoint = "Qwen/Qwen2.5-Coder-1.5B-Instruct"

    # ``use_auth_token`` is deprecated in transformers; ``token=True`` is the
    # replacement and reuses the credential stored by ``login()``.
    model_kwargs = {
        "torch_dtype": torch.float16,
        "device_map": "auto",
        "token": True,
    }

    try:
        init_huggingface_auth()

        vision_model = Qwen2VLForConditionalGeneration.from_pretrained(
            vision_checkpoint,
            **model_kwargs,
        )
        vision_processor = AutoProcessor.from_pretrained(
            vision_checkpoint,
            token=True,
        )

        code_model = AutoModelForCausalLM.from_pretrained(
            code_checkpoint,
            **model_kwargs,
        )
        code_tokenizer = AutoTokenizer.from_pretrained(
            code_checkpoint,
            token=True,
        )

        # Release cached allocator blocks left over from loading.
        torch.cuda.empty_cache()

        return vision_model, vision_processor, code_model, code_tokenizer
    except Exception as e:
        print(f"Error loading models: {str(e)}")
        raise
|
|
|
# Load both models once at import time; process_image_for_vision and
# process_for_code read these module-level globals on every call.
vision_model, vision_processor, code_model, code_tokenizer = load_models()
|
|
|
# Instructions for the vision model. process_image_for_vision embeds this text
# at the start of the user turn for every frame. The fenced example under
# "Output Format" is closed explicitly so the model is shown a complete,
# well-formed code block to imitate.
VISION_SYSTEM_PROMPT = """You are an OCR system specialized in extracting code from images and videos. Your task is to:
1. Extract and output ONLY the exact code snippets visible in the image
2. Maintain exact formatting, indentation, and whitespace
3. Do not add any descriptions, analysis, or commentary
4. If there are error messages or console outputs visible, include them exactly as shown
Output Format:
```[language]
[extracted code here]
```
If multiple code sections are visible, separate them with ---
Note: In video, irrelevant frames may occur (e.g., other windows tabs, eterniq website, etc.) in video. Please focus on code-specific frames as we have to extract that content only.
"""
|
|
|
# System-turn instructions for the code model; process_for_code sends this as
# the "system" message ahead of the OCR text.
CODE_SYSTEM_PROMPT = (
    "You are an expert code debugging assistant. You will receive:\n"
    "1. Original code (extracted by OCR)\n"
    "2. User's description of the issue\n"
    "3. Additional context if any\n"
    "Your task is to:\n"
    "1. Analyze the provided code considering the user's description\n"
    "2. Identify bugs and issues\n"
    "3. Provide a corrected version of the code\n"
    "4. Explain the specific fixes made\n"
    "Output Format:\n"
    "Fixed Code:\n"
    "[corrected code here]\n"
    "Original Code Issue:\n"
    "[Brief description of the issues based on user input and code analysis]\n"
    "Note: Please provide the output in a well-structured Markdown format. "
    "Remove all unnecessary information and exclude any additional code "
    "formatting such as triple backticks or language identifiers. "
    "The response should be ready to be rendered as Markdown content.\n"
)
|
def _sample_video_frames(video_path, max_frames, frame_interval):
    """Return up to *max_frames* RGB PIL images, one every *frame_interval* frames."""
    # abs() keeps the modulo test equivalent for negative intervals, and the
    # ``or 1`` fallback turns an interval of 0 into "every frame" instead of a
    # ZeroDivisionError.
    step = abs(frame_interval) or 1

    capture = cv2.VideoCapture(video_path)
    sampled = []
    try:
        frame_index = 0
        while len(sampled) < max_frames:
            ok, bgr_frame = capture.read()
            if not ok:
                # End of stream, or the file could not be read at all.
                break
            if frame_index % step == 0:
                # OpenCV decodes to BGR; PIL expects RGB channel order.
                rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
                sampled.append(Image.fromarray(rgb_frame))
            frame_index += 1
    finally:
        # Always free the capture handle, even if a conversion above raises.
        capture.release()
    return sampled


def process_video_for_code(video_path, transcribed_text, max_frames=16, frame_interval=30):
    """Extract code from sampled video frames and ask the code model to fix it.

    Args:
        video_path: Path of the video file, passed to ``cv2.VideoCapture``.
        transcribed_text: User-supplied description forwarded to the vision
            model together with each frame.
        max_frames: Maximum number of frames to sample.
        frame_interval: Keep one frame out of every ``frame_interval`` frames
            read. A value of 0 is treated as 1.

    Returns:
        tuple[str, str]: The per-frame vision outputs joined by blank lines,
        and the code model's response to that combined text. If no frame
        could be read, a pair of explanatory messages is returned instead.
    """
    frames = _sample_video_frames(video_path, max_frames, frame_interval)
    if not frames:
        return "No frames could be extracted from the video.", "No code could be analyzed."

    vision_descriptions = [
        process_image_for_vision(frame, transcribed_text) for frame in frames
    ]
    combined_vision_description = "\n\n".join(vision_descriptions)

    fixed_code_response = process_for_code(combined_vision_description)

    return combined_vision_description, fixed_code_response
|
|
|
def process_image_for_vision(image, transcribed_text):
    """Run the vision model on one image and return its decoded reply.

    Args:
        image: The image to analyse, placed in the chat message as an
            ``"image"`` content item (the video pipeline passes PIL images).
        transcribed_text: User description appended to the instruction text.

    Returns:
        str: The generated text for this image, with the prompt tokens
        removed and special tokens skipped.
    """
    instruction = f"{VISION_SYSTEM_PROMPT}\n\nDescribe the code and any errors you see in this image. User's description: {transcribed_text}"
    user_turn = {
        "role": "user",
        "content": [
            {"type": "image", "image": image},
            {"type": "text", "text": instruction},
        ],
    }
    messages = [user_turn]

    prompt = vision_processor.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    image_inputs, video_inputs = process_vision_info(messages)

    batch = vision_processor(
        text=[prompt],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    batch = batch.to(vision_model.device)

    with torch.no_grad():
        generated = vision_model.generate(**batch, max_new_tokens=512)

    # generate() returns prompt + completion; keep only the newly generated
    # tokens of each sequence.
    completions = []
    for prompt_ids, full_ids in zip(batch.input_ids, generated):
        completions.append(full_ids[len(prompt_ids):])

    decoded = vision_processor.batch_decode(
        completions,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    return decoded[0]
|
|
|
def process_for_code(vision_description):
    """Ask the code model to analyse and fix the code in *vision_description*.

    Args:
        vision_description: Text produced by the vision model (the extracted
            code and any visible errors).

    Returns:
        str: The code model's reply with the prompt tokens removed and
        special tokens skipped.
    """
    code_messages = [
        {"role": "system", "content": CODE_SYSTEM_PROMPT},
        {
            "role": "user",
            "content": f"Here's a description of code with errors:\n\n{vision_description}\n\nPlease analyze and fix the code.",
        },
    ]

    code_text = code_tokenizer.apply_chat_template(
        code_messages,
        tokenize=False,
        add_generation_prompt=True,
    )

    code_inputs = code_tokenizer([code_text], return_tensors="pt").to(code_model.device)

    with torch.no_grad():
        code_output_ids = code_model.generate(
            **code_inputs,
            max_new_tokens=1024,
            # temperature/top_p only take effect when sampling is enabled.
            # Request it explicitly instead of relying on whatever the
            # checkpoint's generation config happens to default to.
            do_sample=True,
            temperature=0.7,
            top_p=0.95,
        )

    # generate() returns prompt + completion; strip the prompt tokens.
    code_output_trimmed = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(code_inputs.input_ids, code_output_ids)
    ]
    return code_tokenizer.batch_decode(
        code_output_trimmed,
        skip_special_tokens=True,
    )[0]
|
|
|
@spaces.GPU
def process_content(video, transcribed_text):
    """Gradio handler: run the video -> OCR -> code-fix pipeline.

    Args:
        video: The uploaded file from the ``gr.File`` input. Either an object
            with a ``.name`` path attribute or a plain path string is
            accepted; ``None`` means nothing was uploaded.
        transcribed_text: The user's description from the textbox.

    Returns:
        tuple[str, str]: ``(vision_output, code_output)`` on success. On any
        failure a ``(message, "")`` pair is returned, so both Gradio outputs
        always receive a value.
    """
    try:
        if video is None:
            return "Please upload a video file of code with errors.", ""

        torch.cuda.empty_cache()

        if torch.cuda.is_available():
            # Check memory that is actually free right now; the device's total
            # capacity says nothing about whether this request can run.
            free_bytes, _total_bytes = torch.cuda.mem_get_info()
            if free_bytes < 1e9:
                raise RuntimeError("Insufficient GPU memory available")

        # Depending on the Gradio version/configuration the upload arrives as
        # a file-like object with ``.name`` or as a path string.
        video_path = getattr(video, "name", video)

        return process_video_for_code(
            video_path,
            transcribed_text,
            max_frames=8,
        )

    except Exception as e:
        # Match on the message instead of on ``spaces.zero.gradio.HTMLError``:
        # resolving that private attribute path inside an ``except`` clause
        # can itself fail and mask the real error. The previous handler also
        # returned None for an HTMLError without the quota text.
        # NOTE(review): ZeroGPU quota errors are presumably raised by the
        # ``@spaces.GPU`` wrapper before this body runs, in which case this
        # branch never fires — confirm against the ``spaces`` package.
        if "exceeded your GPU quota" in str(e):
            return (
                "GPU quota exceeded. Please try again later or consider upgrading to a paid plan.",
                "",
            )
        return f"Error processing content: {str(e)}", ""
    finally:
        # Release cached GPU memory whether the request succeeded or failed.
        torch.cuda.empty_cache()
|
|
|
|
|
def _build_interface():
    """Create the Gradio interface that wires the UI to ``process_content``."""
    input_components = [
        gr.File(label="Upload Video of Code with Errors"),
        gr.Textbox(label="Transcribed Audio"),
    ]
    # Order matches the (vision_output, code_output) pair returned by the handler.
    output_components = [
        gr.Textbox(label="Vision Model Output (Code Description)"),
        gr.Code(label="Fixed Code", language="python"),
    ]
    return gr.Interface(
        fn=process_content,
        inputs=input_components,
        outputs=output_components,
        title="Vision Code Debugger",
        description="Upload a video of code with errors and provide transcribed audio, and the AI will analyze and fix the issues.",
        allow_flagging="never",
        cache_examples=True,
    )


iface = _build_interface()


if __name__ == "__main__":
    # show_error=True surfaces handler exceptions in the browser UI.
    iface.launch(show_error=True)