from multiprocessing import cpu_count
import threading, pdb, librosa
from time import sleep  # deduplicated: was imported twice
from subprocess import Popen
import torch, os, traceback, sys, warnings, shutil, numpy as np
import faiss
from random import shuffle
import scipy.io.wavfile as wavfile

# Make repo-local modules importable and prepare the working directories.
now_dir = os.getcwd()
sys.path.append(now_dir)
tmp = os.path.join(now_dir, "TEMP")
shutil.rmtree(tmp, ignore_errors=True)  # start each session with a clean temp dir
os.makedirs(tmp, exist_ok=True)
os.makedirs("audios", exist_ok=True)
os.makedirs(os.path.join(now_dir, "logs"), exist_ok=True)
os.makedirs(os.path.join(now_dir, "weights"), exist_ok=True)
os.environ["TEMP"] = tmp
warnings.filterwarnings("ignore")
torch.manual_seed(114514)
from i18n import I18nAuto
import ffmpeg
| |
|
| |
|
i18n = I18nAuto()

ncpu = cpu_count()
ngpu = torch.cuda.device_count()
gpu_infos = []
mem = []
if (not torch.cuda.is_available()) or ngpu == 0:
    if_gpu_ok = False
else:
    if_gpu_ok = False
    for i in range(ngpu):
        gpu_name = torch.cuda.get_device_name(i)
        upper_name = gpu_name.upper()  # hoisted: was recomputed per substring test
        # Whitelist of GPU series considered usable for training (GeForce 10/16/20/30/40,
        # x70/x80/x90 cards, and datacenter/workstation parts: A2/A3/A4/P4/A50/M4/T4/TITAN).
        if any(token in gpu_name for token in ("10", "16", "20", "30", "40", "70", "80", "90")) or any(
            token in upper_name for token in ("A2", "A3", "A4", "P4", "A50", "M4", "T4", "TITAN")
        ):
            if_gpu_ok = True  # at least one usable GPU found
            gpu_infos.append("%s\t%s" % (i, gpu_name))
            # VRAM in whole GiB, rounded up slightly (+0.4) so e.g. 7.6 GiB reports as 8.
            mem.append(
                int(
                    torch.cuda.get_device_properties(i).total_memory
                    / 1024
                    / 1024
                    / 1024
                    + 0.4
                )
            )
if if_gpu_ok and len(gpu_infos) > 0:
    gpu_info = "\n".join(gpu_infos)
    # Heuristic default: half of the smallest card's VRAM in GiB.
    default_batch_size = min(mem) // 2
else:
    gpu_info = "很遗憾您这没有能用的显卡来支持您训练"
    default_batch_size = 1
# BUG FIX: each entry is "index\tname"; take the whole index before the tab
# instead of just the first character, so hosts with >= 10 GPUs work correctly.
gpus = "-".join(info.split("\t")[0] for info in gpu_infos)
| | from infer_pack.models import SynthesizerTrnMs256NSFsid, SynthesizerTrnMs256NSFsid_nono |
| | from scipy.io import wavfile |
| | from fairseq import checkpoint_utils |
| | import gradio as gr |
| | import logging |
| | from vc_infer_pipeline import VC |
| | from config import ( |
| | is_half, |
| | device, |
| | python_cmd, |
| | listen_port, |
| | iscolab, |
| | noparallel, |
| | noautoopen, |
| | ) |
| | from infer_uvr5 import _audio_pre_ |
| | from my_utils import load_audio |
| | from train.process_ckpt import show_info, change_info, merge, extract_small_model |
| |
|
| | |
# Numba (pulled in by librosa) logs verbosely at INFO level; silence it.
logging.getLogger("numba").setLevel(logging.WARNING)
| |
|
| |
|
class ToolButton(gr.Button, gr.components.FormComponent):
    """Small button with single emoji as text, fits inside gradio forms"""

    def __init__(self, **kwargs):
        # The "tool" variant makes gradio render a compact, icon-sized button.
        super().__init__(variant="tool", **kwargs)

    def get_block_name(self):
        # Register with gradio under the plain "button" block type.
        return "button"
