#!/usr/bin/env python3
"""
TopoDevPOC_n39.py

Topologically Unique Developing Point of Control Patterns
Pre-market K-Lines, n = 39 three-minute candlesticks

Paper  : TopoDevPOC.tex (ConQ Research Team, Continual Quasars)
Compute: Vectorized NumPy + GPU Torch (T4) + Warp branchless ops
Architectural patterns from core_engine_v11.py (Hyper-Warp Edition)

Outputs (CLI):
  1. Total combination count for n=39
  2. Matrix state-transition validation
  3. 100 random ternary matrices (1×38 each)
  4. 100 random symbolic sequences (length-38 strings)
  5. 100 developing_poc charts saved as PNG + ASCII CLI preview
  6. CSV export — bit-packed uint64 pattern_id (39-bit encoding, min bytes)
  7. Binary export — raw .npy uint64 array (fastest machine read)

COMPRESSION SCHEME:
  Each pattern encodes into a single uint64 (8 bytes):
    bit 0      : direction (0=Bullish, 1=Bearish)
    bits 1..38 : transitions (1=strict, 0=equality)
  Decode: direction = id & 1; trans[k] = (id >> (k+1)) & 1
  Full matrix + direction reconstructed from one integer in O(1).
  Entire 2^39 space = integers [0, 2^39). No file required for enumeration.

Run on Google Colab T4: !python TopoDevPOC_n39.py
"""

# ── stdlib ────────────────────────────────────────────────────────────────────
import os
import sys
import time
import math
import csv
import struct

import numpy as np

# ── matplotlib (non-interactive for Colab CLI) ────────────────────────────────
import matplotlib
matplotlib.use('Agg')  # must be selected before pyplot is imported
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker

# ── GPU setup (T4 Colab) ─────────────────────────────────────────────────────
# Torch is optional: without it the script falls back to the CPU NumPy sampler.
try:
    import torch
    HAS_CUDA = torch.cuda.is_available()
    DEVICE = 'cuda' if HAS_CUDA else 'cpu'
except ImportError:
    HAS_CUDA = False
    DEVICE = 'cpu'
    torch = None

# Optional Warp (core_engine_v11.py pattern) ─────────────────────────────────
# Deliberately best-effort: any failure while importing or initialising Warp
# simply leaves HAS_WARP False so a different sampling path is used.
HAS_WARP = False
try:
    import warp as wp
    wp.init()
    wp.set_module_options({"enable_backward": False,
                           "fast_math": True,
                           "max_unroll": 8})
    HAS_WARP = bool(wp.get_cuda_devices())
except Exception:
    pass

# ─────────────────────────────────────────────────────────────────────────────
# CONSTANTS (tex Section II)
# ─────────────────────────────────────────────────────────────────────────────
N = 39                 # pre-market 3-min candles (C_0 … C_{-(n-1)})
N_TRANS = N - 1        # 38 adjacent-pair relations
N_SAMPLES = 100
# Time-derived seed, masked to 31 bits; every run samples different patterns.
SEED = int(time.time() * 1000) & 0x7FFFFFFF
SEP = "=" * 74

# ─────────────────────────────────────────────────────────────────────────────
# 0. HEADER
# ─────────────────────────────────────────────────────────────────────────────
print(SEP)
print(" TopoDevPOC — Developing POC Pattern Enumerator")
print(f" n = {N} candles | {N_TRANS} transitions | device = {DEVICE}")
if HAS_CUDA and torch:
    print(f" GPU = {torch.cuda.get_device_name(0)}")
if HAS_WARP:
    print(" Warp = enabled (branchless kernel path)")
print(SEP)

# ─────────────────────────────────────────────────────────────────────────────
# SECTION 1 — COMBINATORIAL ENUMERATION (tex Theorem, Sec.
# III)
#
#   Bullish: each of (n-1) transitions ∈ {>, =} → 2^(n-1) patterns
#   Bearish: each of (n-1) transitions ∈ {<, =} → 2^(n-1) patterns
#   Total  : 2^(n-1) + 2^(n-1) = 2^n (disjoint families)
#   n=39   : 2^39 = 549,755,813,888
# ─────────────────────────────────────────────────────────────────────────────
TOTAL = 1 << N          # exact Python int (arbitrary precision)
HALF = 1 << (N - 1)

