|
import asyncio
import os
import time
import uuid

import edge_tts
from fastapi import FastAPI
from fastapi.responses import FileResponse
from pydub import AudioSegment
|
|
|
app = FastAPI() |
|
|
|
|
|
def split_text(text, max_chunk_size=500):
    """Split *text* into chunks of roughly ``max_chunk_size`` characters.

    Sentence-ending marks (Devanagari danda and a question-mark variant —
    the literals below look garbled, presumably mojibake of '।' and '؟';
    TODO confirm encoding) are normalized, then the text is split on '.'.
    Sentences are greedily packed into chunks so no chunk exceeds
    ``max_chunk_size`` unless a single sentence is itself longer.

    Args:
        text: The input text to split.
        max_chunk_size: Soft upper bound on chunk length, in characters.

    Returns:
        A list of chunk strings. Empty or whitespace-only input yields [].
    """
    sentences = text.replace('ΰ₯€', '.').replace('Ψ', '?').split('.')

    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        sentence = sentence.strip()
        # Fix: split('.') produces empty fragments (trailing residue,
        # consecutive dots); the original turned each into a bare "." that
        # polluted the output. Skip them instead.
        if not sentence:
            continue
        sentence += '.'
        sentence_length = len(sentence)

        # Flush the current chunk before it would overflow (but never emit
        # an empty chunk for an oversized single sentence).
        if current_length + sentence_length > max_chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_length = 0

        current_chunk.append(sentence)
        current_length += sentence_length

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks
|
|
|
|
|
async def process_chunk(text, voice, temp_dir, chunk_index):
    """Synthesize one text chunk with edge-tts.

    The audio is written to ``chunk_<chunk_index>.mp3`` inside *temp_dir*.

    Args:
        text: Text to convert to speech.
        voice: edge-tts voice identifier (e.g. "en-US-AriaNeural").
        temp_dir: Directory that receives the intermediate MP3 file.
        chunk_index: Position of this chunk; used to name the output file.

    Returns:
        The path of the MP3 file that was written.
    """
    out_path = os.path.join(temp_dir, f"chunk_{chunk_index}.mp3")
    print(f"π€ Processing chunk {chunk_index}: {text[:50]}...")
    await edge_tts.Communicate(text, voice).save(out_path)
    return out_path
|
|
|
|
|
async def combine_audio_files(chunk_files, output_path):
    """Concatenate MP3 chunk files into a single MP3 at *output_path*.

    The chunk files are deleted after the combined file has been exported.

    Args:
        chunk_files: Ordered paths of the partial MP3 files to merge.
        output_path: Destination path for the combined MP3.
    """
    merged = AudioSegment.empty()
    for path in chunk_files:
        print(f"πΉ Adding {path} to final output")
        merged += AudioSegment.from_mp3(path)

    merged.export(output_path, format="mp3")

    # Clean up the intermediates only after a successful export.
    for path in chunk_files:
        os.remove(path)
|
|
|
@app.get("/")
def home():
    """Health-check endpoint: confirms the service is running."""
    # NOTE(review): the literal below contains a garbled emoji (mojibake,
    # likely a check mark) that renders as a line break — confirm encoding.
    return {"message": "β
EdgeTTS FastAPI is running!"}
|
|
|
|
|
@app.get("/tts") |
|
async def tts(text: str, voice: str = "en-US-AriaNeural"): |
|
"""Main API function to handle text-to-speech conversion.""" |
|
temp_dir = "temp_audio" |
|
os.makedirs(temp_dir, exist_ok=True) |
|
|
|
chunks = split_text(text) |
|
|
|
|
|
if len(chunks) == 1: |
|
print("π’ Processing without chunking...") |
|
output_file = await process_chunk(text, voice, temp_dir, 0) |
|
return FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3") |
|
|
|
print(f"π Splitting into {len(chunks)} chunks and processing concurrently...") |
|
|
|
|
|
chunk_files = await asyncio.gather(*[ |
|
process_chunk(ch, voice, temp_dir, i) for i, ch in enumerate(chunks) |
|
]) |
|
|
|
|
|
output_file = "final_output.mp3" |
|
await combine_audio_files(chunk_files, output_file) |
|
|
|
print("β
TTS Generation Complete. Sending response...") |
|
return FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3") |
|
|
|
|
|
# Start a local development server when the module is run directly
# (when deployed under an external ASGI runner, this block is skipped).
if __name__ == "__main__":

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)
|
|