| |
|
| |
|
# Lazily-initialized HuBERT feature extractor; populated by load_hubert() on first use.
hubert_model = None
| |
|
| |
|
def load_hubert():
    """Load the HuBERT feature extractor into the module-global ``hubert_model``.

    Reads ``hubert_base.pt`` from the working directory, moves the model to the
    configured device, and casts it to half/float according to ``is_half``.
    """
    global hubert_model
    models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
        ["hubert_base.pt"],
        suffix="",
    )
    model = models[0].to(device)
    # Precision must match the rest of the pipeline (see config.is_half).
    hubert_model = model.half() if is_half else model.float()
    hubert_model.eval()
| |
|
| |
|
weight_root = "weights"
weight_uvr5_root = "uvr5_weights"

# Voice-conversion checkpoints available in weights/.
names = [fname for fname in os.listdir(weight_root) if fname.endswith(".pth")]

# UVR5 vocal-separation models, listed without their extension.
uvr5_names = [
    fname.replace(".pth", "")
    for fname in os.listdir(weight_uvr5_root)
    if fname.endswith(".pth")
]
| |
|
def find_parent(search_dir, file_name):
    """Return the absolute path of the first directory under ``search_dir``
    (depth-first, including ``search_dir`` itself) that contains ``file_name``,
    or ``None`` when the file is not found anywhere in the tree."""
    for current_dir, _subdirs, files in os.walk(search_dir):
        if file_name in files:
            return os.path.abspath(current_dir)
    return None
| |
|
def vc_single(
    sid,
    input_audio,
    f0_up_key,
    f0_file,
    f0_method,
    file_index,
    index_rate,
):
    """Convert one audio file with the currently loaded voice model.

    Args:
        sid: speaker id passed through to the pipeline.
        input_audio: file name of the audio to convert (searched for under CWD).
        f0_up_key: pitch transposition in semitones (coerced to int).
        f0_file: optional file providing an explicit F0 curve, or None.
        f0_method: pitch extraction algorithm ("pm" or "harvest").
        file_index: path to the faiss .index file ("trained" is mapped to "added").
        index_rate: blend strength of the index retrieval, 0..1.

    Returns:
        ("Success", (sample_rate, audio)) on success, or an error/traceback
        string paired with (None, None) / None on failure.
    """
    global tgt_sr, net_g, vc, hubert_model
    if input_audio is None:
        return "You need to upload an audio", None
    f0_up_key = int(f0_up_key)
    try:
        parent_dir = find_parent(".", input_audio)
        if parent_dir is None:
            # Previously this fell through to a TypeError traceback; report clearly.
            return "Could not locate audio file: %s" % input_audio, (None, None)
        audio = load_audio(os.path.join(parent_dir, input_audio), 16000)
        times = [0, 0, 0]
        if hubert_model is None:
            # Lazy-load the feature extractor on first conversion.
            load_hubert()
        if_f0 = cpt.get("f0", 1)
        # Scrub stray quotes/whitespace from pasted paths; users often point at the
        # "trained" index, but inference needs the "added" one.
        file_index = (
            file_index.strip(" ")
            .strip('"')
            .strip("\n")
            .strip('"')
            .strip(" ")
            .replace("trained", "added")
        )
        audio_opt = vc.pipeline(
            hubert_model,
            net_g,
            sid,
            audio,
            times,
            f0_up_key,
            f0_method,
            file_index,
            index_rate,
            if_f0,
            f0_file=f0_file,
        )
        print(
            "npy: ", times[0], "s, f0: ", times[1], "s, infer: ", times[2], "s", sep=""
        )
        return "Success", (tgt_sr, audio_opt)
    except Exception:
        # Surface the traceback in the UI instead of crashing the app.
        info = traceback.format_exc()
        print(info)
        return info, (None, None)