print(f"\n[THEOREM] Total unique developing POC patterns for n={N}")
print(f" Bullish (non-increasing) : 2^{N-1} = {HALF:,}")
print(f" Bearish (non-decreasing) : 2^{N-1} = {HALF:,}")
print(f" Total (2^{N}) : {TOTAL:,}")


# ─────────────────────────────────────────────────────────────────────────────
# SECTION 2 — MATRIX STATE-TRANSITION VALIDATION (tex Sec. IV)
#
#   States : S_0 (equality), S_± (strict move)
#   A   = [[1,1],[1,1]]   (fully-connected 2-state digraph)
#   v_0 = [1,1]^T         (both states reachable initially)
#
#   B_n = 1^T · A^(n-2) · v_0
#       = 2^(n-3) · 1^T · A · 1      (using A^k = 2^(k-1)·A for k≥1)
#       = 2^(n-3) · 4 = 2^(n-1)      per direction
#
#   Implementation: exact Python-int matrix exponentiation (no float rounding)
# ─────────────────────────────────────────────────────────────────────────────
def _mm2(A, B):
    """Return the exact product of two 2×2 matrices given as nested lists.

    Entries are combined with ordinary Python arithmetic, so integer inputs
    give arbitrary-precision integer results.
    """
    return [
        [A[0][0] * B[0][0] + A[0][1] * B[1][0],
         A[0][0] * B[0][1] + A[0][1] * B[1][1]],
        [A[1][0] * B[0][0] + A[1][1] * B[1][0],
         A[1][0] * B[0][1] + A[1][1] * B[1][1]],
    ]


def _mpow2(M, k):
    """Return M**k for a 2×2 matrix by repeated squaring (O(log k) products).

    Args:
        M: 2×2 matrix as nested lists.
        k: non-negative integer exponent.

    Returns:
        The identity for k == 0, M itself (same object) for k == 1, otherwise
        a new nested list.

    Raises:
        ValueError: if k is negative (k >> 1 never reaches 0 for negative k,
            so the recursion would not terminate).
    """
    if k < 0:
        raise ValueError(f"matrix exponent must be non-negative, got {k}")
    if k == 0:
        return [[1, 0], [0, 1]]
    if k == 1:
        return M
    half = _mpow2(M, k >> 1)
    squared = _mm2(half, half)
    return squared if (k & 1) == 0 else _mm2(squared, M)


A_mat = [[1, 1], [1, 1]]
v0 = [1, 1]
A_pow = _mpow2(A_mat, N - 2)

# 1^T · A^(n-2) · v0
Av0_0 = A_pow[0][0] * v0[0] + A_pow[0][1] * v0[1]
Av0_1 = A_pow[1][0] * v0[0] + A_pow[1][1] * v0[1]
B_n = Av0_0 + Av0_1     # = 1^T · (A^(n-2)·v0)

print(f"\n[MATRIX] A = [[1,1],[1,1]] | v_0 = [1,1]^T")
print(f" A^{N-2} = [[{A_pow[0][0]}, {A_pow[0][1]}],")
print(f" [{A_pow[1][0]}, {A_pow[1][1]}]]")
print(f" B_{N} = 1^T · A^{N-2} · v_0 = {B_n:,}")
print(f" Expected 2^{{n-1}} = {HALF:,}")
# Explicit raises rather than `assert`, so the check still runs under
# `python -O` and the "[OK]" line below is never printed unverified.
if B_n != HALF:
    raise AssertionError(f"Matrix B_n mismatch: {B_n} ≠ {HALF}")
if 2 * B_n != TOTAL:
    raise AssertionError(f"Total mismatch: {2*B_n} ≠ {TOTAL}")
print(f" [OK] 2 × {B_n:,} = {TOTAL:,} ✓")

