from fastapi import FastAPI, BackgroundTasks import edge_tts import asyncio import os import time from fastapi.responses import FileResponse, JSONResponse from typing import List import pydub app = FastAPI() # Global dictionary to track active requests active_requests = {} def split_text(text, max_chunk_size=500): """Split text into chunks at sentence boundaries.""" sentences = text.replace('।', '.').replace('؟', '?').split('.') chunks = [] current_chunk = [] current_length = 0 for sentence in sentences: sentence = sentence.strip() + '.' sentence_length = len(sentence) if current_length + sentence_length > max_chunk_size and current_chunk: chunks.append(' '.join(current_chunk)) current_chunk = [] current_length = 0 current_chunk.append(sentence) current_length += sentence_length if current_chunk: chunks.append(' '.join(current_chunk)) return chunks async def process_chunk(text, voice, temp_dir, chunk_index): """Process a single chunk of text asynchronously.""" tmp_path = os.path.join(temp_dir, f"chunk_{chunk_index}_{int(time.time())}.mp3") communicate = edge_tts.Communicate(text, voice) await communicate.save(tmp_path) return tmp_path async def combine_audio_files(chunk_files, output_path): """Combine multiple MP3 files into one.""" combined = pydub.AudioSegment.empty() for file in chunk_files: audio_segment = pydub.AudioSegment.from_mp3(file) combined += audio_segment combined.export(output_path, format="mp3") # Cleanup chunk files for file in chunk_files: try: os.remove(file) except: pass @app.get("/") def home(): return {"message": "EdgeTTS FastAPI is running!"} @app.get("/health") def health_check(): """Check if the API is running and how many requests are active.""" return {"status": "running", "active_requests": len(active_requests)} @app.get("/status") def status(): """Return the list of active requests being processed.""" return {"active_requests": list(active_requests.keys())} @app.get("/tts") async def tts(text: str, voice: str = "en-US-JennyNeural", background_tasks: BackgroundTasks = None): """Generate speech from text using EdgeTTS with parallel processing.""" request_id = f"{int(time.time())}_{os.urandom(4).hex()}" active_requests[request_id] = "processing" try: output_file = f"output_{request_id}.mp3" temp_dir = f"temp_{request_id}" os.makedirs(temp_dir, exist_ok=True) chunks = split_text(text) tasks = [process_chunk(chunk, voice, temp_dir, i) for i, chunk in enumerate(chunks)] chunk_files = await asyncio.gather(*tasks) await combine_audio_files(chunk_files, output_file) background_tasks.add_task(cleanup_request, request_id) return FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3") except Exception as e: del active_requests[request_id] return JSONResponse(content={"error": str(e)}, status_code=500) def cleanup_request(request_id): """Cleanup function to remove temporary files.""" del active_requests[request_id] temp_dir = f"temp_{request_id}" if os.path.exists(temp_dir): for file in os.listdir(temp_dir): os.remove(os.path.join(temp_dir, file)) os.rmdir(temp_dir) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)