| |
|
| |
|
def vc_multi(
    sid,
    dir_path,
    opt_root,
    paths,
    f0_up_key,
    f0_method,
    file_index,
    index_rate,
):
    """Batch-convert audio files and stream a cumulative progress log.

    Converts every file in ``dir_path`` (or, when that is empty, the uploaded
    ``paths`` objects), writing successful results as WAV into ``opt_root``.
    This is a generator: it yields the growing log string after each file so
    the UI can show progress, and yields any top-level traceback on failure.
    """
    try:
        # Scrub stray quotes/newlines that users paste along with paths.
        dir_path = dir_path.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
        opt_root = opt_root.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
        os.makedirs(opt_root, exist_ok=True)
        try:
            if dir_path != "":
                paths = [os.path.join(dir_path, name) for name in os.listdir(dir_path)]
            else:
                paths = [path.name for path in paths]
        except Exception:
            # Directory listing failed; fall back to the uploaded file objects.
            traceback.print_exc()
            paths = [path.name for path in paths]
        infos = []
        # Same index-path normalization as vc_single.
        file_index = (
            file_index.strip(" ")
            .strip('"')
            .strip("\n")
            .strip('"')
            .strip(" ")
            .replace("trained", "added")
        )
        for path in paths:
            info, opt = vc_single(
                sid,
                path,
                f0_up_key,
                None,
                f0_method,
                file_index,
                index_rate,
            )
            if info == "Success":
                try:
                    tgt_sr, audio_opt = opt
                    wavfile.write(
                        "%s/%s" % (opt_root, os.path.basename(path)), tgt_sr, audio_opt
                    )
                except Exception:
                    info = traceback.format_exc()
            infos.append("%s->%s" % (os.path.basename(path), info))
            yield "\n".join(infos)
        yield "\n".join(infos)
    except Exception:
        yield traceback.format_exc()
| |
|
| | |
def get_vc(sid):
    """Load voice model ``sid`` from the weights folder into module globals,
    or tear down all cached models when the dropdown is cleared (sid == []).

    Returns a gradio update dict for the speaker-id slider.
    """
    global n_spk, tgt_sr, net_g, vc, cpt
    if sid == []:
        # Dropdown cleared: release every cached model to free (GPU) memory.
        global hubert_model
        if hubert_model != None:
            print("clean_empty_cache")
            del net_g, n_spk, vc, hubert_model, tgt_sr
            # NOTE: hubert_model appears twice in this chain; redundant but harmless.
            hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # Re-instantiate the synthesizer and delete it again — a deliberate
            # trick so CUDA caching actually releases the model's memory.
            if_f0 = cpt.get("f0", 1)
            if if_f0 == 1:
                net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=is_half)
            else:
                net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
            del net_g, cpt
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            cpt = None
        return {"visible": False, "__type__": "update"}
    person = "%s/%s" % (weight_root, sid)
    print("loading %s" % person)
    cpt = torch.load(person, map_location="cpu")
    tgt_sr = cpt["config"][-1]  # target sample rate is the last config entry
    # Patch speaker count from the embedding table's actual size.
    cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
    if_f0 = cpt.get("f0", 1)  # 1 => model was trained with pitch guidance
    if if_f0 == 1:
        net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=is_half)
    else:
        net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
    # enc_q is only needed for training; drop it for inference.
    del net_g.enc_q
    print(net_g.load_state_dict(cpt["weight"], strict=False))
    net_g.eval().to(device)
    if is_half:
        net_g = net_g.half()
    else:
        net_g = net_g.float()
    vc = VC(tgt_sr, device, is_half)
    n_spk = cpt["config"][-3]
    return {"visible": False, "maximum": n_spk, "__type__": "update"}
| |
|
| |
|
def change_choices():
    """Re-scan the weights folder and return an updated dropdown choice list."""
    pth_files = [entry for entry in os.listdir(weight_root) if entry.endswith(".pth")]
    return {"choices": sorted(pth_files), "__type__": "update"}
