rolo-models / app.py
wuhp's picture
Update app.py
3414d75 verified
raw
history blame
11.5 kB
import gradio as gr
import json
import os
from pathlib import Path
from PIL import Image
import shutil
from ultralytics import YOLO
import requests
import zipfile
import uuid
# Constants
# Root directory holding one sub-directory of weights per model
# (<MODELS_DIR>/<model_name>/<model_name>.pt).
MODELS_DIR = "models"
# JSON file listing the available models and their metadata.
MODELS_INFO_FILE = "models_info.json"
# Directory where uploaded input images are written before inference.
TEMP_DIR = "temp"
# Directory where annotated prediction images are written.
OUTPUT_DIR = "outputs"
# Directory where downloadable ZIP archives of the results are created.
ZIP_DIR = "zips"
def download_file(url, dest_path):
    """
    Download a file from a URL to the destination path.

    The payload is streamed into ``<dest_path>.part`` and only moved onto
    ``dest_path`` once the transfer has completed, so an interrupted download
    never leaves a truncated file that looks like a valid one.

    Args:
        url (str): The URL to download from.
        dest_path (str): The local path to save the file.

    Returns:
        bool: True if download succeeded, False otherwise.
    """
    part_path = f"{dest_path}.part"
    try:
        # (connect, read) timeouts: without them a stalled server blocks forever.
        with requests.get(url, stream=True, timeout=(10, 60)) as response:
            response.raise_for_status()  # Raise an error on bad status
            with open(part_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Atomic publish: dest_path appears only when it is complete.
        os.replace(part_path, dest_path)
    except Exception as e:
        # Deliberate best-effort boundary: callers only look at the boolean.
        try:
            os.remove(part_path)
        except OSError:
            # Nothing was written yet (or it is already gone) - nothing to clean.
            pass
        print(f"Failed to download {url}. Error: {e}")
        return False
    print(f"Downloaded {url} to {dest_path}.")
    return True
def load_models(models_dir=MODELS_DIR, info_file=MODELS_INFO_FILE):
    """
    Load YOLO models and their information from the specified directory and JSON file.
    Downloads models if they are not already present.

    Entries that cannot be used (missing name, failed download, weights that
    fail to load) are reported and skipped so that one bad entry does not
    prevent the remaining models from loading.

    Args:
        models_dir (str): Path to the models directory.
        info_file (str): Path to the JSON file containing model info.

    Returns:
        dict: Maps each model name to a dict with the keys 'display_name',
            'model' (the loaded YOLO object) and 'info' (the raw JSON entry).
            Empty if the info file cannot be read or no model could be loaded.
    """
    try:
        with open(info_file, 'r', encoding='utf-8') as f:
            models_info = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        print(f"Could not read model info file '{info_file}': {e}")
        return {}

    models = {}
    for model_info in models_info:
        model_name = model_info.get('model_name')
        if not model_name:
            print(f"Skipping model entry without a 'model_name': {model_info}")
            continue

        display_name = model_info.get('display_name', model_name)
        model_dir = os.path.join(models_dir, model_name)
        os.makedirs(model_dir, exist_ok=True)
        model_path = os.path.join(model_dir, f"{model_name}.pt")  # e.g., models/human/human.pt

        # The URL is only needed when the weights are not cached locally.
        if not os.path.isfile(model_path):
            download_url = model_info.get('download_url')
            if not download_url:
                print(f"Skipping model '{display_name}': not found locally and no 'download_url' given.")
                continue
            print(f"Model '{display_name}' not found locally. Downloading from {download_url}...")
            if not download_file(download_url, model_path):
                print(f"Skipping model '{display_name}' due to download failure.")
                continue  # Skip loading this model

        try:
            # Load the YOLO model
            model = YOLO(model_path)
        except Exception as e:
            print(f"Error loading model '{display_name}': {e}")
            continue

        models[model_name] = {
            'display_name': display_name,
            'model': model,
            'info': model_info
        }
        print(f"Loaded model '{display_name}' from '{model_path}'.")
    return models
def get_model_info(model_info):
    """
    Retrieve formatted model information for display.

    Args:
        model_info (dict): The model's information dictionary. Every key is
            optional; missing scalar fields are shown as 'N/A' and missing
            (or null) collections are shown as empty.

    Returns:
        str: A Markdown-formatted string containing model details.
    """
    info = model_info
    # `or` also covers an explicit JSON null, which .get() would pass through.
    class_ids = info.get('class_ids') or {}
    class_image_counts = info.get('class_image_counts') or {}
    datasets_used = info.get('datasets_used') or []

    class_ids_formatted = "\n".join(f"{cid}: {cname}" for cid, cname in class_ids.items())
    class_image_counts_formatted = "\n".join(
        f"{cname}: {count}" for cname, count in class_image_counts.items()
    )
    datasets_used_formatted = "\n".join(f"- {dataset}" for dataset in datasets_used)

    info_text = (
        f"**{info.get('display_name', 'Model Name')}**\n\n"
        f"**Architecture:** {info.get('architecture', 'N/A')}\n\n"
        f"**Training Epochs:** {info.get('training_epochs', 'N/A')}\n\n"
        f"**Batch Size:** {info.get('batch_size', 'N/A')}\n\n"
        f"**Optimizer:** {info.get('optimizer', 'N/A')}\n\n"
        f"**Learning Rate:** {info.get('learning_rate', 'N/A')}\n\n"
        f"**Data Augmentation Level:** {info.get('data_augmentation_level', 'N/A')}\n\n"
        # Label restored: the previous text was an email-obfuscation artefact.
        f"**mAP@0.5:** {info.get('mAP_score', 'N/A')}\n\n"
        f"**Number of Images Trained On:** {info.get('num_images', 'N/A')}\n\n"
        f"**Class IDs:**\n{class_ids_formatted}\n\n"
        f"**Datasets Used:**\n{datasets_used_formatted}\n\n"
        f"**Class Image Counts:**\n{class_image_counts_formatted}"
    )
    return info_text
def zip_processed_images(processed_image_paths, model_name):
    """
    Bundle the processed images into a single, uniquely named ZIP archive.

    Args:
        processed_image_paths (list): File paths of the processed images.
        model_name (str): Name of the model used for processing; it becomes
            the prefix of the archive's file name.

    Returns:
        str: Path to the created ZIP file inside ZIP_DIR.
    """
    os.makedirs(ZIP_DIR, exist_ok=True)
    archive_path = os.path.join(
        ZIP_DIR,
        f"{model_name}_processed_images_{uuid.uuid4().hex}.zip",
    )
    with zipfile.ZipFile(archive_path, 'w') as archive:
        for source_path in processed_image_paths:
            # Store each image flat, under its bare file name.
            archive.write(source_path, os.path.basename(source_path))
    print(f"Created ZIP file at {archive_path}.")
    return archive_path
def predict_image(model_name, images, confidence, models):
    """
    Perform prediction on uploaded images using the selected YOLO model.

    Inference runs directly on the in-memory image and the annotated result is
    rendered with ``Results.plot()``, so nothing has to be located under
    Ultralytics' ``runs/detect`` directory and no temporary input files are
    left behind.

    Args:
        model_name (str): The name of the selected model.
        images (list): List of uploaded PIL.Image.Image objects.
        confidence (float): The confidence threshold for detections.
        models (dict): The dictionary containing models and their info.

    Returns:
        tuple: A status message, list of processed images, and a ZIP file for
            download. The last two items are None when prediction fails.
    """
    model = models.get(model_name, {}).get('model')
    if model is None:
        return "Error: Model not found.", None, None
    if not images:
        return "❌ Please upload at least one image.", None, None

    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        processed_image_paths = []
        processed_images = []
        for image in images:
            # Unique file name so concurrent requests never overwrite each other
            output_image_path = os.path.join(
                OUTPUT_DIR, f"{model_name}_output_image_{uuid.uuid4().hex}.jpg"
            )
            # JPEG cannot store alpha/palette modes; normalise before anything else.
            rgb_image = image.convert("RGB")
            # Perform prediction with user-specified confidence
            results = model(rgb_image, conf=confidence)
            # plot() returns the annotated frame as a BGR array; flip the channel
            # axis to get the RGB order PIL expects.
            annotated_bgr = results[0].plot()
            annotated_image = Image.fromarray(annotated_bgr[..., ::-1])
            annotated_image.save(output_image_path)
            processed_image_paths.append(output_image_path)
            processed_images.append(annotated_image)

        # Create a ZIP file containing all processed images
        zip_path = zip_processed_images(processed_image_paths, model_name)
        return "✅ Prediction completed successfully.", processed_images, zip_path
    except Exception as e:
        # UI boundary: report the failure in the status field instead of crashing.
        return f"❌ Error during prediction: {str(e)}", None, None
def main():
    """
    Build and launch the Gradio interface for testing the loaded YOLO models.

    Loads the models first and returns without starting the UI when none
    could be loaded.
    """
    # Load the models and their information
    models = load_models()
    if not models:
        print("No models loaded. Please check your models_info.json and model URLs.")
        return

    # Initialize Gradio Blocks interface
    with gr.Blocks() as demo:
        gr.Markdown("# 🧪 YOLOv11 Model Tester")
        gr.Markdown(
            "Upload one or multiple images to test different YOLOv11 models. "
            "Select a model from the dropdown to see its details."
        )

        # Model selection and info
        with gr.Row():
            model_dropdown = gr.Dropdown(
                choices=[models[m]['display_name'] for m in models],
                label="Select Model",
                value=None
            )
            model_info = gr.Markdown("**Model Information will appear here.**")

        # Mapping from display_name to model_name
        display_to_name = {models[m]['display_name']: m for m in models}

        # Update model_info when a model is selected
        def update_model_info(selected_display_name):
            if not selected_display_name:
                return "Please select a model."
            model_name = display_to_name.get(selected_display_name)
            if not model_name:
                return "Model information not available."
            return get_model_info(models[model_name]['info'])

        model_dropdown.change(
            fn=update_model_info,
            inputs=model_dropdown,
            outputs=model_info
        )

        # Confidence Threshold Slider
        with gr.Row():
            confidence_slider = gr.Slider(
                minimum=0.0,
                maximum=1.0,
                step=0.01,
                value=0.25,
                label="Confidence Threshold",
                info="Adjust the minimum confidence required for detections to be displayed."
            )

        # Image Prediction Tab (supporting multiple images)
        with gr.Tab("🖼️ Image"):
            with gr.Column():
                # Gradio has no multi-image input component (`gr.Images` does
                # not exist), so a multi-file upload restricted to images is
                # used and decoded to PIL in the callback.
                image_input = gr.File(
                    label="Upload Images for Prediction",
                    file_count="multiple",
                    file_types=["image"]
                )
                image_predict_btn = gr.Button("🔍 Predict on Images")
                image_status = gr.Markdown("**Status will appear here.**")
                # Layout is passed to the constructor; Gallery.style() was
                # removed in Gradio 4.
                image_gallery = gr.Gallery(label="Predicted Images", columns=2, height="auto")
                image_download_btn = gr.File(label="⬇️ Download All Processed Images (ZIP)")

            # Define the image prediction function
            def process_image(selected_display_name, uploaded_files, confidence):
                if not selected_display_name:
                    return "❌ Please select a model.", None, None
                if not uploaded_files:
                    return "❌ Please upload at least one image.", None, None
                model_name = display_to_name.get(selected_display_name)

                pil_images = []
                try:
                    for uploaded in uploaded_files:
                        # Older Gradio passes temp-file wrappers exposing .name,
                        # newer versions pass plain path strings.
                        file_path = getattr(uploaded, "name", uploaded)
                        with Image.open(file_path) as opened:
                            # convert() decodes fully, so the file can be closed;
                            # RGB also keeps JPEG saving safe downstream.
                            pil_images.append(opened.convert("RGB"))
                except OSError as e:
                    return f"❌ Could not read an uploaded image: {e}", None, None

                return predict_image(model_name, pil_images, confidence, models)

            # Connect the predict button
            image_predict_btn.click(
                fn=process_image,
                inputs=[model_dropdown, image_input, confidence_slider],
                outputs=[image_status, image_gallery, image_download_btn]
            )

        gr.Markdown(
            "---\n"
            "**Note:** Models are downloaded from GitHub upon first use. "
            "Ensure that you have a stable internet connection and sufficient storage space."
        )

    # Launch the Gradio app
    demo.launch()
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()