Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import os | |
from pathlib import Path | |
from PIL import Image | |
import shutil | |
from ultralytics import YOLO | |
import requests | |
import zipfile | |
import uuid | |
# Constants
# Filesystem locations used by the app; all are relative paths.
MODELS_DIR = "models"  # default root holding one sub-folder per model (see load_models)
MODELS_INFO_FILE = "models_info.json"  # default JSON file describing the available models
TEMP_DIR = "temp"  # scratch directory for temporary files
OUTPUT_DIR = "outputs"  # annotated prediction images are stored here
ZIP_DIR = "zips"  # ZIP archives of processed images are stored here
def download_file(url, dest_path, timeout=30):
    """
    Download a file from a URL to the destination path.

    The payload is streamed into a sibling ``<dest_path>.part`` file and only
    moved to ``dest_path`` once the transfer has completed, so a failed or
    interrupted download never leaves a truncated file at ``dest_path``.

    Args:
        url (str): The URL to download from.
        dest_path (str): The local path to save the file.
        timeout (float | tuple): Timeout in seconds passed to ``requests.get``
            so a stalled server cannot block forever.

    Returns:
        bool: True if download succeeded, False otherwise.
    """
    partial_path = f"{dest_path}.part"
    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Raise an error on bad status
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        # Publish the file only after it has been fully written.
        os.replace(partial_path, dest_path)
    except Exception as e:
        # Best-effort removal of the incomplete download; the function's
        # contract is to report failure via the return value, not to raise.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        print(f"Failed to download {url}. Error: {e}")
        return False
    print(f"Downloaded {url} to {dest_path}.")
    return True
def load_models(models_dir=MODELS_DIR, info_file=MODELS_INFO_FILE):
    """
    Load YOLO models and their information from the specified directory and JSON file.
    Downloads models if they are not already present.

    Each entry of the JSON list must provide ``model_name``; ``display_name``
    is optional and falls back to ``model_name``. ``download_url`` is only
    needed when the weights file ``<models_dir>/<model_name>/<model_name>.pt``
    is not on disk yet. Entries that cannot be downloaded or loaded are
    skipped with a printed message instead of aborting the whole load.

    Args:
        models_dir (str): Path to the models directory.
        info_file (str): Path to the JSON file containing model info.

    Returns:
        dict: Maps ``model_name`` to a dict with keys ``display_name``,
            ``model`` (the loaded YOLO object) and ``info`` (the raw JSON entry).
    """
    with open(info_file, 'r', encoding='utf-8') as f:
        models_info = json.load(f)

    models = {}
    for model_info in models_info:
        model_name = model_info['model_name']
        display_name = model_info.get('display_name', model_name)
        model_dir = os.path.join(models_dir, model_name)
        os.makedirs(model_dir, exist_ok=True)
        model_path = os.path.join(model_dir, f"{model_name}.pt")  # e.g., models/human/human.pt

        # Only consult the URL when the weights are actually missing, so a
        # model that is already on disk loads even without a download_url.
        if not os.path.isfile(model_path):
            download_url = model_info.get('download_url')
            if not download_url:
                print(f"Skipping model '{display_name}': not found locally and no download_url provided.")
                continue
            print(f"Model '{display_name}' not found locally. Downloading from {download_url}...")
            if not download_file(download_url, model_path):
                print(f"Skipping model '{display_name}' due to download failure.")
                continue  # Skip loading this model

        try:
            # Load the YOLO model
            model = YOLO(model_path)
        except Exception as e:
            # A single bad weights file must not prevent the others from loading.
            print(f"Error loading model '{display_name}': {e}")
            continue

        models[model_name] = {
            'display_name': display_name,
            'model': model,
            'info': model_info,
        }
        print(f"Loaded model '{display_name}' from '{model_path}'.")
    return models