| |
|
def change_choices2():
    """Walk the working tree for playable audio and return an updated dropdown.

    Skips the bundled mute/placeholder WAVs, the reserved 'audio.wav', and any
    name containing 'tmp'.
    """
    excluded = ('mute.wav', 'mute32k.wav', 'mute40k.wav', 'mute48k.wav', 'audio.wav')
    found = []
    for _root, _dirs, files in os.walk("."):
        for fname in files:
            if not fname.endswith(('.wav', '.mp3')):
                continue
            if fname in excluded or "tmp" in fname:
                continue
            found.append(fname)
    return {"choices": sorted(found), "__type__": "update"}
| |
|
def clean():
    """Return a gradio update that blanks out a component's value."""
    return {"value": "", "__type__": "update"}
| |
|
def change_sr2(sr2, if_f0_3):
    """Map sample rate + f0 flag to the matching pretrained G/D checkpoint paths.

    if_f0_3 == "是" ("yes") selects the pitch-guided (f0) pretrained models.
    """
    if if_f0_3 == "是":
        return "pretrained/f0G%s.pth" % sr2, "pretrained/f0D%s.pth" % sr2
    return "pretrained/G%s.pth" % sr2, "pretrained/D%s.pth" % sr2
| | |
def get_index():
    """Guess the faiss .index file for the first (alphabetical) model.

    Only meaningful on colab, where the logs path is fixed; returns '' elsewhere
    or when no index is found.
    """
    if not iscolab:
        return ''
    chosen_model = sorted(names)[0].split(".")[0]
    logs_path = "/content/Retrieval-based-Voice-Conversion-WebUI/logs/" + chosen_model
    for candidate in os.listdir(logs_path):
        if candidate.endswith(".index"):
            return os.path.join(logs_path, candidate)
    return ''
| | |
def get_indexes():
    """Collect every faiss .index file under the colab logs tree.

    Returns a list of absolute paths. FIX: previously the non-colab branch
    returned '' (a string) while the colab branch returned a list; the result
    feeds a gradio Dropdown's ``choices``, so always return a list.
    """
    indexes_list = []
    if iscolab:
        for dirpath, dirnames, filenames in os.walk(
            "/content/Retrieval-based-Voice-Conversion-WebUI/logs/"
        ):
            for filename in filenames:
                if filename.endswith(".index"):
                    indexes_list.append(os.path.join(dirpath, filename))
    return indexes_list
| | |
# Startup snapshot of playable audio files anywhere under the CWD; seeds the
# audio-selection dropdown. (Note: unlike change_choices2, 'audio.wav' is not
# excluded here — preserved as-is.)
_skip_names = ('mute.wav', 'mute32k.wav', 'mute40k.wav', 'mute48k.wav')
audio_files = []
for _dirpath, _dirnames, _filenames in os.walk("."):
    for _fname in _filenames:
        if _fname.endswith(('.wav', '.mp3')) and _fname not in _skip_names and "tmp" not in _fname:
            audio_files.append(_fname)
def audios():
    """Return the current list of playable audio files under the working tree.

    Same filtering as the startup scan: only .wav/.mp3, skipping the bundled
    mute placeholders and anything whose name contains 'tmp'.
    """
    skip = ('mute.wav', 'mute32k.wav', 'mute40k.wav', 'mute48k.wav')
    matches = []
    for _root, _dirs, files in os.walk("."):
        matches.extend(
            fname
            for fname in files
            if fname.endswith(('.wav', '.mp3')) and fname not in skip and "tmp" not in fname
        )
    return matches
| |
|
def get_name():
    """Default selection for the audio dropdown: the first file alphabetically,
    or '' when no audio files were found at startup."""
    if audio_files:
        return sorted(audio_files)[0]
    return ''
| | |
def save_to_wav(record_button):
    """Persist a microphone recording into the audios/ folder.

    Args:
        record_button: filepath of the temporary recording produced by gradio's
            microphone widget, or None when the recording was cleared.

    FIX: gradio fires the .change event with None when the audio is cleared;
    previously that crashed shutil.move with a TypeError.
    """
    if record_button is None:
        return
    shutil.move(record_button, 'audios/recording.wav')
