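"""
Generate an AI cover of a song: download or load a track, separate vocals and
instrumentals with MDX-Net, convert the main vocals with an RVC voice model,
apply reverb and compression, and mix everything back together.
"""
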
import argparse
import gc
import hashlib
import json
import os
import shlex
import subprocess
from contextlib import suppress
from urllib.parse import urlparse, parse_qs

import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import sox
import yt_dlp
from pedalboard import Pedalboard, Reverb, Compressor, HighpassFilter
from pedalboard.io import AudioFile
from pydub import AudioSegment

from mdx import run_mdx
from rvc import Config, load_hubert, get_vc, rvc_infer

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

mdxnet_models_dir = os.path.join(BASE_DIR, 'mdxnet_models')
rvc_models_dir = os.path.join(BASE_DIR, 'rvc_models')
output_dir = os.path.join(BASE_DIR, 'song_output')


def get_youtube_video_id(url, ignore_playlist=True):
    """
    Extracts the video ID from a YouTube URL.
    """
    query = urlparse(url)
    if query.hostname == 'youtu.be':
        if query.path[1:] == 'watch':
            return query.query[2:]
        return query.path[1:]

    if query.hostname in {'www.youtube.com', 'youtube.com', 'music.youtube.com'}:
        if not ignore_playlist:
            with suppress(KeyError):
                return parse_qs(query.query)['list'][0]
        if query.path == '/watch':
            return parse_qs(query.query)['v'][0]
        if query.path[:7] == '/watch/':
            return query.path.split('/')[2]
        if query.path[:7] == '/embed/':
            return query.path.split('/')[2]
        if query.path[:3] == '/v/':
            return query.path.split('/')[2]
        if query.path[:8] == '/shorts/':
            return query.path.split('/')[2]

    return None


def yt_download(link):
    """
    Downloads the best audio format from a YouTube link.
    """
    ydl_opts = {
        'format': 'bestaudio',
        'outtmpl': '%(title)s',
        'nocheckcertificate': True,
        'ignoreerrors': True,
        'no_warnings': True,
        'quiet': True,
        'extractaudio': True,
        'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3'}],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        result = ydl.extract_info(link, download=True)
        download_path = ydl.prepare_filename(result, outtmpl='%(title)s.mp3')

    return download_path


def raise_exception(error_msg, is_webui):
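    """Raise an error through the Gradio UI when running as a web UI, otherwise raise a plain Exception."""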
    if is_webui:
        raise gr.Error(error_msg)
    else:
        raise Exception(error_msg)


def get_rvc_model(voice_model, is_webui):
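    """Locate the .pth weights file and optional .index file inside the given voice model directory."""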
    rvc_model_filename, rvc_index_filename = None, None
    model_dir = os.path.join(rvc_models_dir, voice_model)
    for file in os.listdir(model_dir):
        ext = os.path.splitext(file)[1]
        if ext == '.pth':
            rvc_model_filename = file
        if ext == '.index':
            rvc_index_filename = file

    if rvc_model_filename is None:
        error_msg = f'No model file exists in {model_dir}.'
        raise_exception(error_msg, is_webui)

    return os.path.join(model_dir, rvc_model_filename), os.path.join(model_dir, rvc_index_filename) if rvc_index_filename else ''


def get_audio_paths(song_dir):
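    """Return the paths of the original song, instrumentals, de-reverbed main vocals and backup vocals found in song_dir."""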
    orig_song_path = None
    instrumentals_path = None
    main_vocals_dereverb_path = None
    backup_vocals_path = None

    for file in os.listdir(song_dir):
        if file.endswith('_Instrumental.wav'):
            instrumentals_path = os.path.join(song_dir, file)
            orig_song_path = instrumentals_path.replace('_Instrumental', '')

        elif file.endswith('_Vocals_Main_DeReverb.wav'):
            main_vocals_dereverb_path = os.path.join(song_dir, file)

        elif file.endswith('_Vocals_Backup.wav'):
            backup_vocals_path = os.path.join(song_dir, file)

    return orig_song_path, instrumentals_path, main_vocals_dereverb_path, backup_vocals_path


def convert_to_stereo(audio_path):
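    """Convert a mono audio file to a stereo WAV with ffmpeg; already-stereo files are returned unchanged."""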
    wave, sr = librosa.load(audio_path, mono=False, sr=44100)

    # mono audio loads as a 1-D array; stereo loads as shape (2, n)
    if wave.ndim == 1:
        stereo_path = f'{os.path.splitext(audio_path)[0]}_stereo.wav'
        command = shlex.split(f'ffmpeg -y -loglevel error -i "{audio_path}" -ac 2 -f wav "{stereo_path}"')
        subprocess.run(command)
        return stereo_path
    else:
        return audio_path


def pitch_shift(audio_path, pitch_change):
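    """Pitch-shift an audio file by pitch_change semitones with sox, reusing a cached result if it already exists."""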
    output_path = f'{os.path.splitext(audio_path)[0]}_p{pitch_change}.wav'
    if not os.path.exists(output_path):
        y, sr = sf.read(audio_path)
        tfm = sox.Transformer()
        tfm.pitch(pitch_change)
        y_shifted = tfm.build_array(input_array=y, sample_rate_in=sr)
        sf.write(output_path, y_shifted, sr)

    return output_path


def get_hash(filepath):
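    """Return a short BLAKE2b hash of a file, used as a unique song ID for local inputs."""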
    with open(filepath, 'rb') as f:
        file_hash = hashlib.blake2b()
        while chunk := f.read(8192):
            file_hash.update(chunk)

    return file_hash.hexdigest()[:11]


def display_progress(message, percent, is_webui, progress=None):
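    """Report progress to the Gradio progress bar in the web UI, or print the message on the command line."""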
    if is_webui:
        progress(percent, desc=message)
    else:
        print(message)


def preprocess_song(song_input, mdx_model_params, song_id, is_webui, input_type, progress=None):
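    """Download or load the input song, then separate it into instrumentals, backup vocals and de-reverbed main vocals using the MDX-Net models."""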
    keep_orig = False
    if input_type == 'yt':
        display_progress('[~] Downloading song...', 0, is_webui, progress)
        song_link = song_input.split('&')[0]
        orig_song_path = yt_download(song_link)
    elif input_type == 'local':
        orig_song_path = song_input
        keep_orig = True
    else:
        raise_exception(f'[!] Invalid input type: {input_type}', is_webui)

    song_output_dir = os.path.join(output_dir, song_id)
    orig_song_path = convert_to_stereo(orig_song_path)

    display_progress('[~] Separating Vocals from Instrumental...', 0.1, is_webui, progress)
    vocals_path, instrumentals_path = run_mdx(mdx_model_params, song_output_dir, os.path.join(mdxnet_models_dir, 'Kim_Vocal_2.onnx'), orig_song_path, denoise=True, keep_orig=keep_orig)

    display_progress('[~] Separating Main Vocals from Backup Vocals...', 0.2, is_webui, progress)
    backup_vocals_path, main_vocals_path = run_mdx(mdx_model_params, song_output_dir, os.path.join(mdxnet_models_dir, 'UVR_MDXNET_KARA_2.onnx'), vocals_path, suffix='Backup', invert_suffix='Main', denoise=True)

    display_progress('[~] Applying DeReverb to Vocals...', 0.3, is_webui, progress)
    _, main_vocals_dereverb_path = run_mdx(mdx_model_params, song_output_dir, os.path.join(mdxnet_models_dir, 'Reverb_HQ_By_FoxJoy.onnx'), main_vocals_path, invert_suffix='DeReverb', exclude_main=True, denoise=True)

    return orig_song_path, vocals_path, instrumentals_path, main_vocals_path, backup_vocals_path, main_vocals_dereverb_path


def voice_change(voice_model, vocals_path, output_path, pitch_change, f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui):
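    """Convert the input vocals to the target voice using the selected RVC model."""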
    rvc_model_path, rvc_index_path = get_rvc_model(voice_model, is_webui)
    device = 'cuda:0'
    config = Config(device, True)
    hubert_model = load_hubert(device, config.is_half, os.path.join(rvc_models_dir, 'hubert_base.pt'))
    cpt, version, net_g, tgt_sr, vc = get_vc(device, config.is_half, config, rvc_model_path)

    rvc_infer(rvc_index_path, index_rate, vocals_path, output_path, pitch_change, f0_method, cpt, version, net_g, filter_radius, tgt_sr, rms_mix_rate, protect, crepe_hop_length, vc, hubert_model)
    del hubert_model, cpt
    gc.collect()


def add_audio_effects(audio_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping):
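    """Apply a highpass filter, compression and reverb to the audio and write the result to a new file."""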
    output_path = f'{os.path.splitext(audio_path)[0]}_mixed.wav'

    board = Pedalboard(
        [
            HighpassFilter(),
            Compressor(ratio=4, threshold_db=-15),
            Reverb(room_size=reverb_rm_size, dry_level=reverb_dry, wet_level=reverb_wet, damping=reverb_damping)
        ]
    )

    with AudioFile(audio_path) as f:
        with AudioFile(output_path, 'w', f.samplerate, f.num_channels) as o:
            while f.tell() < f.frames:
                chunk = f.read(int(f.samplerate))
                effected = board(chunk, f.samplerate, reset=False)
                o.write(effected)

    return output_path


def merge_audios(audio_paths, output_path):
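    """Overlay the given audio tracks on top of each other and export the mix as a WAV file."""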
    combined = AudioSegment.from_file(audio_paths[0])
    for path in audio_paths[1:]:
        combined = combined.overlay(AudioSegment.from_file(path))
    combined.export(output_path, format='wav')


def process_and_save_song(song_input, input_type, voice_model, pitch_change, f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping, progress, is_webui=False):
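    """
    Run the full cover-generation pipeline: separate the song, convert the main
    vocals to the target voice, apply audio effects, and merge the result with
    the instrumentals. Returns the path of the final output file.
    """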
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Use the YouTube video ID as the song ID, or a hash of the local file.
    if input_type == 'yt':
        song_id = get_youtube_video_id(song_input)
        if not song_id:
            raise_exception('[!] Invalid YouTube link.', is_webui)
    elif input_type == 'local':
        song_id = get_hash(song_input)
    else:
        raise_exception(f'[!] Invalid input type: {input_type}', is_webui)

    song_output_dir = os.path.join(output_dir, song_id)
    if not os.path.exists(song_output_dir):
        os.makedirs(song_output_dir)

    mdx_model_params = {
        'demucs_model_path': os.path.join(mdxnet_models_dir, 'models_demucs.h5'),
        'mdx_model_path': os.path.join(mdxnet_models_dir, 'models_mdx.h5'),
        'output_path': output_dir,
        'noise_protect': 0.33,
        'voc_model_path': os.path.join(mdxnet_models_dir, 'models_vocal.h5')
    }

    orig_song_path = None
    try:
        orig_song_path, vocals_path, instrumentals_path, main_vocals_path, backup_vocals_path, main_vocals_dereverb_path = preprocess_song(song_input, mdx_model_params, song_id, is_webui, input_type, progress)

        display_progress('[~] Changing Main Vocals to Target Voice...', 0.4, is_webui, progress)
        pitch_shifted_main_vocals_path = pitch_shift(main_vocals_dereverb_path, pitch_change)
        output_vocals_path = os.path.join(song_output_dir, 'main_vocals_changed.wav')
        voice_change(voice_model, pitch_shifted_main_vocals_path, output_vocals_path, pitch_change, f0_method, index_rate, filter_radius, rms_mix_rate, protect, crepe_hop_length, is_webui)

        display_progress('[~] Adding Audio Effects...', 0.5, is_webui, progress)
        final_output_vocals_path = add_audio_effects(output_vocals_path, reverb_rm_size, reverb_wet, reverb_dry, reverb_damping)

        display_progress('[~] Merging Vocal and Instrumental Tracks...', 0.6, is_webui, progress)
        final_output_path = os.path.join(output_dir, f'{os.path.splitext(os.path.basename(orig_song_path))[0]}_{voice_model}_vocal_conversion.wav')
        merge_audios([final_output_vocals_path, instrumentals_path], final_output_path)

        display_progress('[~] Done!', 1.0, is_webui, progress)
    except Exception as e:
        raise_exception(f'[!] Processing failed: {str(e)}', is_webui)
    finally:
        # Only clean up the downloaded copy; never delete a user-supplied local file.
        if input_type == 'yt' and orig_song_path is not None:
            with suppress(FileNotFoundError):
                os.remove(orig_song_path)

    return final_output_path


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Process song with RVC.')
    parser.add_argument('--input', type=str, required=True, help='Path to the input song file or YouTube link.')
    parser.add_argument('--type', type=str, required=True, choices=['local', 'yt'], help='Type of input: "local" for a file, "yt" for a YouTube link.')
    parser.add_argument('--voice_model', type=str, required=True, help='Name of the voice model to use.')
    parser.add_argument('--pitch_change', type=float, default=0, help='Pitch change amount in semitones.')
    parser.add_argument('--f0_method', type=str, default='crepe', help='F0 method to use.')
    parser.add_argument('--index_rate', type=float, default=1.0, help='Index rate.')
    parser.add_argument('--filter_radius', type=int, default=3, help='Filter radius.')
    parser.add_argument('--rms_mix_rate', type=float, default=0.25, help='RMS mix rate.')
    parser.add_argument('--protect', type=float, default=0.33, help='Protection rate.')
    parser.add_argument('--crepe_hop_length', type=int, default=128, help='Crepe hop length.')
    parser.add_argument('--reverb_rm_size', type=float, default=0.3, help='Reverb room size.')
    parser.add_argument('--reverb_wet', type=float, default=0.25, help='Reverb wet level.')
    parser.add_argument('--reverb_dry', type=float, default=0.75, help='Reverb dry level.')
    parser.add_argument('--reverb_damping', type=float, default=0.5, help='Reverb damping.')

    args = parser.parse_args()

    process_and_save_song(
        song_input=args.input,
        input_type=args.type,
        voice_model=args.voice_model,
        pitch_change=args.pitch_change,
        f0_method=args.f0_method,
        index_rate=args.index_rate,
        filter_radius=args.filter_radius,
        rms_mix_rate=args.rms_mix_rate,
        protect=args.protect,
        crepe_hop_length=args.crepe_hop_length,
        reverb_rm_size=args.reverb_rm_size,
        reverb_wet=args.reverb_wet,
        reverb_dry=args.reverb_dry,
        reverb_damping=args.reverb_damping,
        progress=None
    )