File size: 8,840 Bytes
a9536c4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | # -*- coding: utf-8 -*-
"""Run real AI cover processing modes and write a JSON report."""
from __future__ import annotations
import argparse
import json
import platform
import sys
import time
import traceback
from pathlib import Path
from typing import Any, Dict, Iterable, Optional
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
CASES: tuple[dict[str, Any], ...] = (
dict(name="current_roformer_auto_source_auto", separator="roformer", vc_preprocess_mode="auto", source_constraint_mode="auto", vc_pipeline_mode="current", karaoke_separation=False, singing_repair=False),
dict(name="current_roformer_uvr_deecho_source_on", separator="roformer", vc_preprocess_mode="uvr_deecho", source_constraint_mode="on", vc_pipeline_mode="current", karaoke_separation=False, singing_repair=False),
dict(name="current_roformer_karaoke_source_off", separator="roformer", vc_preprocess_mode="auto", source_constraint_mode="off", vc_pipeline_mode="current", karaoke_separation=True, singing_repair=False),
dict(name="current_demucs_auto_source_auto", separator="demucs", vc_preprocess_mode="auto", source_constraint_mode="auto", vc_pipeline_mode="current", karaoke_separation=False, singing_repair=False),
dict(name="current_uvr5_auto_source_auto", separator="uvr5", vc_preprocess_mode="auto", source_constraint_mode="auto", vc_pipeline_mode="current", karaoke_separation=False, singing_repair=False),
dict(name="official_uvr5_one_to_one", separator="uvr5", vc_preprocess_mode="auto", source_constraint_mode="auto", vc_pipeline_mode="official", karaoke_separation=False, singing_repair=False),
dict(name="official_uvr5_singing_repair", separator="uvr5", vc_preprocess_mode="auto", source_constraint_mode="auto", vc_pipeline_mode="official", karaoke_separation=False, singing_repair=True),
)
def _require_file(path: Path, label: str) -> Path:
resolved = path.expanduser().resolve()
if not resolved.exists():
raise FileNotFoundError(f"{label} does not exist: {resolved}")
if resolved.stat().st_size <= 0:
raise RuntimeError(f"{label} is empty: {resolved}")
return resolved
def _default_index(model_path: Path) -> Optional[Path]:
direct = model_path.with_suffix(".index")
if direct.exists():
return direct
indexes = sorted(model_path.parent.glob("*.index"))
return indexes[0] if len(indexes) == 1 else None
def _device(require_cuda: bool) -> str:
import torch
if torch.cuda.is_available():
return "cuda"
if require_cuda:
raise RuntimeError("CUDA is required, but torch.cuda.is_available() is False.")
return "cpu"
def _validate_outputs(result: Dict[str, str], keys: Iterable[str]) -> Dict[str, Dict[str, Any]]:
files: Dict[str, Dict[str, Any]] = {}
for key in keys:
value = result.get(key)
if not value:
raise RuntimeError(f"Pipeline result missing key: {key}")
path = Path(value)
if not path.exists():
raise FileNotFoundError(f"Pipeline output for {key} does not exist: {path}")
size = path.stat().st_size
if size <= 0:
raise RuntimeError(f"Pipeline output for {key} is empty: {path}")
files[key] = {"path": str(path), "bytes": size}
return files
def _run_case(case: Dict[str, Any], input_audio: Path, model_path: Path, index_path: Optional[Path], output_root: Path, device: str) -> Dict[str, Any]:
from infer.cover_pipeline import CoverPipeline
started = time.time()
case_output = output_root / case["name"]
case_output.mkdir(parents=True, exist_ok=True)
pipeline = CoverPipeline(device=device)
try:
result = pipeline.process(
input_audio=str(input_audio),
model_path=str(model_path),
index_path=str(index_path) if index_path else None,
pitch_shift=0,
index_ratio=0.5,
filter_radius=3,
rms_mix_rate=0.0,
protect=0.33,
speaker_id=0,
f0_method="rmvpe",
demucs_model="htdemucs_ft",
demucs_shifts=1,
demucs_overlap=0.25,
demucs_split=True,
roformer_model="ensemble:vocal_rvc",
separator=case["separator"],
uvr5_model="HP2_all_vocals",
uvr5_agg=10,
uvr5_format="wav",
use_official=True,
hubert_layer=12,
vocals_volume=1.0,
accompaniment_volume=1.0,
reverb_amount=0.0,
backing_mix=0.0,
karaoke_separation=bool(case["karaoke_separation"]),
karaoke_model="ensemble:karaoke",
karaoke_merge_backing_into_accompaniment=True,
vc_preprocess_mode=case["vc_preprocess_mode"],
source_constraint_mode=case["source_constraint_mode"],
vc_pipeline_mode=case["vc_pipeline_mode"],
singing_repair=bool(case["singing_repair"]),
output_dir=str(case_output),
model_display_name="matrix",
)
keys = ["cover", "vocals", "converted_vocals", "accompaniment", "all_files_dir"]
if case["karaoke_separation"]:
keys.extend(["lead_vocals", "backing_vocals"])
return {
"name": case["name"],
"status": "pass",
"seconds": round(time.time() - started, 3),
"parameters": case,
"result": result,
"files": _validate_outputs(result, keys),
}
except Exception as exc:
return {
"name": case["name"],
"status": "fail",
"seconds": round(time.time() - started, 3),
"parameters": case,
"error": str(exc),
"traceback": traceback.format_exc(),
}
finally:
if pipeline.separator is not None:
pipeline.separator.unload_model()
if pipeline.karaoke_separator is not None:
pipeline.karaoke_separator.unload_model()
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True)
parser.add_argument("--model-path", default=str(REPO_ROOT / "assets" / "weights" / "characters" / "rin" / "Rin.pth"))
parser.add_argument("--index-path")
parser.add_argument("--output-dir", default=str(REPO_ROOT / "outputs" / "mode_matrix"))
parser.add_argument("--require-cuda", action="store_true")
parser.add_argument("--case", action="append", choices=[case["name"] for case in CASES])
args = parser.parse_args()
input_audio = _require_file(Path(args.input), "Input audio")
model_path = _require_file(Path(args.model_path), "RVC model")
index_path = _require_file(Path(args.index_path), "RVC index") if args.index_path else _default_index(model_path)
device = _device(args.require_cuda)
stamp = time.strftime("%Y%m%d_%H%M%S")
output_root = Path(args.output_dir).expanduser().resolve() / f"{platform.system().lower()}_{stamp}"
output_root.mkdir(parents=True, exist_ok=True)
selected = set(args.case or [])
cases = [case for case in CASES if not selected or case["name"] in selected]
report = {
"platform": {"system": platform.system(), "release": platform.release(), "python": platform.python_version()},
"device": device,
"input_audio": str(input_audio),
"model_path": str(model_path),
"index_path": str(index_path) if index_path else None,
"output_root": str(output_root),
"cases": [],
}
report_path = output_root / "mode_matrix_results.json"
for case in cases:
print(f"\n=== {case['name']} ===", flush=True)
record = _run_case(case, input_audio, model_path, index_path, output_root, device)
print(f"{case['name']}: {record['status']} ({record['seconds']}s)", flush=True)
if record["status"] == "fail":
print(record["error"], flush=True)
report["cases"].append(record)
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
failed = [case["name"] for case in report["cases"] if case["status"] != "pass"]
report["summary"] = {"total": len(cases), "passed": len(cases) - len(failed), "failed": len(failed), "failed_cases": failed}
report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"\nReport: {report_path}", flush=True)
print(json.dumps(report["summary"], ensure_ascii=False, indent=2), flush=True)
return 1 if failed else 0
if __name__ == "__main__":
raise SystemExit(main())
|