| |
|
| | |
# ---------------------------------------------------------------------------
# Gradio UI: model/audio selection, conversion trigger, and output playback.
# ---------------------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Base()) as app:
    with gr.Row():
        warntext=gr.Markdown("Do not call your audio 'audio.wav' since that is used by the program to keep track of temporary files.")
    with gr.Row():
        sid0 = gr.Dropdown(label="1.Choose your Model.", choices=sorted(names), value=sorted(names)[0])
        # Eagerly load the first model so the app can convert immediately.
        get_vc(sorted(names)[0])
        vc_transform0 = gr.Number(label="Optional: You can change the pitch here or leave it at 0.", value=0)
        # Hidden speaker-id slider; get_vc() returns an update dict shaped for it.
        spk_item = gr.Slider(minimum=0,maximum=2333,step=1,label="Please select speaker id",value=0,visible=False,interactive=True)
        # NOTE(review): get_vc returns an update presumably intended for spk_item,
        # but outputs=[] discards it — confirm whether that is deliberate.
        sid0.change(
            fn=get_vc,
            inputs=[sid0],
            outputs=[],
        )
        but0 = gr.Button("Convert", variant="primary")
    with gr.Row():
        with gr.Column():
            with gr.Row():
                dropbox = gr.File(label="Drop your audio here & hit the Reload button.")
            with gr.Row():
                record_button=gr.Audio(source="microphone", label="OR Record audio.", type="filepath")
            with gr.Row():
                input_audio0 = gr.Dropdown(choices=sorted(audio_files), label="2.Choose your audio.", value=get_name())
                # Re-scan for audio after uploads / on demand.
                dropbox.upload(fn=change_choices2, inputs=[], outputs=[input_audio0])
                refresh_button2 = gr.Button("Reload Audios", variant="primary")
                refresh_button2.click(fn=change_choices2, inputs=[], outputs=[input_audio0])
                # Microphone recordings are moved into audios/recording.wav.
                record_button.change(fn=save_to_wav, inputs=[record_button], outputs=[])
        with gr.Column():
            file_index1 = gr.Dropdown(
                label="3. Path to your added.index file (if it didn't automatically find it.)",
                value=get_index(),
                choices=get_indexes(),
                interactive=True,
            )
            # Blend strength of the faiss index retrieval (0 = off, 1 = full).
            index_rate1 = gr.Slider(
                minimum=0,
                maximum=1,
                label="Strength:",
                value=0.69,
                interactive=True,
            )
    with gr.Row():
        vc_output2 = gr.Audio(label="Output Audio (Click on the Three Dots in the Right Corner to Download)")
    with gr.Row():
        f0method0 = gr.Radio(
            label="Optional: Change the Pitch Extraction Algorithm. Use PM for fast results or Harvest for better low range (but it's extremely slow)",
            choices=["pm", "harvest"],
            value="pm",
            interactive=True,
        )
    with gr.Row():
        # Status / error text produced by vc_single.
        vc_output1 = gr.Textbox(label="")
    with gr.Row():
        instructions = gr.Markdown("""
        This is simply a modified version of the RVC GUI found here:
        https://github.com/RVC-Project/Retrieval-based-Voice-Conversion-WebUI
        """)
    # Optional explicit F0 curve; hidden but wired into the conversion inputs.
    f0_file = gr.File(label="F0 Curve File (Optional, One Pitch Per Line, Replaces Default F0 and Pitch Shift)", visible=False)
    but0.click(
        vc_single,
        [
            spk_item,
            input_audio0,
            vc_transform0,
            f0_file,
            f0method0,
            file_index1,
            index_rate1,
        ],
        [vc_output1, vc_output2]
    )
# Launch: public share link on colab; otherwise serve on the configured port.
if iscolab:
    app.queue().launch(share=True)
else:
    app.queue(concurrency_count=511, max_size=1022).launch(
        server_name="0.0.0.0",
        inbrowser=not noautoopen,
        server_port=listen_port,
        quiet=True,
    )
| |
|