# ─────────────────────────────────────────────────────────────────────────────
# SECTION 3 — PATTERN ENCODING SCHEME
#
#   Each of the 2^39 patterns maps bijectively to a 39-bit integer:
#     bit 0        : direction   (0 = Bullish, 1 = Bearish)
#     bits 1 … N-1 : transitions (1 = strict move, 0 = equality)
#
#   pattern_id ∈ [0, 2^39) uniquely identifies every valid pattern.
#
#   Ternary matrix m ∈ {+1,0}^(n-1) for bullish,
#                  m ∈ {-1,0}^(n-1) for bearish.
#   Bijection: m_k = sign(direction) × trans_bit_k
# ─────────────────────────────────────────────────────────────────────────────

# ─────────────────────────────────────────────────────────────────────────────
# SECTION 4 — GPU-ACCELERATED RANDOM SAMPLING
#   Generate (N_SAMPLES × N) binary matrix at once on T4 GPU.
#   Inspired by core_engine_v11.py XOR-shift PRNG kernel design.
#     bits[:,0]  = direction flags
#     bits[:,1:] = N_TRANS transition flags per pattern
# ─────────────────────────────────────────────────────────────────────────────
t_sample = time.perf_counter()

if HAS_WARP:
    # ── Warp branchless path (core_engine_v11.py style) ─────────────────────
    # Launch one thread per sample; XOR-shift fills N bits per thread.
    _seed_wp = SEED

    @wp.kernel
    def k_rng_bits(seed: int, N_cols: int, out: wp.array(dtype=wp.int8)):
        tid = wp.tid()
        # Per-thread state: the shared seed XORed with the thread index.
        rng = wp.uint32(seed) ^ wp.uint32(tid)
        # XOR-shift has 0 as a fixed point, so swap in a non-zero constant.
        if rng == wp.uint32(0):
            rng = wp.uint32(123456789)
        base = tid * N_cols
        for col in range(N_cols):
            rng = rng ^ (rng << wp.uint32(13))
            rng = rng ^ (rng >> wp.uint32(17))
            rng = rng ^ (rng << wp.uint32(5))
            # Keep only the low bit of each state as the sampled flag.
            out[base + col] = wp.int8(int(rng) & 1)

    out_wp = wp.zeros(N_SAMPLES * N, dtype=wp.int8, device='cuda')
    wp.launch(k_rng_bits, dim=N_SAMPLES, block_dim=128,
              inputs=[_seed_wp, N, out_wp], device='cuda')
    wp.synchronize()
    bits_np = out_wp.numpy().reshape(N_SAMPLES, N)
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled via Warp XOR-shift kernel")
elif HAS_CUDA and torch is not None:
    # ── Torch GPU path ───────────────────────────────────────────────────────
    gen = torch.Generator(device='cuda')
    gen.manual_seed(SEED)
    bits_t = torch.randint(0, 2, (N_SAMPLES, N), device='cuda',
                           generator=gen, dtype=torch.int8)
    bits_np = bits_t.cpu().numpy()
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on GPU (torch.randint)")
else:
    # ── CPU NumPy fallback ───────────────────────────────────────────────────
    rng_cpu = np.random.default_rng(SEED)
    bits_np = rng_cpu.integers(0, 2, size=(N_SAMPLES, N), dtype=np.int8)
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on CPU (NumPy)")

sample_ms = (time.perf_counter() - t_sample) * 1e3
print(f" Sampling time: {sample_ms:.2f} ms")

# ─────────────────────────────────────────────────────────────────────────────
# SECTION 5 — VECTORISED DECODING (NumPy, no Python loops over patterns)
#
#   bits[:,0]  → direction array (0=Bullish, 1=Bearish), shape (100,)
#   bits[:,1:] → trans array, shape (100, 38)
#   ternary_mat: +trans if Bullish, -trans if Bearish → shape (100, 38)
#   sign: Bullish→+1, Bearish→-1, broadcast multiply
# ─────────────────────────────────────────────────────────────────────────────
dir_bits = bits_np[:, 0].astype(np.int8)             # 0 or 1
trans = bits_np[:, 1:].astype(np.int8)               # (100,38), values 0 or 1
signs = 1 - 2 * dir_bits                             # 0→+1 (bull), 1→-1 (bear)
ternary = (signs[:, None] * trans).astype(np.int8)   # (100,38): +1/0/-1

