# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
import re
import sys
import tempfile

import numpy as np
import torch
from huggingface_hub import hf_hub_download
from torch.cuda.amp import autocast

# Set up the TTS environment: the VITS code is vendored in ./vits.
if "vits" not in sys.path:
    sys.path.append("vits")

from vits import commons, utils
from vits.models import SynthesizerTrn

# Load the TTS language table only once. The file is tab-separated,
# one "<iso>\t<name>" pair per line.
TTS_LANGUAGES = {}
with open("data/tts/all_langs.tsv") as f:
    for line in f:
        iso, name = line.split("\t", 1)
        TTS_LANGUAGES[iso.strip()] = name.strip()

# Set the device once, preferring CUDA, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available() and torch.backends.mps.is_built():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")


class TextMapper:
    def __init__(self, vocab_file):
        # One symbol per line; the line index is the symbol ID.
        with open(vocab_file, encoding="utf-8") as f:
            self.symbols = [x.replace("\n", "") for x in f.readlines()]
        self.SPACE_ID = self.symbols.index(" ")
        self._symbol_to_id = {s: i for i, s in enumerate(self.symbols)}
        self._id_to_symbol = {i: s for i, s in enumerate(self.symbols)}

    def text_to_sequence(self, text, cleaner_names):
        """Converts a string of text to a sequence of IDs corresponding to
        the symbols in the text. `cleaner_names` is kept for API
        compatibility and is unused here."""
        sequence = []
        clean_text = text.strip()
        for symbol in clean_text:
            symbol_id = self._symbol_to_id[symbol]
            sequence.append(symbol_id)
        return sequence

    def uromanize(self, text, uroman_pl):
        """Romanizes text using the uroman Perl script."""
        iso = "xxx"
        with tempfile.NamedTemporaryFile() as tf, tempfile.NamedTemporaryFile() as tf2:
            # The temp files are reopened by name, so this is POSIX-only.
            with open(tf.name, "w") as f:
                f.write("\n".join([text]))
            cmd = f"perl {uroman_pl} -l {iso} < {tf.name} > {tf2.name}"
            os.system(cmd)
            with open(tf2.name) as f:
                outtexts = [re.sub(r"\s+", " ", line).strip() for line in f]
        return outtexts[0]

    def get_text(self, text, hps):
        """Normalizes text and converts it to a LongTensor of symbol IDs."""
        text_norm = self.text_to_sequence(text, hps.data.text_cleaners)
        if hps.data.add_blank:
            text_norm = commons.intersperse(text_norm, 0)
        return torch.LongTensor(text_norm)

    def filter_oov(self, text, lang=None):
        """Filters out characters that are not in the model vocabulary."""
        text = self.preprocess_char(text, lang=lang)
        val_chars = self._symbol_to_id
        return "".join([x for x in text if x in val_chars])

    def preprocess_char(self, text, lang=None):
        """Applies language-specific character substitutions."""
        if lang == "ron":
            text = text.replace("ț", "ţ")
            print(f"{lang} (ț -> ţ): {text}")
        return text
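
# Illustrative helper, not part of the upstream MMS demo: look up entries in
# the language table loaded above. It assumes only the TTS_LANGUAGES dict
# defined in this module; a convenience sketch for interactive exploration,
# not a stable API.
def list_supported_languages(query=None):
    """Returns "<iso> <name>" entries, optionally filtered by substring."""
    entries = [f"{iso} {name}" for iso, name in TTS_LANGUAGES.items()]
    if query is None:
        return entries
    query = query.lower()
    return [e for e in entries if query in e.lower()]
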
def synthesize(text=None, lang=None, speed=1.0):
    """Synthesizes speech from text using the specified language and speed."""
    if not text:
        return None, "<>"
    lang_code = lang.split()[0].strip()

    # Download the model files (vocab, config, and checkpoint).
    vocab_file = hf_hub_download(
        repo_id="facebook/mms-tts",
        filename="vocab.txt",
        subfolder=f"models/{lang_code}",
    )
    config_file = hf_hub_download(
        repo_id="facebook/mms-tts",
        filename="config.json",
        subfolder=f"models/{lang_code}",
    )
    g_pth = hf_hub_download(
        repo_id="facebook/mms-tts",
        filename="G_100000.pth",
        subfolder=f"models/{lang_code}",
    )

    # Load the model configuration.
    hps = utils.get_hparams_from_file(config_file)
    text_mapper = TextMapper(vocab_file)

    # Initialize the generator and load the checkpoint.
    net_g = SynthesizerTrn(
        len(text_mapper.symbols),
        hps.data.filter_length // 2 + 1,
        hps.train.segment_size // hps.data.hop_length,
        **hps.model,
    ).to(device)
    _ = net_g.eval()
    _ = utils.load_checkpoint(g_pth, net_g, None)

    # Romanize the input if the model was trained on romanized text,
    # signalled by the extension of the training-file list.
    is_uroman = hps.data.training_files.split(".")[-1] == "uroman"
    if is_uroman:
        uroman_pl = os.path.join("uroman", "bin", "uroman.pl")
        text = text_mapper.uromanize(text, uroman_pl)

    # Preprocess the text: lowercase, apply language-specific substitutions,
    # and drop out-of-vocabulary characters. Pass the bare ISO code so checks
    # such as `lang == "ron"` match even when `lang` includes a display name.
    text = text.lower()
    text = text_mapper.filter_oov(text, lang=lang_code)
    stn_tst = text_mapper.get_text(text, hps)

    # Run inference; enable mixed precision only on CUDA, since
    # torch.cuda.amp.autocast has no effect on other devices.
    with torch.no_grad(), autocast(enabled=device.type == "cuda"):
        x_tst = stn_tst.unsqueeze(0).to(device)
        x_tst_lengths = torch.LongTensor([stn_tst.size(0)]).to(device)
        hyp = (
            net_g.infer(
                x_tst,
                x_tst_lengths,
                noise_scale=0.667,
                noise_scale_w=0.8,
                length_scale=1.0 / speed,
            )[0][0, 0]
            .cpu()
            .float()
            .numpy()
        )

    return (hps.data.sampling_rate, hyp), text
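
# Minimal usage sketch, not part of the upstream demo. Assumptions: "eng" is
# listed both in data/tts/all_langs.tsv and under models/ in the
# facebook/mms-tts repo, and scipy is installed for writing the WAV file.
if __name__ == "__main__":
    from scipy.io import wavfile

    (rate, audio), processed = synthesize(text="Hello world", lang="eng", speed=1.0)
    print(f"Processed text: {processed!r}")
    # `audio` is float32 in [-1, 1], which scipy writes as a 32-bit float WAV.
    wavfile.write("mms_tts_sample.wav", rate, audio)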