generateAudio / app.py
SAUL19's picture
Update app.py
c5e2889
import gradio as gr
from gradio.inputs import Textbox
import re
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from datasets import load_dataset
import torch
import soundfile as sf
import boto3
from io import BytesIO
import os
import botocore
from time import sleep
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_BUCKET_NAME = os.getenv("BUCKET_NAME")
FOLDER = 'public/mdx/'
device = "cuda" if torch.cuda.is_available() else "cpu"
# load the processor
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
# load the model
model = SpeechT5ForTextToSpeech.from_pretrained(
"microsoft/speecht5_tts").to(device)
# load the vocoder, that is the voice encoder
vocoder = SpeechT5HifiGan.from_pretrained(
"microsoft/speecht5_hifigan").to(device)
# we load this dataset to get the speaker embeddings
embeddings_dataset = load_dataset(
"Matthijs/cmu-arctic-xvectors", split="validation")
# speaker ids from the embeddings dataset
speakers = {
'awb': 0, # Scottish male
'bdl': 1138, # US male
'clb': 2271, # US female
'jmk': 3403, # Canadian male
'ksp': 4535, # Indian male
'rms': 5667, # US male
'slt': 6799 # US female
}
def generateAudio(text_to_audio, s3_save_as, key_id):
if AWS_ACCESS_KEY_ID != key_id:
return "not permition"
s3_save_as = '-'.join(s3_save_as.split()) + ".wav"
def cut_text(text, max_tokens=500):
# Remove non-alphanumeric characters, except periods and commas
text = re.sub(r"[^\w\s.,]", "", text)
# Replace multiple spaces with a single space
text = re.sub(r"\s{2,}", " ", text)
# Remove line breaks
text = re.sub(r"\n", " ", text)
return text
def save_audio_to_s3(audio):
try:
# Create an instance of the S3 client
s3 = boto3.client('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
# Full path of the file in the bucket
s3_key = "public/" + s3_save_as
# Upload the audio file to the S3 bucket
s3.upload_fileobj(audio, S3_BUCKET_NAME, s3_key)
print("SUCCESS SAVE IN S3 WHERE" + s3_key + " & " + S3_BUCKET_NAME)
except Exception as err:
print("Error al guardar")
print(err)
def save_text_to_speech(text, speaker=None):
# Preprocess text and recortar
text = cut_text(text, max_tokens=500)
# Divide el texto en segmentos de 30 palabras
palabras = text.split()
segmentos = [' '.join(palabras[i:i+30]) for i in range(0, len(palabras), 30)]
# Generar audio para cada segmento y combinarlos
audio_segments = []
for segment in segmentos:
inputs = processor(text=segment, return_tensors="pt").to(device)
if speaker is not None:
speaker_embeddings = torch.tensor(embeddings_dataset[speaker]["xvector"]).unsqueeze(0).to(device)
else:
speaker_embeddings = torch.randn((1, 512)).to(device)
speech = model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
audio_segments.append(speech)
combined_audio = torch.cat(audio_segments, dim=0)
# Crear objeto BytesIO para almacenar el audio
audio_buffer = BytesIO()
sf.write(audio_buffer, combined_audio.cpu().numpy(), samplerate=16000, format='WAV')
audio_buffer.seek(0)
# Guardar el audio combinado en S3
save_audio_to_s3(audio_buffer)
save_text_to_speech(text_to_audio, 2271)
return s3_save_as
def check_if_exist(bucket_name, key):
s3 = boto3.resource('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
try:
s3.Object(bucket_name, key).load()
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
# The object does not exist.
return False
else:
# Something else has gone wrong.
raise
else:
return True
def list_s3_files():
s3_client = boto3.client('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
s3 = boto3.resource('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
my_bucket = s3.Bucket(S3_BUCKET_NAME)
for objects in my_bucket.objects.filter(Prefix=FOLDER):
filename_ext = '%s' % os.path.basename(objects.key)
filename = os.path.splitext(filename_ext)[0]
s3audio = 'public/%s.wav' % filename
if check_if_exist(S3_BUCKET_NAME, s3audio):
print('Audio %s already exists!' % s3audio)
else:
KEY = 'public/mdx/' + filename_ext
response = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=KEY)
content = response['Body'].read().decode('utf-8')
if (len(content)):
generateAudio(content, filename, AWS_ACCESS_KEY_ID)
print("SUCCESS " + filename + ".wap")
else:
print("NOT CONTENT:" + filename_ext + ".md")
sleep(500/1000)
demo = gr.Blocks()
with demo:
text = gr.Textbox()
bimage = gr.Button("Generate Blog Images for PineSearch!")
bimage.click(list_s3_files, outputs=text)
demo.launch()