# Compact 39-bit pattern IDs: column j of bits_np contributes bit j.
pows64 = (np.uint64(1) << np.arange(N, dtype=np.uint64))
pat_ids = (bits_np.astype(np.uint64) * pows64[None, :]).sum(axis=1)

# Symbolic sequences: vectorised char lookup
#   Bullish (sign=+1): trans=1 → '>', trans=0 → '='
#   Bearish (sign=-1): trans=1 → '<', trans=0 → '='
# NOTE(review): the source text between "dtype='<" and "ID={pid:>" was lost
# (looks like markup stripping). Everything from here to the end of the
# OUTPUT 1 loop is a reconstruction from the surviving comments and from how
# sym_matrix / poc_disp / row_str are used later — confirm against the
# original file before relying on it.
SYM_BULL = np.array(['=', '>'], dtype='<U1')
SYM_BEAR = np.array(['=', '<'], dtype='<U1')
sym_matrix = np.where(dir_bits[:, None] == 0,
                      SYM_BULL[trans], SYM_BEAR[trans])    # (100,38) chars

# Developing-POC level per candle in display order (index 0 = oldest,
# N-1 = newest). Built so that, for transition k,
#   poc_disp[N-2-k] - poc_disp[N-1-k] == ternary[k]
# which matches the chart comments ("transition k ↔ display segment
# [N-2-k, N-1-k]") and the temporal convention printed in the summary
# (bullish is higher on the left).
# NOTE(review): the base level (0 at C_0) and unit step are assumptions; the
# original values were in the lost span.
poc_time = np.zeros((N_SAMPLES, N), dtype=np.float64)      # index 0 = C_0
poc_time[:, 1:] = np.cumsum(ternary, axis=1, dtype=np.int64)
poc_disp = poc_time[:, ::-1]

# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT 1 — TERNARY MATRICES (1×38)
# NOTE(review): banner text and row_str formatting are reconstructed; only
# the tail of the per-row print survived in the source.
# ─────────────────────────────────────────────────────────────────────────────
print(f"\n{SEP}")
print(f" OUTPUT 1 — TERNARY MATRICES (M, 1×{N_TRANS})")
print(SEP)
for i in range(N_SAMPLES):
    d_label = "Bullish" if dir_bits[i] == 0 else "Bearish"
    pid = int(pat_ids[i])
    row_str = ' '.join(f"{int(v):2d}" for v in ternary[i])
    print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | M={row_str}")

# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT 2 — SYMBOLIC SEQUENCES (length 38)
# ─────────────────────────────────────────────────────────────────────────────
print(f"\n{SEP}")
print(f" OUTPUT 2 — SYMBOLIC SEQUENCES (Σ, length {N_TRANS})")
print(f" Format: [#] Direction | PatternID | Σ = σ_0 σ_1 … σ_37")
print(SEP)
for i in range(N_SAMPLES):
    d_label = "Bullish" if dir_bits[i] == 0 else "Bearish"
    pid = int(pat_ids[i])
    seq_str = ' '.join(sym_matrix[i])
    print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | Σ = {seq_str}")

# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT 3 — CHARTS
#   (a) Full matplotlib figure: 10×10 grid, saved to PNG
#   (b) ASCII CLI preview for first 10 patterns
# ─────────────────────────────────────────────────────────────────────────────
print(f"\n{SEP}")
print(f" OUTPUT 3 — DEVELOPING POC CHARTS (100 patterns)")
print(SEP)

