api / main.py
saq1b's picture
Make upload_video call asynchronous in make_slideshow function
858e1a3
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl
from dotenv import load_dotenv
from typing import List
import os
import asyncio
import uuid
import aiohttp
import re
import shutil
import aiofiles
# Load environment variables from a .env file (if one exists) so that settings
# read later through os.getenv, such as VIDEO_UPLOAD_URL, are available.
load_dotenv()
# Create FastAPI app
app = FastAPI()
# Create and mount staticfiles directory.
# The directory is created before mounting so that it exists when StaticFiles
# is constructed; generated videos are written below it and served under /static.
os.makedirs("staticfiles", exist_ok=True)
app.mount("/static", StaticFiles(directory="staticfiles"), name="static")
# Define input model for the request
class SlideshowRequest(BaseModel):
    """Request body accepted by the POST /make_slideshow endpoint."""

    # URLs of the images; they appear in the video in the order given.
    image_urls: List[HttpUrl]
    # URL of the audio track that is muxed into the video.
    audio_url: HttpUrl
    # Number of seconds each image stays on screen.
    duration: int
    # When true, each image is rendered with the zoompan effect.
    zoom: bool = False
# Matches the file ID either in a ".../file/d/<id>" path or in an "id=<id>"
# query parameter.  The "id=" form must directly follow "?" or "&" so that
# parameters whose names merely end in "id" (e.g. "uuid=") are not mistaken
# for it, and the captured ID stops at any URL delimiter ("/", "&", "?", "#").
_DRIVE_ID_PATTERN = re.compile(r"(?:/file/d/|[?&]id=)([^/&?#]+)")


def extract_google_drive_id(url):
    """Extract the file ID from a Google Drive URL.

    Args:
        url: The URL as a string, or any object whose ``str()`` is the URL.

    Returns:
        The file ID as a string, or ``None`` when the URL contains neither a
        ``/file/d/<id>`` path segment nor an ``id=<id>`` query parameter.
    """
    match = _DRIVE_ID_PATTERN.search(str(url))
    return match.group(1) if match else None
async def download_file(url, local_path):
    """Stream the resource at *url* into the file at *local_path*.

    Google Drive links are rewritten to the direct-download form whenever a
    file ID can be extracted from them.

    Returns:
        True when the whole body was written; False when anything failed
        (the error is printed rather than raised).
    """
    target = url
    try:
        # Handle Google Drive URLs
        if "drive.google.com" in str(target):
            drive_id = extract_google_drive_id(target)
            if drive_id:
                target = f"https://drive.google.com/uc?export=download&id={drive_id}"

        async with aiohttp.ClientSession() as session, session.get(str(target)) as response:
            response.raise_for_status()
            async with aiofiles.open(local_path, 'wb') as sink:
                # Copy the body in 8 KiB pieces so it is never held in memory whole.
                async for piece in response.content.iter_chunked(8192):
                    await sink.write(piece)
    except Exception as exc:
        print(f"Error downloading {target}: {str(exc)}")
        return False
    return True
async def upload_video(url):
    """Ask the upload service to ingest the video at *url*.

    The service endpoint is read from the VIDEO_UPLOAD_URL environment
    variable and receives the video URL as a form field.

    Returns:
        The ``secure_url`` field of the service's JSON reply, or None when the
        reply status is not 200 or any error occurs (the error is printed).
    """
    try:
        payload = {
            'file': str(url),
            'upload_preset': 'video-input-production',
        }
        endpoint = os.getenv("VIDEO_UPLOAD_URL")
        async with aiohttp.ClientSession() as session, session.post(endpoint, data=payload) as response:
            if response.status == 200:
                body = await response.json()
                return body.get('secure_url')
            print(f"Upload failed with status {response.status}")
            return None
    except Exception as exc:
        print(f"Error uploading video: {str(exc)}")
        return None
def _concat_command(concat_file, image_count, audio_path, output_path, duration):
    """Build the ffmpeg command for the plain slideshow driven by a concat list."""
    return [
        "ffmpeg",
        "-f", "concat",
        "-safe", "0",
        "-i", concat_file,
        "-i", audio_path,
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        "-c:a", "aac",
        "-shortest",
        "-y",
        # Cap the output at the combined display time of all images.
        "-t", str(image_count * duration),
        output_path,
    ]


def _zoom_command(image_paths, audio_path, output_path, duration):
    """Build the ffmpeg command that renders every image with a zoompan effect."""
    count = len(image_paths)
    # One filter chain per input image: fit into 1920x1080, zoom in slowly,
    # then trim to the requested length.  zoompan's ``d`` is a frame count;
    # 25 frames per second is assumed (zoompan's default output rate).
    chains = [
        f"[{i}:v]scale=1920:1080:force_original_aspect_ratio=decrease,"
        f"pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,"
        f"zoompan=z='min(zoom+0.0015,1.5)':d={duration*25}:"
        f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)',"
        f"trim=duration={duration},setpts=PTS-STARTPTS[v{i}];"
        for i in range(count)
    ]
    # Concatenate all zoompan outputs into a single video stream.
    labels = "".join(f"[v{i}]" for i in range(count))
    filter_complex = "".join(chains) + labels + f"concat=n={count}:v=1:a=0[outv]"

    cmd = ["ffmpeg", "-y"]
    # Each image is looped indefinitely; zoompan + trim bound each clip's length.
    for img in image_paths:
        cmd.extend(["-loop", "1", "-i", img])
    cmd.extend([
        "-i", audio_path,
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        # The audio file is the input that follows the images.
        "-map", f"{count}:a",
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        "-c:a", "aac",
        "-shortest",
        output_path,
    ])
    return cmd


