# apitts / app.py
# Source: Hugging Face Space by hivecorp (commit a66f077, verified)
import asyncio
import os
import time
import uuid

import edge_tts
from fastapi import FastAPI
from fastapi.responses import FileResponse
from pydub import AudioSegment
app = FastAPI()
# πŸ”Ή Function to split text into manageable chunks
def split_text(text, max_chunk_size=500):
"""Split text into smaller chunks at sentence boundaries."""
sentences = text.replace('ΰ₯€', '.').replace('؟', '?').split('.')
chunks = []
current_chunk = []
current_length = 0
for sentence in sentences:
sentence = sentence.strip() + '.'
sentence_length = len(sentence)
if current_length + sentence_length > max_chunk_size and current_chunk:
chunks.append(' '.join(current_chunk))
current_chunk = []
current_length = 0
current_chunk.append(sentence)
current_length += sentence_length
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks
# πŸ”Ή Function to process a single chunk asynchronously
async def process_chunk(text, voice, temp_dir, chunk_index):
"""Generate speech for a single chunk and save as MP3."""
tmp_path = os.path.join(temp_dir, f"chunk_{chunk_index}.mp3")
print(f"🎀 Processing chunk {chunk_index}: {text[:50]}...") # Logging for debugging
communicate = edge_tts.Communicate(text, voice)
await communicate.save(tmp_path)
return tmp_path
# πŸ”Ή Function to merge all chunked MP3 files into a single audio file
async def combine_audio_files(chunk_files, output_path):
"""Combine multiple MP3 files into one final MP3."""
combined = AudioSegment.empty()
for file in chunk_files:
print(f"πŸ”Ή Adding {file} to final output") # Logging for debugging
combined += AudioSegment.from_mp3(file)
combined.export(output_path, format="mp3")
# Remove temporary files
for file in chunk_files:
os.remove(file)
@app.get("/")
def home():
    """Health-check endpoint confirming the service is up."""
    status = {"message": "✅ EdgeTTS FastAPI is running!"}
    return status
# πŸ”Ή Main TTS API
@app.get("/tts")
async def tts(text: str, voice: str = "en-US-AriaNeural"):
"""Main API function to handle text-to-speech conversion."""
temp_dir = "temp_audio"
os.makedirs(temp_dir, exist_ok=True)
chunks = split_text(text)
# If text is short, process directly
if len(chunks) == 1:
print("πŸ“’ Processing without chunking...")
output_file = await process_chunk(text, voice, temp_dir, 0)
return FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3")
print(f"πŸš€ Splitting into {len(chunks)} chunks and processing concurrently...")
# πŸ”Ή Concurrently process all chunks
chunk_files = await asyncio.gather(*[
process_chunk(ch, voice, temp_dir, i) for i, ch in enumerate(chunks)
])
# πŸ”Ή Merge all MP3 files
output_file = "final_output.mp3"
await combine_audio_files(chunk_files, output_file)
print("βœ… TTS Generation Complete. Sending response...")
return FileResponse(output_file, media_type="audio/mpeg", filename="speech.mp3")
# πŸ”Ή Ensure app starts in Hugging Face Spaces
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)