import asyncio
import os
import time
import uuid

import edge_tts
from fastapi import FastAPI
from fastapi.responses import FileResponse
from pydub import AudioSegment

app = FastAPI()


# 🔹 Function to split text into manageable chunks
def split_text(text, max_chunk_size=500):
    """Split text into smaller chunks at sentence boundaries.

    Sentences are delimited by '.', with the Devanagari danda '।' first
    normalized to '.'.  Chunks are built greedily up to roughly
    ``max_chunk_size`` characters; a single sentence longer than the limit
    still becomes its own chunk.  Empty fragments (e.g. produced by a
    trailing '.') are skipped so the output never contains bare dots.
    """
    # NOTE(review): '؟' is normalized to '?', but the text is only split on
    # '.', so question-terminated sentences remain inside their chunk.
    sentences = text.replace('।', '.').replace('؟', '?').split('.')
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            # Fix: split('.') yields '' for a trailing dot; the original
            # turned that into a spurious standalone '.' in the output.
            continue
        sentence += '.'
        sentence_length = len(sentence)

        # Flush the current chunk before it would exceed the size limit.
        if current_length + sentence_length > max_chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_length = 0

        current_chunk.append(sentence)
        current_length += sentence_length

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


# 🔹 Function to process a single chunk asynchronously
async def process_chunk(text, voice, temp_dir, chunk_index):
    """Generate speech for a single chunk and save it as an MP3.

    Returns the path of the written file inside ``temp_dir``.
    """
    tmp_path = os.path.join(temp_dir, f"chunk_{chunk_index}.mp3")
    print(f"🎤 Processing chunk {chunk_index}: {text[:50]}...")  # Logging for debugging
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(tmp_path)
    return tmp_path


def _merge_mp3s(chunk_files, output_path):
    """Blocking helper: concatenate MP3 files, then delete the temp chunks."""
    combined = AudioSegment.empty()
    for file in chunk_files:
        print(f"🔹 Adding {file} to final output")  # Logging for debugging
        combined += AudioSegment.from_mp3(file)
    combined.export(output_path, format="mp3")

    # Remove temporary files
    for file in chunk_files:
        os.remove(file)


# 🔹 Function to merge all chunked MP3 files into a single audio file
async def combine_audio_files(chunk_files, output_path):
    """Combine multiple MP3 files into one final MP3.

    The pydub decode/encode work is CPU/disk bound, so it runs in a worker
    thread (``asyncio.to_thread``) instead of blocking the event loop as the
    original implementation did.
    """
    await asyncio.to_thread(_merge_mp3s, chunk_files, output_path)


@app.get("/")
def home():
    return {"message": "✅ EdgeTTS FastAPI is running!"}


# 🔹 Main TTS API
@app.get("/tts")
async def tts(text: str, voice: str = "en-US-AriaNeural"):
    """Main API function to handle text-to-speech conversion."""
    # Per-request ID so concurrent requests cannot clobber each other's
    # files (the original used fixed chunk_*.mp3 / final_output.mp3 names,
    # racing across requests).
    request_id = uuid.uuid4().hex
    temp_dir = os.path.join("temp_audio", request_id)
    os.makedirs(temp_dir, exist_ok=True)

    chunks = split_text(text)

    # If text is short, process directly
    if len(chunks) == 1:
        print("📢 Processing without chunking...")
        output_file = await process_chunk(text, voice, temp_dir, 0)
        return FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3")

    print(f"🚀 Splitting into {len(chunks)} chunks and processing concurrently...")

    # 🔹 Concurrently process all chunks
    chunk_files = await asyncio.gather(*[
        process_chunk(ch, voice, temp_dir, i) for i, ch in enumerate(chunks)
    ])

    # 🔹 Merge all MP3 files
    output_file = f"final_output_{request_id}.mp3"
    await combine_audio_files(chunk_files, output_file)

    print("✅ TTS Generation Complete. Sending response...")
    # TODO(review): the merged output (and single-chunk temp file) is never
    # deleted — consider a Starlette BackgroundTask to clean up after send.
    return FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3")


# 🔹 Ensure app starts in Hugging Face Spaces
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)