# ── x-axis: position 0=oldest C_{-(N-1)}, N-1=newest C_0 ────────────────────
x_pos = np.arange(N, dtype=np.float32)
x_tick_pos = [0, 9, 19, 29, N-1]
# Defined for reference; the subplots below use the short '←old' / 'new→'
# labels instead.
x_tick_lbl = [f'C_{{-{N-1}}}', f'C_{{-29}}', f'C_{{-19}}', f'C_{{-9}}', 'C_0']

ROWS, COLS = 10, 10
fig = plt.figure(figsize=(COLS * 3.2, ROWS * 2.0))
fig.suptitle(
    f"TopoDevPOC — 100 Random Developing POC Patterns "
    f"n={N} pre-market 3-min K-lines\n"
    f"Total pattern space: 2^{N} = {TOTAL:,} "
    f"(Bullish: {HALF:,} | Bearish: {HALF:,})",
    fontsize=10, y=1.005
)

for i in range(N_SAMPLES):
    ax = fig.add_subplot(ROWS, COLS, i + 1)
    poc = poc_disp[i]
    is_bull = (dir_bits[i] == 0)
    color = '#1a6eb5' if is_bull else '#c0392b'
    label = 'B↑' if is_bull else 'B↓'

    # Background shade
    ax.set_facecolor('#f7f9fc' if is_bull else '#fdf4f4')

    # POC line
    ax.plot(x_pos, poc, color=color, linewidth=1.2, zorder=3)

    # Mark strict-move positions (non-zero ternary in display coords).
    # Transition k corresponds to display segment [N-2-k, N-1-k];
    # highlight the newer-side node at display index N-1-k.
    strict_k = np.where(ternary[i] != 0)[0]             # transition indices
    strict_disp = (N - 1 - strict_k).astype(int)        # display x-positions
    if strict_disp.size > 0:
        ax.scatter(strict_disp, poc[strict_disp],
                   color=color, s=5, zorder=5, linewidths=0)

    # Flat segments (equality)
    flat_k = np.where(ternary[i] == 0)[0]
    flat_disp = (N - 1 - flat_k).astype(int)
    if flat_disp.size > 0:
        ax.scatter(flat_disp, poc[flat_disp],
                   color='gray', s=3, zorder=4, linewidths=0, alpha=0.5)

    n_strict = int(np.abs(ternary[i]).sum())
    pid_short = int(pat_ids[i]) % 10**9    # last 9 digits for readability
    ax.set_title(f"#{i+1} {label} mv={n_strict} …{pid_short:09d}",
                 fontsize=5.5, pad=2, color=color)
    ax.set_xlim(-0.5, N - 0.5)
    ax.set_xticks(x_tick_pos)
    ax.set_xticklabels(['←old', '', '', '', 'new→'], fontsize=3.5)
    ax.tick_params(axis='y', labelsize=3.5)
    ax.yaxis.set_major_locator(mticker.MaxNLocator(4))
    for sp in ('top', 'right'):
        ax.spines[sp].set_visible(False)
    ax.spines['left'].set_color(color)
    ax.spines['left'].set_linewidth(1.5)
    ax.spines['bottom'].set_color('#cccccc')

plt.tight_layout(rect=[0, 0, 1, 1])
chart_path = "TopoDevPOC_n39_100samples.png"
plt.savefig(chart_path, dpi=110, bbox_inches='tight')
plt.close(fig)
print(f" [SAVED] {chart_path}")

