|
from fastapi import FastAPI, HTTPException |
|
from fastapi.staticfiles import StaticFiles |
|
from pydantic import BaseModel, HttpUrl |
|
from dotenv import load_dotenv |
|
from typing import List |
|
import os |
|
import asyncio |
|
import uuid |
|
import aiohttp |
|
import re |
|
import shutil |
|
import aiofiles |
|
|
|
load_dotenv() |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
os.makedirs("staticfiles", exist_ok=True) |
|
app.mount("/static", StaticFiles(directory="staticfiles"), name="static") |
|
|
|
|
|
class SlideshowRequest(BaseModel): |
|
image_urls: List[HttpUrl] |
|
audio_url: HttpUrl |
|
duration: int |
|
zoom: bool = False |
|
|
|
def extract_google_drive_id(url): |
|
"""Extract file ID from a Google Drive URL""" |
|
pattern = r'(?:/file/d/|id=|/open\?id=)([^/&]+)' |
|
match = re.search(pattern, str(url)) |
|
return match.group(1) if match else None |
|
|
|
async def download_file(url, local_path): |
|
"""Download a file from URL to local path asynchronously""" |
|
try: |
|
|
|
if "drive.google.com" in str(url): |
|
file_id = extract_google_drive_id(url) |
|
if file_id: |
|
url = f"https://drive.google.com/uc?export=download&id={file_id}" |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.get(str(url)) as response: |
|
response.raise_for_status() |
|
|
|
async with aiofiles.open(local_path, 'wb') as f: |
|
while True: |
|
chunk = await response.content.read(8192) |
|
if not chunk: |
|
break |
|
await f.write(chunk) |
|
return True |
|
except Exception as e: |
|
print(f"Error downloading {url}: {str(e)}") |
|
return False |
|
|
|
async def upload_video(url): |
|
"""Upload a video and return the upload URL""" |
|
try: |
|
data = { |
|
'file': str(url), |
|
'upload_preset': 'video-input-production', |
|
} |
|
|
|
async with aiohttp.ClientSession() as session: |
|
async with session.post( |
|
os.getenv("VIDEO_UPLOAD_URL"), |
|
data=data |
|
) as response: |
|
if response.status != 200: |
|
print(f"Upload failed with status {response.status}") |
|
return None |
|
|
|
result = await response.json() |
|
return result.get('secure_url') |
|
|
|
except Exception as e: |
|
print(f"Error uploading video: {str(e)}") |
|
return None |
|
|
|
async def create_slideshow(image_paths, audio_path, output_path, duration, zoom=False, zoom_ratio=0.04): |
|
"""Generate slideshow from images and audio using ffmpeg asynchronously""" |
|
|
|
concat_file = "temp_concat.txt" |
|
|
|
if not zoom: |
|
|
|
async with aiofiles.open(concat_file, "w") as f: |
|
for img in image_paths: |
|
await f.write(f"file '{img}'\n") |
|
await f.write(f"duration {duration}\n") |
|
|
|
|
|
if image_paths: |
|
await f.write(f"file '{image_paths[-1]}'\n") |
|
|
|
|
|
total_duration = len(image_paths) * duration |
|
cmd = [ |
|
"ffmpeg", |
|
"-f", "concat", |
|
"-safe", "0", |
|
"-i", concat_file, |
|
"-i", audio_path, |
|
"-c:v", "libx264", |
|
"-pix_fmt", "yuv420p", |
|
"-c:a", "aac", |
|
"-shortest", |
|
"-y", |
|
"-t", str(total_duration), |
|
output_path |
|
] |
|
else: |
|
|
|
|
|
filters = [] |
|
for i, img in enumerate(image_paths): |
|
filter_str = ( |
|
f"[{i}:v]scale=1920:1080:force_original_aspect_ratio=decrease," |
|
f"pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1," |
|
f"zoompan=z='min(zoom+0.0015,1.5)':d={duration*25}:" |
|
f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)'," |
|
f"trim=duration={duration},setpts=PTS-STARTPTS[v{i}];" |
|
) |
|
filters.append(filter_str) |
|
|
|
|
|
filter_complex = "".join(filters) |
|
for i in range(len(image_paths)): |
|
filter_complex += f"[v{i}]" |
|
filter_complex += f"concat=n={len(image_paths)}:v=1:a=0[outv]" |
|
|
|
cmd = ["ffmpeg", "-y"] |
|
|
|
for img in image_paths: |
|
cmd.extend(["-loop", "1", "-i", img]) |
|
|
|
cmd.extend([ |
|
"-i", audio_path, |
|
"-filter_complex", filter_complex, |
|
"-map", "[outv]", |
|
"-map", f"{len(image_paths)}:a", |
|
"-c:v", "libx264", |
|
"-pix_fmt", "yuv420p", |
|
"-c:a", "aac", |
|
"-shortest", |
|
output_path |
|
]) |
|
try: |
|
process = await asyncio.create_subprocess_exec( |
|
*cmd, |
|
stdout=asyncio.subprocess.PIPE, |
|
stderr=asyncio.subprocess.PIPE |
|
) |
|
stdout, stderr = await process.communicate() |
|
|
|
if os.path.exists(concat_file): |
|
os.remove(concat_file) |
|
|
|
if process.returncode != 0: |
|
print(f"FFmpeg error: {stderr.decode()}") |
|
return False |
|
return True |
|
except Exception as e: |
|
print(f"FFmpeg error: {str(e)}") |
|
if os.path.exists(concat_file): |
|
os.remove(concat_file) |
|
return False |
|
|
|
@app.post("/make_slideshow") |
|
async def make_slideshow(request: SlideshowRequest): |
|
""" |
|
Create a slideshow from images and audio with specified duration per image. |
|
Returns the URL of the generated video. |
|
""" |
|
|
|
request_id = str(uuid.uuid4()) |
|
request_dir = os.path.join("staticfiles", request_id) |
|
os.makedirs(request_dir, exist_ok=True) |
|
|
|
try: |
|
|
|
image_paths = [] |
|
download_tasks = [] |
|
|
|
for i, url in enumerate(request.image_urls): |
|
image_path = os.path.join(request_dir, f"image_{i:03d}.png") |
|
image_paths.append(image_path) |
|
download_tasks.append(download_file(url, image_path)) |
|
|
|
|
|
audio_path = os.path.join(request_dir, "audio.mp3") |
|
download_tasks.append(download_file(request.audio_url, audio_path)) |
|
|
|
|
|
results = await asyncio.gather(*download_tasks) |
|
|
|
|
|
if not all(results[:-1]): |
|
raise HTTPException(status_code=400, detail="Failed to download one or more images") |
|
|
|
if not results[-1]: |
|
raise HTTPException(status_code=400, detail=f"Failed to download audio: {request.audio_url}") |
|
|
|
|
|
output_path = os.path.join(request_dir, "slideshow.mp4") |
|
|
|
|
|
if not await create_slideshow(image_paths, audio_path, output_path, request.duration, request.zoom): |
|
raise HTTPException(status_code=500, detail="Failed to create slideshow") |
|
|
|
|
|
base_url = "https://saq1b-api.hf.space/static" |
|
video_url = f"{base_url}/{request_id}/slideshow.mp4" |
|
uploaded_video_url = await upload_video(video_url) |
|
return {"url": uploaded_video_url} |
|
|
|
except Exception as e: |
|
|
|
if os.path.exists(request_dir): |
|
shutil.rmtree(request_dir) |
|
raise HTTPException(status_code=500, detail=f"Error: {str(e)}") |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=7860) |