import asyncio
import os
import re
import shutil
import uuid
from typing import List

import aiofiles
import aiohttp
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl

load_dotenv()

# Create FastAPI app
app = FastAPI()

# Create and mount staticfiles directory
os.makedirs("staticfiles", exist_ok=True)
app.mount("/static", StaticFiles(directory="staticfiles"), name="static")

# Matches the file ID in the Google Drive URL shapes handled below
# (".../file/d/<id>/...", "...?id=<id>", ".../open?id=<id>"). The ID is limited
# to letters, digits, "_" and "-" so that a trailing "?query" or "#fragment"
# is never captured as part of it.
_DRIVE_ID_RE = re.compile(r"(?:/file/d/|id=|/open\?id=)([A-Za-z0-9_-]+)")


# Define input model for the request
class SlideshowRequest(BaseModel):
    # URLs of the images, shown in the order given.
    image_urls: List[HttpUrl]
    # URL of the audio track laid under the slideshow.
    audio_url: HttpUrl
    # Time each image stays on screen (passed to ffmpeg as a per-image duration).
    duration: int
    # When true, each image gets a slow zoom-in effect.
    zoom: bool = False


def extract_google_drive_id(url):
    """Extract file ID from a Google Drive URL.

    Args:
        url: The URL (string or any object whose ``str()`` is the URL).

    Returns:
        The file ID string, or ``None`` when no ID can be found.
    """
    match = _DRIVE_ID_RE.search(str(url))
    return match.group(1) if match else None


async def download_file(url, local_path):
    """Download a file from URL to local path asynchronously.

    Google Drive share links are rewritten to their direct-download form
    before the request is made.

    Args:
        url: Source URL (string or URL-like object).
        local_path: Destination path on the local filesystem.

    Returns:
        ``True`` on success, ``False`` on any failure. On failure the error is
        printed and any partially written file is removed.
    """
    try:
        # Handle Google Drive URLs
        if "drive.google.com" in str(url):
            file_id = extract_google_drive_id(url)
            if file_id:
                url = f"https://drive.google.com/uc?export=download&id={file_id}"

        async with aiohttp.ClientSession() as session:
            async with session.get(str(url)) as response:
                response.raise_for_status()
                async with aiofiles.open(local_path, "wb") as f:
                    # Stream in 8 KiB chunks so large files never sit fully
                    # in memory.
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        await f.write(chunk)
        return True
    except Exception as e:
        print(f"Error downloading {url}: {str(e)}")
        # Do not leave a truncated file behind for later steps to pick up.
        try:
            os.remove(local_path)
        except OSError:
            pass
        return False


async def upload_video(url):
    """Upload a video and return the upload URL.

    The video is not streamed from here: its URL is posted as the ``file``
    form field to the endpoint named by the ``VIDEO_UPLOAD_URL`` environment
    variable.

    Args:
        url: Publicly reachable URL of the video to upload.

    Returns:
        The ``secure_url`` value from the JSON response, or ``None`` when the
        endpoint is not configured, responds with a non-200 status, or the
        request fails.
    """
    upload_endpoint = os.getenv("VIDEO_UPLOAD_URL")
    if not upload_endpoint:
        # Fail with a clear message instead of an opaque aiohttp error.
        print("Error uploading video: VIDEO_UPLOAD_URL is not set")
        return None

    try:
        data = {
            'file': str(url),
            'upload_preset': 'video-input-production',
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(upload_endpoint, data=data) as response:
                if response.status != 200:
                    print(f"Upload failed with status {response.status}")
                    return None

                result = await response.json()
                return result.get('secure_url')
    except Exception as e:
        print(f"Error uploading video: {str(e)}")
        return None
async def create_slideshow(image_paths, audio_path, output_path, duration, zoom=False, zoom_ratio=0.04):
    """Generate slideshow from images and audio using ffmpeg asynchronously.

    Args:
        image_paths: Local image file paths, in display order.
        audio_path: Local path of the audio track.
        output_path: Where the resulting video is written.
        duration: Time each image is shown; must be positive.
        zoom: When true, apply a slow zoom-in to every image.
        zoom_ratio: Accepted for backward compatibility; currently unused
            (the zoom step is fixed inside the filter expression).

    Returns:
        ``True`` when ffmpeg exits successfully, ``False`` otherwise (the
        ffmpeg error output is printed).
    """
    if not image_paths or duration <= 0:
        print("FFmpeg error: at least one image and a positive duration are required")
        return False

    # Unique per call: a fixed file name would let concurrent requests
    # overwrite or delete each other's list file. Kept in the working
    # directory so relative image paths inside it resolve as before.
    concat_file = f"temp_concat_{uuid.uuid4().hex}.txt"

    try:
        if not zoom:
            # Original implementation without zoom effect
            entries = []
            for img in image_paths:
                entries.append(f"file '{_concat_quote(img)}'\n")
                entries.append(f"duration {duration}\n")
            # Add the last image again without duration (required by ffmpeg)
            entries.append(f"file '{_concat_quote(image_paths[-1])}'\n")

            async with aiofiles.open(concat_file, "w") as f:
                await f.write("".join(entries))

            # Run ffmpeg command to create slideshow with audio
            total_duration = len(image_paths) * duration
            cmd = [
                "ffmpeg",
                "-f", "concat",
                "-safe", "0",
                "-i", concat_file,
                "-i", audio_path,
                "-c:v", "libx264",
                "-pix_fmt", "yuv420p",
                "-c:a", "aac",
                "-shortest",
                "-y",
                "-t", str(total_duration),
                output_path,
            ]
        else:
            # Complex implementation with zoom effect.
            # Each image is its own looped input; zoompan + trim give every
            # clip its length, then all clips are concatenated.
            count = len(image_paths)
            per_image_filters = [
                (
                    f"[{i}:v]scale=1920:1080:force_original_aspect_ratio=decrease,"
                    f"pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,"
                    # d is a frame count; duration*25 presumably relies on
                    # zoompan's default 25 fps output rate.
                    f"zoompan=z='min(zoom+0.0015,1.5)':d={duration*25}:"
                    f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)',"
                    f"trim=duration={duration},setpts=PTS-STARTPTS[v{i}];"
                )
                for i in range(count)
            ]
            clip_labels = "".join(f"[v{i}]" for i in range(count))
            filter_complex = (
                "".join(per_image_filters)
                + clip_labels
                + f"concat=n={count}:v=1:a=0[outv]"
            )

            cmd = ["ffmpeg", "-y"]
            # No per-input duration here; let zoompan+trim control each clip's length.
            for img in image_paths:
                cmd.extend(["-loop", "1", "-i", img])
            cmd.extend([
                "-i", audio_path,
                "-filter_complex", filter_complex,
                "-map", "[outv]",
                # The audio file is the input right after the images.
                "-map", f"{count}:a",
                "-c:v", "libx264",
                "-pix_fmt", "yuv420p",
                "-c:a", "aac",
                "-shortest",
                output_path,
            ])

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await process.communicate()

        if process.returncode != 0:
            print(f"FFmpeg error: {stderr.decode(errors='replace')}")
            return False
        return True
    except Exception as e:
        print(f"FFmpeg error: {str(e)}")
        return False
    finally:
        # Single cleanup point for both success and failure paths.
        if os.path.exists(concat_file):
            os.remove(concat_file)