# ── ASCII CLI chart for first 10 patterns ────────────────────────────────────
H = 7     # chart height in rows
W = N     # chart width: one column per candle
print(f"\n ASCII CLI Charts — first 10 samples (right side = C_0 = newest)\n")
for i in range(min(10, N_SAMPLES)):
    poc = poc_disp[i]
    d_lbl = "Bullish" if dir_bits[i] == 0 else "Bearish"
    pid = int(pat_ids[i])
    n_mv = int(np.abs(ternary[i]).sum())

    pmin, pmax = poc.min(), poc.max()
    span = pmax - pmin if pmax != pmin else 1.0   # avoid 0/0 on flat patterns

    # Map each x-position to a row (row 0 = top = highest POC).
    rows = (H - 1 - ((poc - pmin) / span * (H - 1))).round().astype(int)
    rows = np.clip(rows, 0, H - 1)

    grid = [[' '] * W for _ in range(H)]
    for j in range(W):
        r = rows[j]
        # Strict-move node in the pair ending at display j, i.e. transition
        # k = N-1-j, defined for j >= 1. This marks the newer-side node, the
        # same convention as the matplotlib chart above. The original indexed
        # ternary[i, N-2-j] (the older-side node), contradicting both.
        is_strict = (j >= 1) and (ternary[i, N - 1 - j] != 0)
        grid[r][j] = '●' if is_strict else '·'
    # The original follow-up pass that drew '-' into blank cells on a
    # column's own row never fired (that cell is always filled above), so it
    # has been dropped.

    print(f" [{i+1:2d}] {d_lbl:7s} | ID={pid} | strict_moves={n_mv}/{N_TRANS}")
    poc_hi = poc[N-1]; poc_lo = poc[0]    # values at newest / oldest candle
    print(f" POC range: oldest={poc_lo:.0f} → newest={poc_hi:.0f}")
    print(f" ┌{'─'*W}┐")
    for row_idx in range(H):
        print(f" │{''.join(grid[row_idx])}│")
    print(f" └{'─'*W}┘")
    sym_preview = ' '.join(sym_matrix[i, :12]) + ' …'
    print(f" Σ (first 12): {sym_preview}\n")

# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT 4 — COMPRESSED CSV EXPORT (bit-packed uint64 pattern_id)
#
# COMPRESSION LOGIC:
#   Naive CSV text per row (direction label + 38 integers): ~110 bytes/row
#   Compressed: single uint64 hex string per row            ~ 18 bytes/row
#   Reduction: ~6× on text CSV; binary .npy = 8 bytes/row (machine-optimal)
#
# ENCODING (39-bit integer into uint64):
#   pattern_id = dir_bit | (trans[0]<<1) | (trans[1]<<2) | … | (trans[37]<<38)
#   bit 0      = direction (0=Bullish, 1=Bearish)
#   bits 1..38 = transition flags
#
# DECODE (one-liner, O(1)):
#   direction  = "Bullish" if (pid & 1) == 0 else "Bearish"
#   trans[k]   = (pid >> (k+1)) & 1   for k in 0..37
#   ternary[k] = trans[k] if Bullish else -trans[k]
#
# NOTE ON "ALL PATTERNS":
#   2^39 = 549,755,813,888 patterns. At 8 bytes/pattern → 4.4 TB.
#   At minimum 39 bits/pattern (bit-packed) → 2.68 TB.
#   The pattern space IS [0, 2^39).
#   Pattern k = integer k. No file needed.
#   Use the streaming generator below for on-demand enumeration.
# ─────────────────────────────────────────────────────────────────────────────
print(f"\n{SEP}")
print(f" OUTPUT 4 — COMPRESSED EXPORT")
print(SEP)


# ── Verify encoding round-trip before writing ───────────────────────────────
def encode_pattern_id(dir_bit, trans_bits):
    """Pack a direction bit and transition bits into one integer pattern id.

    Args:
        dir_bit: 0 (Bullish) or 1 (Bearish); stored in bit 0.
        trans_bits: iterable of 0/1 flags; element k is stored in bit k + 1.

    Returns:
        The packed id as a Python int.
    """
    pid = int(dir_bit)
    for k, t in enumerate(trans_bits):
        pid |= int(t) << (k + 1)
    return pid


def decode_pattern_id(pid, n_trans=38):
    """Unpack a pattern id into its direction label and transition arrays.

    Args:
        pid: packed pattern id (anything accepted by int()).
        n_trans: number of transition bits to read, starting at bit 1.

    Returns:
        (direction, trans, ternary), where direction is "Bullish" or
        "Bearish", trans is an int8 array of 0/1 flags, and ternary is trans
        multiplied by +1 (Bullish) or -1 (Bearish), also int8.
    """
    pid = int(pid)
    dir_bit = pid & 1
    direction = "Bullish" if dir_bit == 0 else "Bearish"
    sign = 1 if dir_bit == 0 else -1
    trans = np.array([(pid >> (k + 1)) & 1 for k in range(n_trans)],
                     dtype=np.int8)
    ternary = (sign * trans).astype(np.int8)
    return direction, trans, ternary