def get_model_info(model_info):
    """
    Retrieve formatted model information for display.

    Missing scalar fields are rendered as ``N/A`` (``display_name`` falls
    back to ``Model Name``); missing collections are rendered as empty
    sections.

    Args:
        model_info (dict): The model's information dictionary. Recognized
            keys: ``display_name``, ``architecture``, ``training_epochs``,
            ``batch_size``, ``optimizer``, ``learning_rate``,
            ``data_augmentation_level``, ``mAP_score``, ``num_images``,
            ``class_ids`` (dict id -> name), ``datasets_used`` (list) and
            ``class_image_counts`` (dict name -> count).

    Returns:
        str: A Markdown-formatted string containing model details.
    """
    class_ids = model_info.get('class_ids', {})
    class_image_counts = model_info.get('class_image_counts', {})
    datasets_used = model_info.get('datasets_used', [])

    class_ids_formatted = "\n".join(f"{cid}: {cname}" for cid, cname in class_ids.items())
    class_image_counts_formatted = "\n".join(
        f"{cname}: {count}" for cname, count in class_image_counts.items()
    )
    datasets_used_formatted = "\n".join(f"- {dataset}" for dataset in datasets_used)

    info_text = (
        f"**{model_info.get('display_name', 'Model Name')}**\n\n"
        f"**Architecture:** {model_info.get('architecture', 'N/A')}\n\n"
        f"**Training Epochs:** {model_info.get('training_epochs', 'N/A')}\n\n"
        f"**Batch Size:** {model_info.get('batch_size', 'N/A')}\n\n"
        f"**Optimizer:** {model_info.get('optimizer', 'N/A')}\n\n"
        f"**Learning Rate:** {model_info.get('learning_rate', 'N/A')}\n\n"
        f"**Data Augmentation Level:** {model_info.get('data_augmentation_level', 'N/A')}\n\n"
        # The metric label had been mangled into an e-mail placeholder.
        f"**mAP@0.5:** {model_info.get('mAP_score', 'N/A')}\n\n"
        f"**Number of Images Trained On:** {model_info.get('num_images', 'N/A')}\n\n"
        f"**Class IDs:**\n{class_ids_formatted}\n\n"
        f"**Datasets Used:**\n{datasets_used_formatted}\n\n"
        f"**Class Image Counts:**\n{class_image_counts_formatted}"
    )
    return info_text
def zip_processed_images(processed_image_paths, model_name):
    """
    Bundle the processed images into a single, uniquely named ZIP archive.

    The archive is written into ZIP_DIR (created on demand). Every image is
    stored under its base file name, i.e. without its directory part.

    Args:
        processed_image_paths (list): File paths of the processed images.
        model_name (str): Model name, used as the archive name prefix.

    Returns:
        str: Path to the created ZIP file.
    """
    os.makedirs(ZIP_DIR, exist_ok=True)

    # A random hex suffix keeps archives from separate requests apart.
    archive_name = f"{model_name}_processed_images_{uuid.uuid4().hex}.zip"
    zip_path = os.path.join(ZIP_DIR, archive_name)

    with zipfile.ZipFile(zip_path, 'w') as archive:
        for source_path in processed_image_paths:
            archive.write(source_path, arcname=os.path.basename(source_path))

    print(f"Created ZIP file at {zip_path}.")
    return zip_path
def predict_image(model_name, images, confidence, models):
    """
    Perform prediction on uploaded images using the selected YOLO model.

    Each image is converted to RGB, passed to the model in memory, and the
    annotated result is written straight to a uniquely named JPEG in
    OUTPUT_DIR. This avoids guessing which ``runs/detect/predict*`` folder a
    prediction was saved to and leaves no temporary input files behind.

    Args:
        model_name (str): The name of the selected model.
        images (list): List of uploaded PIL.Image.Image objects.
        confidence (float): The confidence threshold for detections.
        models (dict): The dictionary containing models and their info.

    Returns:
        tuple: A status message, list of processed images, and a ZIP file for download.
            On any failure the last two items are None.
    """
    model = models.get(model_name, {}).get('model')
    if model is None:
        return "Error: Model not found.", None, None
    if not images:
        return "Error: No images provided.", None, None

    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        processed_image_paths = []
        processed_images = []

        for image in images:
            # Unique file name so concurrent requests never overwrite each other.
            output_image_path = os.path.join(
                OUTPUT_DIR, f"{model_name}_output_image_{uuid.uuid4().hex}.jpg"
            )

            # JPEG output cannot hold alpha/palette data, so normalize to RGB.
            rgb_image = image.convert("RGB")

            # Perform prediction with user-specified confidence. save=False:
            # the annotated image is written explicitly below instead of into
            # an auto-numbered runs/detect/predict* directory.
            results = model(rgb_image, save=False, conf=confidence)
            results[0].save(filename=output_image_path)
            processed_image_paths.append(output_image_path)

            # Copy the pixels into memory so the file handle is closed right away.
            with Image.open(output_image_path) as saved:
                processed_images.append(saved.copy())

        # Create a ZIP file containing all processed images
        zip_path = zip_processed_images(processed_image_paths, model_name)
        return "✅ Prediction completed successfully.", processed_images, zip_path
    except Exception as e:
        # UI boundary: report the failure as a status message instead of raising.
        return f"❌ Error during prediction: {str(e)}", None, None
