File size: 3,199 Bytes
42fcedf 366652b a66f077 e60c650 a66f077 42fcedf a66f077 42fcedf a66f077 42fcedf a66f077 366652b a66f077 42fcedf a66f077 366652b a66f077 42fcedf a66f077 42fcedf a66f077 366652b 42fcedf 366652b 42fcedf a66f077 42fcedf 366652b e60c650 a66f077 e60c650 a66f077 366652b a66f077 366652b a66f077 366652b a66f077 366652b a66f077 366652b a66f077 366652b a66f077 366652b a66f077 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
import asyncio
import os
import shutil
import tempfile
import time

import edge_tts
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydub import AudioSegment
# Application object; the @app.get(...) decorators below register routes on it,
# and it is handed to uvicorn.run() when this file is executed as a script.
app = FastAPI()
# 🔹 Function to split text into manageable chunks
def split_text(text: str, max_chunk_size: int = 500):
    """Split text into chunks of whole sentences.

    Sentences are delimited by '.'; the Devanagari danda (U+0964) is treated
    as a full stop, and the Arabic question mark (U+061F) is rewritten to '?'
    (which is kept inside its sentence, not used as a delimiter). Each
    sentence is stripped and re-terminated with '.', then sentences are
    packed greedily into chunks joined by single spaces.

    Args:
        text: The text to split.
        max_chunk_size: Upper bound on the length of a chunk, including the
            joining spaces. A single sentence longer than this is still
            emitted as its own (oversized) chunk rather than being cut.

    Returns:
        A list of chunk strings. Empty when ``text`` holds no sentence
        content (empty, whitespace-only, or only delimiters).
    """
    # Escapes are used so the two non-ASCII delimiters survive re-encoding.
    normalized = text.replace("\u0964", ".").replace("\u061f", "?")

    chunks = []
    current_chunk = []
    current_length = 0

    for raw_sentence in normalized.split("."):
        stripped = raw_sentence.strip()
        if not stripped:
            # A trailing or doubled '.' yields empty pieces; skipping them
            # avoids emitting stray "." sentences.
            continue

        sentence = stripped + "."
        # Account for the space that ' '.join() will insert before this
        # sentence when the chunk already has content.
        projected = current_length + len(sentence) + (1 if current_chunk else 0)

        if current_chunk and projected > max_chunk_size:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            projected = len(sentence)

        current_chunk.append(sentence)
        current_length = projected

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks
# 🔹 Function to process a single chunk asynchronously
async def process_chunk(text, voice, temp_dir, chunk_index):
    """Generate speech for a single chunk and save it as an MP3 file.

    Args:
        text: The text to synthesize.
        voice: Voice name passed to ``edge_tts.Communicate``.
        temp_dir: Directory in which the MP3 file is written.
        chunk_index: Index used to name the file ``chunk_<index>.mp3``.

    Returns:
        The path of the MP3 file that was written.
    """
    tmp_path = os.path.join(temp_dir, f"chunk_{chunk_index}.mp3")
    # "\U0001F3A4" is the microphone emoji, written as an escape so the log
    # prefix cannot be corrupted by a re-encoding of this file.
    print(f"\U0001F3A4 Processing chunk {chunk_index}: {text[:50]}...")  # Logging for debugging
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(tmp_path)
    return tmp_path
# 🔹 Function to merge all chunked MP3 files into a single audio file
async def combine_audio_files(chunk_files, output_path):
    """Concatenate several MP3 files into one MP3 and delete the inputs.

    The inputs are appended in the order given. They are removed afterwards
    whether or not the merge succeeded, so a failed decode/export does not
    leave chunk files behind.

    Note: this coroutine contains no ``await``; the decoding and export run
    to completion without yielding to the event loop.

    Args:
        chunk_files: Paths of the MP3 files to merge, in playback order.
        output_path: Path the combined MP3 is exported to.
    """
    try:
        combined = AudioSegment.empty()
        for chunk_path in chunk_files:
            # "\U0001F539" is the small blue diamond emoji, kept as an escape
            # so the log prefix survives re-encoding of this file.
            print(f"\U0001F539 Adding {chunk_path} to final output")  # Logging for debugging
            combined += AudioSegment.from_mp3(chunk_path)
        combined.export(output_path, format="mp3")
    finally:
        # Remove temporary files
        for chunk_path in chunk_files:
            try:
                os.remove(chunk_path)
            except FileNotFoundError:
                # Already gone: that is the desired end state, and raising
                # here would mask an error from the merge above.
                pass
@app.get("/")
def home():
    """Root endpoint: return a fixed JSON message showing the app is running."""
    # "\u2705" is the check-mark emoji; the escape keeps the literal on one
    # line and immune to re-encoding of this file.
    return {"message": "\u2705 EdgeTTS FastAPI is running!"}
# 🔹 Main TTS API
@app.get("/tts")
async def tts(text: str, voice: str = "en-US-AriaNeural"):
    """Convert ``text`` to speech and return it as an MP3 download.

    The text is split with ``split_text``. If that yields at most one chunk,
    the original text is synthesized directly; otherwise all chunks are
    synthesized concurrently and merged with ``combine_audio_files``.

    Every request works in its own scratch directory under ``temp_audio`` so
    that concurrent requests cannot overwrite each other's files. The
    directory is removed after the response has been sent, or immediately if
    synthesis fails.

    Args:
        text: The text to synthesize (query parameter).
        voice: Voice name handed to ``process_chunk``.

    Returns:
        A ``FileResponse`` serving the MP3 as ``speech.mp3``.

    Raises:
        HTTPException: 400 when ``text`` is empty or whitespace-only.
    """
    if not text.strip():
        raise HTTPException(status_code=400, detail="Parameter 'text' must not be empty.")

    base_dir = "temp_audio"
    os.makedirs(base_dir, exist_ok=True)
    # Unique per-request directory: chunk_N.mp3 and final_output.mp3 would
    # otherwise be shared by every in-flight request.
    work_dir = tempfile.mkdtemp(prefix="tts_", dir=base_dir)

    try:
        chunks = split_text(text)

        if len(chunks) <= 1:
            # If text is short, process directly
            print("\U0001F7E2 Processing without chunking...")
            output_file = await process_chunk(text, voice, work_dir, 0)
        else:
            print(f"Splitting into {len(chunks)} chunks and processing concurrently...")

            # Concurrently process all chunks; gather() keeps results in
            # chunk order, which is the order they must be merged in.
            chunk_files = await asyncio.gather(
                *(
                    process_chunk(chunk, voice, work_dir, index)
                    for index, chunk in enumerate(chunks)
                )
            )

            # Merge all MP3 files
            output_file = os.path.join(work_dir, "final_output.mp3")
            await combine_audio_files(chunk_files, output_file)
            print("\u2705 TTS Generation Complete. Sending response...")
    except BaseException:
        # BaseException so that task cancellation also cleans up.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise

    # The file must outlive this function while it is streamed, so deletion
    # is deferred to a background task attached to the response.
    cleanup = BackgroundTasks()
    cleanup.add_task(shutil.rmtree, work_dir, ignore_errors=True)
    return FileResponse(
        output_file,
        media_type="audio/mpeg",
        filename="speech.mp3",
        background=cleanup,
    )
# 🔹 Ensure app starts in Hugging Face Spaces
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    listen_host = "0.0.0.0"
    listen_port = 7860
    uvicorn.run(app, host=listen_host, port=listen_port)
|