# Round-trip verification on all samples. Explicit raises rather than
# `assert`, so the check still runs under `python -O` before "[OK]" prints.
for i in range(N_SAMPLES):
    pid_ref = int(pat_ids[i])
    pid_enc = encode_pattern_id(int(dir_bits[i]), trans[i])
    if pid_ref != pid_enc:
        raise AssertionError(f"Encode mismatch at sample {i}")
    d2, t2, m2 = decode_pattern_id(pid_enc)
    if d2 != ("Bullish" if dir_bits[i] == 0 else "Bearish"):
        raise AssertionError("Direction mismatch")
    if not np.array_equal(m2, ternary[i]):
        raise AssertionError(f"Ternary mismatch at sample {i}")
print(f" [OK] {N_SAMPLES}-sample encode/decode round-trip verified")

# ── CSV: bit-packed uint64 — one integer per row ────────────────────────────
# Columns: seq_id (1-based), pattern_id_uint64 (hex), pattern_id_dec,
#          direction, n_strict_moves
# Total row bytes: ~50 chars vs ~110 for expanded matrix text
CSV_PATH = "TopoDevPOC_n39_samples.csv"
with open(CSV_PATH, 'w', newline='') as f:
    writer = csv.writer(f)
    # Header — human-readable but compact
    writer.writerow([
        "seq_id",              # 1-based sample index
        "pattern_id_uint64",   # hex string — encodes everything, 18 chars
        "pattern_id_dec",      # decimal for inspection
        "direction",           # "B+" (Bullish) or "B-" (Bearish)
        "n_strict_moves",      # popcount of transitions
    ])
    for i in range(N_SAMPLES):
        pid = int(pat_ids[i])
        d_char = "B+" if dir_bits[i] == 0 else "B-"
        n_mv = int(np.abs(ternary[i]).sum())
        writer.writerow([
            i + 1,
            f"0x{pid:016X}",   # 18-char hex — decode with int(x, 16)
            pid,
            d_char,
            n_mv,
        ])
csv_bytes = os.path.getsize(CSV_PATH)
print(f" [CSV] {CSV_PATH} → {csv_bytes:,} bytes ({csv_bytes/N_SAMPLES:.1f} bytes/row)")

# ── Binary .npy — uint64 array, 8 bytes/row, machine-optimal ────────────────
NPY_PATH = "TopoDevPOC_n39_samples.npy"
np.save(NPY_PATH, pat_ids.astype(np.uint64))
npy_bytes = os.path.getsize(NPY_PATH)
# The 128 subtracted below is the assumed size of the .npy header.
print(f" [NPY] {NPY_PATH} → {npy_bytes:,} bytes ({(npy_bytes-128)/N_SAMPLES:.1f} bytes/row payload)")

# ── Raw bit-packed .bin — 39 bits/pattern → ceil(39/8)=5 bytes each ─────────
# Pack 8 patterns × 39 bits = 312 bits = 39 bytes per block (tight packing)
# Simple implementation: pack pattern_ids as 5-byte little-endian chunks
BIN_PATH = "TopoDevPOC_n39_samples.bin"
BYTES_PER = 5   # ceil(39 / 8) = 5 bytes holds one 39-bit pattern_id
with open(BIN_PATH, 'wb') as f:
    # 8-byte magic + 4-byte n_patterns + 1-byte bits_per_pattern
    f.write(b'TPOC3900')
    # NOTE(review): the source text from "struct.pack('<" up to the ">>" in
    # the streaming generator was lost (looks like markup stripping). The
    # rest of this writer, bin_bytes, the [BIN] print and the header of
    # stream_all_patterns are reconstructed from the format comments above
    # and the surviving fragment — confirm against the original file.
    f.write(struct.pack('<I', N_SAMPLES))   # 4-byte little-endian count
    f.write(struct.pack('<B', N))           # 1-byte bits per pattern
    for i in range(N_SAMPLES):
        f.write(int(pat_ids[i]).to_bytes(BYTES_PER, 'little'))
