File size: 4,278 Bytes
ccbe063
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Command-line interface: `sf-cluster build ...`."""
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

import numpy as np

from . import __version__
from .methods import N_SUBSETS, TARGET_SIZE, build_subsets


def _add_build_parser(sub: argparse._SubParsersAction) -> None:
    """Register the ``build`` sub-command and its options on *sub*.

    The sub-command's handler is stored as the ``func`` default so the
    caller can dispatch on the parsed namespace.
    """
    build = sub.add_parser(
        "build",
        help="Build N MSA subsets from a filtered A3M + per-residue FI matrix.",
    )

    # (flag, add_argument keyword arguments), in the order they should
    # appear in ``--help``.
    option_table = (
        ("--a3m", {
            "required": True,
            "type": Path,
            "help": "path to filtered A3M file",
        }),
        ("--fi", {
            "required": True,
            "type": Path,
            "help": "path to per-residue FI matrix .npy (N_seq, L)",
        }),
        ("--method", {
            "required": True,
            "choices": ["mosaic", "gradient"],
            "help": "subset construction method",
        }),
        ("--n-subsets", {
            "type": int,
            "default": N_SUBSETS,
            "help": f"number of subsets (default {N_SUBSETS})",
        }),
        ("--subset-size", {
            "type": int,
            "default": TARGET_SIZE,
            "help": f"sequences per subset (default {TARGET_SIZE})",
        }),
        ("--hv-percentile", {
            "type": float,
            "default": 80.0,
            "help": "column-variance percentile for HV mask (default 80)",
        }),
        ("--seed", {
            "type": int,
            "default": 20260422,
            "help": "global RNG seed tag (recorded in sidecar; "
                    "per-subset seeds are method-deterministic)",
        }),
        ("--query-index", {
            "type": int,
            "default": 0,
            "help": "index of query in the A3M pool (default 0)",
        }),
        ("--out", {
            "required": True,
            "type": Path,
            "help": "output directory for subset A3Ms",
        }),
    )
    for flag, spec in option_table:
        build.add_argument(flag, **spec)

    build.set_defaults(func=_cmd_build)


def _cmd_build(args: argparse.Namespace) -> int:
    """Run the ``build`` sub-command and write its sidecar files.

    Args:
        args: Parsed namespace providing ``a3m``, ``fi``, ``method``,
            ``n_subsets``, ``subset_size``, ``hv_percentile``, ``seed``,
            ``query_index`` and ``out``.

    Returns:
        ``0`` on success, ``2`` if either input file does not exist (an
        error line is printed to stderr and nothing is created).

    Side effects:
        Creates ``args.out`` if needed, calls ``build_subsets`` with that
        directory, then writes ``<method>_subset_index.tsv`` and
        ``<method>_meta.json`` into it. Both sidecars are written as UTF-8
        regardless of the platform locale, so headers containing non-ASCII
        characters cannot fail to encode.
    """
    # Check inputs before touching the filesystem; A3M is reported first.
    for label, path in (("A3M", args.a3m), ("FI matrix", args.fi)):
        if not path.exists():
            print(f"error: {label} not found: {path}", file=sys.stderr)
            return 2

    args.out.mkdir(parents=True, exist_ok=True)

    # NOTE(review): ``paths`` is presumably the list of A3M files written
    # by build_subsets into ``out_dir`` -- confirm against .methods.
    pool, score, subsets, paths = build_subsets(
        a3m_path=args.a3m,
        fi_npy_path=args.fi,
        method=args.method,
        n_subsets=args.n_subsets,
        subset_size=args.subset_size,
        hv_percentile=args.hv_percentile,
        out_dir=args.out,
        query_index=args.query_index,
    )

    # Sidecar: subset index TSV (one row per subset member).
    idx_tsv = args.out / f"{args.method}_subset_index.tsv"
    with open(idx_tsv, "w", encoding="utf-8") as fh:
        fh.write("subset_id\tseq_index\tpool_index\theader\tcontrast_hvlv\n")
        fh.writelines(
            f"{s_i:03d}\t{j}\t{p_i}\t{pool.headers[p_i]}\t{score[p_i]:.6f}\n"
            for s_i, idx_list in enumerate(subsets)
            for j, p_i in enumerate(idx_list)
        )

    # Sidecar: provenance JSON. The seed is only recorded here as a tag;
    # it is not forwarded to build_subsets.
    meta = {
        "sf_cluster_version": __version__,
        "method": args.method,
        "a3m": str(args.a3m.resolve()),
        "fi_matrix": str(args.fi.resolve()),
        "n_subsets": args.n_subsets,
        "subset_size": args.subset_size,
        "hv_percentile": args.hv_percentile,
        "pool_size": pool.n_seq,
        "n_cols": pool.n_cols,
        "seed_tag": args.seed,
        "query_header": pool.headers[args.query_index],
        "score_stats": {
            "min": float(np.min(score)),
            "max": float(np.max(score)),
            "mean": float(np.mean(score)),
            "std": float(np.std(score)),
        },
    }
    meta_path = args.out / f"{args.method}_meta.json"
    meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")

    print(f"[sf-cluster] method={args.method}  pool={pool.n_seq}  "
          f"wrote {len(paths)} A3Ms to {args.out}")
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Create the top-level ``sf-cluster`` argument parser.

    The parser exposes ``--version`` and requires a sub-command, whose
    name is stored under ``command`` in the parsed namespace.
    """
    top = argparse.ArgumentParser(
        prog="sf-cluster",
        description="Frustration-guided MSA subset builders "
                    "(mosaic + gradient).",
    )
    version_banner = f"sf-cluster {__version__}"
    top.add_argument("--version", action="version", version=version_banner)

    commands = top.add_subparsers(dest="command", required=True)
    _add_build_parser(commands)
    return top


def main(argv=None) -> int:
    """Parse *argv* and dispatch to the selected sub-command's handler.

    ``argv`` is passed straight to ``parse_args``; the handler is whatever
    the chosen sub-parser stored under ``func``, and its return value is
    returned unchanged.
    """
    namespace = build_parser().parse_args(argv)
    handler = namespace.func
    return handler(namespace)


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())