def _concat_quote(path):
    """Escape single quotes so *path* can sit inside '...' in an ffmpeg concat list."""
    return str(path).replace("'", "'\\''")


@app.post("/make_slideshow")
async def make_slideshow(request: SlideshowRequest):
    """
    Create a slideshow from images and audio with specified duration per image.
    Returns the URL of the generated video.

    Raises:
        HTTPException: 400 for invalid input or a failed download, 500 when
            the slideshow cannot be generated or an unexpected error occurs,
            502 when the upload of the finished video fails.
    """
    # Reject unusable input before touching the filesystem.
    if not request.image_urls:
        raise HTTPException(status_code=400, detail="At least one image URL is required")
    if request.duration <= 0:
        raise HTTPException(status_code=400, detail="duration must be a positive integer")

    # Create unique directory for this request
    request_id = str(uuid.uuid4())
    request_dir = os.path.join("staticfiles", request_id)
    os.makedirs(request_dir, exist_ok=True)

    try:
        # Download images and audio concurrently
        image_paths = [
            os.path.join(request_dir, f"image_{i:03d}.png")
            for i in range(len(request.image_urls))
        ]
        audio_path = os.path.join(request_dir, "audio.mp3")

        download_tasks = [
            download_file(url, path)
            for url, path in zip(request.image_urls, image_paths)
        ]
        download_tasks.append(download_file(request.audio_url, audio_path))

        # Wait for all downloads to complete; the audio result is last.
        *image_results, audio_ok = await asyncio.gather(*download_tasks)

        # Check if all downloads were successful
        if not all(image_results):
            raise HTTPException(status_code=400, detail="Failed to download one or more images")
        if not audio_ok:
            raise HTTPException(status_code=400, detail=f"Failed to download audio: {request.audio_url}")

        # Output video path
        output_path = os.path.join(request_dir, "slideshow.mp4")

        # Generate slideshow
        created = await create_slideshow(
            image_paths, audio_path, output_path, request.duration, request.zoom
        )
        if not created:
            raise HTTPException(status_code=500, detail="Failed to create slideshow")

        # Hand the statically served video URL to the upload service
        base_url = "https://saq1b-api.hf.space/static"
        video_url = f"{base_url}/{request_id}/slideshow.mp4"
        uploaded_video_url = await upload_video(video_url)
        if not uploaded_video_url:
            # Surface the failure instead of answering 200 with a null URL.
            raise HTTPException(status_code=502, detail="Failed to upload video")

        return {"url": uploaded_video_url}

    except HTTPException:
        # Keep the intended status code (e.g. 400) instead of masking it as 500.
        if os.path.exists(request_dir):
            shutil.rmtree(request_dir)
        raise
    except Exception as e:
        # Clean up on error
        if os.path.exists(request_dir):
            shutil.rmtree(request_dir)
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)