def main():
    """
    Build and launch the Gradio interface for testing the loaded YOLO models.

    Loads the models via load_models(); if none could be loaded, prints a
    hint and returns without starting the UI. Otherwise the UI offers a model
    dropdown with an info panel, a confidence slider, a multi-image uploader,
    a gallery with the annotated results and a ZIP download of all results.
    """
    # Load the models and their information
    models = load_models()
    if not models:
        print("No models loaded. Please check your models_info.json and model URLs.")
        return

    # Mapping from display_name to model_name
    display_to_name = {models[m]['display_name']: m for m in models}

    def update_model_info(selected_display_name):
        """Return the Markdown description of the model picked in the dropdown."""
        if not selected_display_name:
            return "Please select a model."
        model_name = display_to_name.get(selected_display_name)
        if not model_name:
            return "Model information not available."
        return get_model_info(models[model_name]['info'])

    def process_image(selected_display_name, images, confidence):
        """Validate the inputs, open the uploaded files and run the prediction."""
        if not selected_display_name:
            return "❌ Please select a model.", None, None
        if not images:
            return "❌ Please upload at least one image.", None, None

        pil_images = []
        for uploaded in images:
            # Depending on the Gradio version an upload is either a file path
            # or a temp-file wrapper exposing the path as `.name`.
            path = getattr(uploaded, "name", uploaded)
            try:
                with Image.open(path) as img:
                    pil_images.append(img.copy())
            except OSError as e:
                return f"❌ Could not read image '{os.path.basename(str(path))}': {e}", None, None

        model_name = display_to_name.get(selected_display_name)
        return predict_image(model_name, pil_images, confidence, models)

    # Initialize Gradio Blocks interface
    with gr.Blocks() as demo:
        gr.Markdown("# 🧪 YOLOv11 Model Tester")
        gr.Markdown(
            """
            Upload one or multiple images to test different YOLOv11 models. Select a model from the dropdown to see its details.
            """
        )

        # Model selection and info
        with gr.Row():
            model_dropdown = gr.Dropdown(
                choices=[models[m]['display_name'] for m in models],
                label="Select Model",
                value=None
            )
            model_info = gr.Markdown("**Model Information will appear here.**")

        # Update model_info when a model is selected
        model_dropdown.change(
            fn=update_model_info,
            inputs=model_dropdown,
            outputs=model_info
        )

        # Confidence Threshold Slider
        with gr.Row():
            confidence_slider = gr.Slider(
                minimum=0.0,
                maximum=1.0,
                step=0.01,
                value=0.25,
                label="Confidence Threshold",
                info="Adjust the minimum confidence required for detections to be displayed."
            )

        # Image Prediction Tab (supporting multiple images)
        with gr.Tab("🖼️ Image"):
            with gr.Column():
                # Gradio has no `Images` component; a multi-file uploader
                # restricted to image files provides the same capability.
                image_input = gr.File(
                    label="Upload Images for Prediction",
                    file_count="multiple",
                    file_types=["image"]
                )
                image_predict_btn = gr.Button("🔍 Predict on Images")
                image_status = gr.Markdown("**Status will appear here.**")
                # Layout options go to the constructor; `.style()` was removed in Gradio 4.
                image_gallery = gr.Gallery(label="Predicted Images", columns=2, height="auto")
                image_download_btn = gr.File(label="⬇️ Download All Processed Images (ZIP)")

        # Connect the predict button
        image_predict_btn.click(
            fn=process_image,
            inputs=[model_dropdown, image_input, confidence_slider],
            outputs=[image_status, image_gallery, image_download_btn]
        )

        gr.Markdown(
            """
            ---
            **Note:** Models are downloaded from GitHub upon first use. Ensure that you have a stable internet connection and sufficient storage space.
            """
        )

    # Launch the Gradio app
    demo.launch()


if __name__ == "__main__":
    main()