bin_bytes = os.path.getsize(BIN_PATH)
print(f" [BIN] {BIN_PATH} → {bin_bytes:,} bytes ({BYTES_PER} bytes/row payload)")


# ── Streaming generator over the whole pattern space ────────────────────────
def stream_all_patterns(n):
    """Lazily yield every pattern for n candles in increasing pattern_id order.

    NOTE(review): the signature and the lines before the transition-bit
    expression are reconstructed; only the tail of the body survived.

    Args:
        n: number of candles; ids run over [0, 2**n).

    Yields:
        (k, direction, ternary_k): the integer id, "Bullish"/"Bearish" taken
        from bit 0, and an int8 array of n - 1 signed transition flags.
    """
    for k in range(1 << n):
        direction = "Bullish" if (k & 1) == 0 else "Bearish"
        sign = 1 if (k & 1) == 0 else -1
        ternary_k = sign * np.array(
            [((k >> (j + 1)) & 1) for j in range(n - 1)],
            dtype=np.int8
        )
        yield k, direction, ternary_k


# Show the first 4 and last 4 pattern_ids to confirm ordering
print()
print(f" STREAMING GENERATOR — first 4 / last 4 of all {TOTAL:,} patterns:")
gen = stream_all_patterns(N)
for _ in range(4):
    pid_s, d_s, _ = next(gen)
    print(f" pattern_id={pid_s:>3d} direction={d_s}")
print(f" ... ({TOTAL - 8:,} patterns omitted) ...")
# Last 4: decode directly from known IDs (iterating the generator that far
# would mean stepping through the whole space).
for pid_s in range(TOTAL - 4, TOTAL):
    d_s = "Bullish" if (pid_s & 1) == 0 else "Bearish"
    print(f" pattern_id={pid_s} direction={d_s}")

# ─────────────────────────────────────────────────────────────────────────────
# FINAL SUMMARY
# ─────────────────────────────────────────────────────────────────────────────
# Measured from the start of sampling, so it also covers everything run since
# then (decoding, charts, exports).
total_ms = (time.perf_counter() - t_sample) * 1e3
print(SEP)
print(" SUMMARY")
print(SEP)
print(f" n (candles) = {N}")
print(f" Transitions per pattern = {N_TRANS}")
print(f" Total patterns [2^{N}] = {TOTAL:,}")
print(f" Bullish [2^{N-1}] = {HALF:,}")
print(f" Bearish [2^{N-1}] = {HALF:,}")
print(f" Matrix B_n validated = {B_n:,} ✓")
print(f" Samples generated = {N_SAMPLES}")
print(f" Chart file = {chart_path}")
print(f" CSV export (hex uint64) = {CSV_PATH} ({csv_bytes:,} bytes)")
print(f" Binary export (.npy) = {NPY_PATH} ({npy_bytes:,} bytes)")
print(f" Bit-packed export (.bin) = {BIN_PATH} ({bin_bytes:,} bytes)")
print(f" Compute device = {DEVICE}")
print(f" Wall-clock (sample+decode) = {total_ms:.2f} ms")
print(SEP)
print()
print(" CONVERSION FORMULAS (tex Sec. V)")
print(" Symbolic → Ternary:")
print(" Bullish: '>' → +1, '=' → 0")
print(" Bearish: '<' → -1, '=' → 0")
print(" Ternary → Symbolic:")
print(" +1 → '>', 0 → '=', -1 → '<'")
print()
print(" TEMPORAL CONVENTION:")
print(f" Chart x-axis: left = C_{{-{N-1}}} (oldest) → right = C_0 (newest)")
print(" Bullish pattern: POC non-increasing toward right (higher on left)")
print(" Bearish pattern: POC non-decreasing toward right (lower on left)")
print(SEP)