async def create_slideshow(image_paths, audio_path, output_path, duration, zoom=False, zoom_ratio=0.04):
    """Generate a slideshow video from images and audio using ffmpeg.

    Args:
        image_paths: Local paths of the images, in display order.
        audio_path: Local path of the audio track.
        output_path: Path the video is written to (overwritten if present).
        duration: Seconds each image is shown; must be positive.
        zoom: When true, render each image with a zoompan effect instead of
            the plain concat-list slideshow.
        zoom_ratio: Currently unused; the zoom step is fixed inside the filter
            expression.  Kept so existing callers keep working.

    Returns:
        True when ffmpeg exits successfully, False otherwise.  Failures are
        printed, never raised.
    """
    if not image_paths or duration <= 0:
        print("FFmpeg error: at least one image and a positive duration are required")
        return False

    # The concat list gets a unique name so concurrent requests cannot
    # overwrite or delete each other's list.  It stays in the working
    # directory because the concat demuxer resolves relative entries
    # against the list file's own location.
    concat_file = None
    try:
        if zoom:
            cmd = _zoom_command(image_paths, audio_path, output_path, duration)
        else:
            concat_file = f"temp_concat_{uuid.uuid4().hex}.txt"
            entries = []
            for img in image_paths:
                entries.append(f"file '{img}'\n")
                entries.append(f"duration {duration}\n")
            # Add the last image again without duration (required by ffmpeg)
            entries.append(f"file '{image_paths[-1]}'\n")
            async with aiofiles.open(concat_file, "w") as f:
                await f.write("".join(entries))
            cmd = _concat_command(concat_file, len(image_paths), audio_path, output_path, duration)

        process = await asyncio.create_subprocess_exec(
            *cmd,
            # ffmpeg must not read (or block on) the server's stdin.
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            # Undecodable bytes must not hide the real ffmpeg message.
            print(f"FFmpeg error: {stderr.decode(errors='replace')}")
            return False
        return True
    except Exception as e:
        print(f"FFmpeg error: {str(e)}")
        return False
    finally:
        # Best-effort removal of the temporary concat list on every exit path.
        if concat_file is not None:
            try:
                os.remove(concat_file)
            except OSError:
                pass
@app.post("/make_slideshow")
async def make_slideshow(request: SlideshowRequest):
    """
    Create a slideshow from images and audio with specified duration per image.

    The images and audio are downloaded into a per-request directory under
    "staticfiles", rendered to "slideshow.mp4" there, and the public URL of
    that file is handed to the upload service.

    Returns:
        ``{"url": <uploaded video URL>}``.

    Raises:
        HTTPException 400: no image URLs, non-positive duration, or a failed
            image/audio download.
        HTTPException 500: slideshow creation failed or an unexpected error
            occurred.
        HTTPException 502: the upload service did not return a video URL.
    """
    # Reject requests that can never produce a video before touching the disk.
    if not request.image_urls:
        raise HTTPException(status_code=400, detail="At least one image URL is required")
    if request.duration <= 0:
        raise HTTPException(status_code=400, detail="duration must be a positive number of seconds")

    # Create unique directory for this request
    request_id = str(uuid.uuid4())
    request_dir = os.path.join("staticfiles", request_id)
    os.makedirs(request_dir, exist_ok=True)
    try:
        image_paths = [
            os.path.join(request_dir, f"image_{i:03d}.png")
            for i in range(len(request.image_urls))
        ]
        audio_path = os.path.join(request_dir, "audio.mp3")

        # Download all images and the audio concurrently; audio goes last.
        download_tasks = [
            download_file(url, path)
            for url, path in zip(request.image_urls, image_paths)
        ]
        download_tasks.append(download_file(request.audio_url, audio_path))
        *image_results, audio_ok = await asyncio.gather(*download_tasks)

        if not all(image_results):
            raise HTTPException(status_code=400, detail="Failed to download one or more images")
        if not audio_ok:
            raise HTTPException(status_code=400, detail=f"Failed to download audio: {request.audio_url}")

        # Generate slideshow
        output_path = os.path.join(request_dir, "slideshow.mp4")
        if not await create_slideshow(image_paths, audio_path, output_path, request.duration, request.zoom):
            raise HTTPException(status_code=500, detail="Failed to create slideshow")

        # The upload service is given the public static URL of the rendered file.
        base_url = "https://saq1b-api.hf.space/static"
        video_url = f"{base_url}/{request_id}/slideshow.mp4"
        uploaded_video_url = await upload_video(video_url)
        if not uploaded_video_url:
            # Do not answer 200 with a null URL when the upload failed.
            raise HTTPException(status_code=502, detail="Failed to upload video")
        return {"url": uploaded_video_url}
    except HTTPException:
        # Deliberate HTTP errors keep their own status code; only clean up.
        shutil.rmtree(request_dir, ignore_errors=True)
        raise
    except Exception as e:
        # Clean up on unexpected errors and report them as a server error.
        shutil.rmtree(request_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e
# When executed directly as a script, serve the app with uvicorn on all
# network interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)