| # !pip -q install numpy matplotlib warp-lang torch | |
| #!/usr/bin/env python3 | |
| """ | |
| TopoDevPOC_n39.py | |
| Topologically Unique Developing Point of Control Patterns | |
| Pre-market K-Lines, n = 39 three-minute candlesticks | |
| Paper : TopoDevPOC.tex (ConQ Research Team, Continual Quasars) | |
| Compute: Vectorized NumPy + GPU Torch (T4) + Warp branchless ops | |
| Architectural patterns from core_engine_v11.py (Hyper-Warp Edition) | |
| Outputs (CLI): | |
| 1. Total combination count for n=39 | |
| 2. Matrix state-transition validation | |
| 3. 100 random ternary matrices (1Γ38 each) | |
| 4. 100 random symbolic sequences (length-38 strings) | |
| 5. 100 developing_poc charts saved as PNG + ASCII CLI preview | |
| Run on Google Colab T4: | |
| !python TopoDevPOC_n39.py | |
| """ | |
| # ββ stdlib ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| import os, sys, time, math | |
| import numpy as np | |
| # ββ matplotlib (non-interactive for Colab CLI) ββββββββββββββββββββββββββββββββ | |
| import matplotlib | |
| matplotlib.use('Agg') | |
| import matplotlib.pyplot as plt | |
| import matplotlib.ticker as mticker | |
| # ββ GPU setup (T4 Colab) βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| import torch | |
| HAS_CUDA = torch.cuda.is_available() | |
| DEVICE = 'cuda' if HAS_CUDA else 'cpu' | |
| except ImportError: | |
| HAS_CUDA = False | |
| DEVICE = 'cpu' | |
| torch = None | |
| # Optional Warp (core_engine_v11.py pattern) βββββββββββββββββββββββββββββββββ | |
| HAS_WARP = False | |
| try: | |
| import warp as wp | |
| wp.init() | |
| wp.set_module_options({"enable_backward": False, "fast_math": True, "max_unroll": 8}) | |
| HAS_WARP = bool(wp.get_cuda_devices()) | |
| except Exception: | |
| pass | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONSTANTS (tex Section II) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| N = 39 # pre-market 3-min candles (C_0 β¦ C_{-(n-1)}) | |
| N_TRANS = N - 1 # 38 adjacent-pair relations | |
| N_SAMPLES = 100 | |
| SEED = int(time.time() * 1000) & 0x7FFFFFFF | |
| SEP = "=" * 74 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 0. HEADER | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(SEP) | |
| print(" TopoDevPOC β Developing POC Pattern Enumerator") | |
| print(f" n = {N} candles | {N_TRANS} transitions | device = {DEVICE}") | |
| if HAS_CUDA and torch: | |
| print(f" GPU = {torch.cuda.get_device_name(0)}") | |
| if HAS_WARP: | |
| print(f" Warp = enabled (branchless kernel path)") | |
| print(SEP) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 1 β COMBINATORIAL ENUMERATION (tex Theorem, Sec. III) | |
| # | |
| # Bullish: each of (n-1) transitions β {>, =} β 2^(n-1) patterns | |
| # Bearish: each of (n-1) transitions β {<, =} β 2^(n-1) patterns | |
| # Total : 2^(n-1) + 2^(n-1) = 2^n (disjoint families) | |
| # n=39 : 2^39 = 549,755,813,888 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| TOTAL = 1 << N # exact Python int (arbitrary precision) | |
| HALF = 1 << (N - 1) | |
| print(f"\n[THEOREM] Total unique developing POC patterns for n={N}") | |
| print(f" Bullish (non-increasing) : 2^{N-1} = {HALF:,}") | |
| print(f" Bearish (non-decreasing) : 2^{N-1} = {HALF:,}") | |
| print(f" Total (2^{N}) : {TOTAL:,}") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 2 β MATRIX STATE-TRANSITION VALIDATION (tex Sec. IV) | |
| # | |
| # States : S_0 (equality), S_Β± (strict move) | |
| # A = [[1,1],[1,1]] (fully-connected 2-state digraph) | |
| # v_0 = [1,1]^T (both states reachable initially) | |
| # | |
| # B_n = 1^T Β· A^(n-2) Β· v_0 | |
| # = 2^(n-3) Β· 1^T Β· A Β· 1 (using A^k = 2^(k-1)Β·A for kβ₯1) | |
| # = 2^(n-3) Β· 4 = 2^(n-1) per direction | |
| # | |
| # Implementation: exact Python-int matrix exponentiation (no float rounding) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _mm2(A, B): | |
| """Exact 2Γ2 matrix multiply with Python arbitrary-precision ints.""" | |
| return [ | |
| [A[0][0]*B[0][0] + A[0][1]*B[1][0], A[0][0]*B[0][1] + A[0][1]*B[1][1]], | |
| [A[1][0]*B[0][0] + A[1][1]*B[1][0], A[1][0]*B[0][1] + A[1][1]*B[1][1]], | |
| ] | |
| def _mpow2(M, k): | |
| """Fast 2Γ2 matrix power, exact ints, O(log k).""" | |
| if k == 0: return [[1,0],[0,1]] | |
| if k == 1: return M | |
| h = _mpow2(M, k >> 1) | |
| s = _mm2(h, h) | |
| return s if (k & 1) == 0 else _mm2(s, M) | |
| A_mat = [[1,1],[1,1]] | |
| v0 = [1, 1] | |
| A_pow = _mpow2(A_mat, N - 2) | |
| # 1^T Β· A^(n-2) Β· v0 | |
| Av0_0 = A_pow[0][0]*v0[0] + A_pow[0][1]*v0[1] | |
| Av0_1 = A_pow[1][0]*v0[0] + A_pow[1][1]*v0[1] | |
| B_n = Av0_0 + Av0_1 # = 1^T Β· (A^(n-2)Β·v0) | |
| print(f"\n[MATRIX] A = [[1,1],[1,1]] | v_0 = [1,1]^T") | |
| print(f" A^{N-2} = [[{A_pow[0][0]}, {A_pow[0][1]}],") | |
| print(f" [{A_pow[1][0]}, {A_pow[1][1]}]]") | |
| print(f" B_{N} = 1^T Β· A^{N-2} Β· v_0 = {B_n:,}") | |
| print(f" Expected 2^{{n-1}} = {HALF:,}") | |
| assert B_n == HALF, f"Matrix B_n mismatch: {B_n} β {HALF}" | |
| assert 2 * B_n == TOTAL, f"Total mismatch: {2*B_n} β {TOTAL}" | |
| print(f" [OK] 2 Γ {B_n:,} = {TOTAL:,} β") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 3 β PATTERN ENCODING SCHEME | |
| # | |
| # Each of the 2^39 patterns maps bijectively to a 39-bit integer: | |
| # bit 0 : direction (0 = Bullish, 1 = Bearish) | |
| # bits 1 β¦ N-1 : transitions (1 = strict move, 0 = equality) | |
| # | |
| # pattern_id β [0, 2^39) uniquely identifies every valid pattern. | |
| # | |
| # Ternary matrix m β {+1,0}^(n-1) for bullish, | |
| # m β {-1,0}^(n-1) for bearish. | |
| # Bijection: m_k = sign(direction) Γ trans_bit_k | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 4 β GPU-ACCELERATED RANDOM SAMPLING | |
| # Generate (N_SAMPLES Γ N) binary matrix at once on T4 GPU. | |
| # Inspired by core_engine_v11.py XOR-shift PRNG kernel design. | |
| # bits[:,0] = direction flags | |
| # bits[:,1:] = N_TRANS transition flags per pattern | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| t_sample = time.perf_counter() | |
| if HAS_WARP: | |
| # ββ Warp branchless path (core_engine_v11.py style) βββββββββββββββββββββ | |
| # Launch one thread per sample; XOR-shift fills N bits per thread. | |
| _seed_wp = SEED | |
| def k_rng_bits(seed: int, N_cols: int, | |
| out: wp.array(dtype=wp.int8)): | |
| tid = wp.tid() | |
| rng = wp.uint32(seed) ^ wp.uint32(tid) | |
| if rng == wp.uint32(0): | |
| rng = wp.uint32(123456789) | |
| base = tid * N_cols | |
| for col in range(N_cols): | |
| rng = rng ^ (rng << wp.uint32(13)) | |
| rng = rng ^ (rng >> wp.uint32(17)) | |
| rng = rng ^ (rng << wp.uint32(5)) | |
| out[base + col] = wp.int8(int(rng) & 1) | |
| out_wp = wp.zeros(N_SAMPLES * N, dtype=wp.int8, device='cuda') | |
| wp.launch(k_rng_bits, dim=N_SAMPLES, block_dim=128, | |
| inputs=[_seed_wp, N, out_wp], device='cuda') | |
| wp.synchronize() | |
| bits_np = out_wp.numpy().reshape(N_SAMPLES, N) | |
| print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled via Warp XOR-shift kernel") | |
| elif HAS_CUDA and torch is not None: | |
| # ββ Torch GPU path βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| gen = torch.Generator(device='cuda') | |
| gen.manual_seed(SEED) | |
| bits_t = torch.randint(0, 2, (N_SAMPLES, N), device='cuda', | |
| generator=gen, dtype=torch.int8) | |
| bits_np = bits_t.cpu().numpy() | |
| print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on GPU (torch.randint)") | |
| else: | |
| # ββ CPU NumPy fallback ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| rng_cpu = np.random.default_rng(SEED) | |
| bits_np = rng_cpu.integers(0, 2, size=(N_SAMPLES, N), dtype=np.int8) | |
| print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on CPU (NumPy)") | |
| sample_ms = (time.perf_counter() - t_sample) * 1e3 | |
| print(f" Sampling time: {sample_ms:.2f} ms") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 5 β VECTORISED DECODING (NumPy, no Python loops over patterns) | |
| # | |
| # bits[:,0] β direction array (0=Bullish, 1=Bearish), shape (100,) | |
| # bits[:,1:] β trans array, shape (100, 38) | |
| # ternary_mat: +trans if Bullish, -trans if Bearish β shape (100, 38) | |
| # sign: Bullishβ+1, Bearishβ-1, broadcast multiply | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| dir_bits = bits_np[:, 0].astype(np.int8) # 0 or 1 | |
| trans = bits_np[:, 1:].astype(np.int8) # (100,38), values 0 or 1 | |
| signs = 1 - 2 * dir_bits # 0β+1 (bull), 1β-1 (bear) | |
| ternary = (signs[:, None] * trans).astype(np.int8) # (100,38): +1/0/-1 | |
| # Compact 39-bit pattern IDs | |
| pows64 = (np.uint64(1) << np.arange(N, dtype=np.uint64)) | |
| pat_ids = (bits_np.astype(np.uint64) * pows64[None, :]).sum(axis=1) | |
| # Symbolic sequences: vectorised char lookup | |
| # Bullish (sign=+1): trans=1 β '>', trans=0 β '=' | |
| # Bearish (sign=-1): trans=1 β '<', trans=0 β '=' | |
| SYM_BULL = np.array(['=', '>'], dtype='<U1') # index by trans bit | |
| SYM_BEAR = np.array(['=', '<'], dtype='<U1') | |
| sym_matrix = np.where(dir_bits[:, None] == 0, | |
| SYM_BULL[trans], | |
| SYM_BEAR[trans]) # (100, 38) | |
| # POC price sequence (right-to-left: p[0]=C_0=newest, p[N-1]=C_{-(N-1)}=oldest) | |
| # p_raw[i, k] = POC of candle C_{-k} for sample i | |
| # Transition k: p[k] = p[k+1] + ternary[k] | |
| # Build by cumsum from oldestβnewest: p_raw[:, N-1] = 50, then forward | |
| BASE = 50.0 | |
| step = 1.0 | |
| p_raw = np.zeros((N_SAMPLES, N), dtype=np.float32) | |
| p_raw[:, N-1] = BASE | |
| # Vectorised: cumulative sum of ternary (columns N-2 down to 0) | |
| for k in range(N-2, -1, -1): | |
| p_raw[:, k] = p_raw[:, k+1] + ternary[:, k].astype(np.float32) * step | |
| # Display order: oldest(left) β newest(right) | |
| # poc_disp[:, j] = p_raw[:, N-1-j] | |
| poc_disp = p_raw[:, ::-1].copy() # (100, 39), column 0=oldest, col N-1=newest | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OUTPUT 1 β TERNARY MATRICES (100 Γ 38) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n{SEP}") | |
| print(f" OUTPUT 1 β TERNARY MATRICES (1Γ{N_TRANS}, values β {{-1,0,+1}})") | |
| print(f" Format: [#] Direction | PatternID | M = [m_0 β¦ m_37]") | |
| print(SEP) | |
| for i in range(N_SAMPLES): | |
| d_label = "Bullish" if dir_bits[i] == 0 else "Bearish" | |
| pid = int(pat_ids[i]) | |
| row_str = np.array2string(ternary[i], separator=',', | |
| max_line_width=400).replace('\n','') | |
| print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | M={row_str}") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OUTPUT 2 β SYMBOLIC SEQUENCES (length 38) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n{SEP}") | |
| print(f" OUTPUT 2 β SYMBOLIC SEQUENCES (Ξ£, length {N_TRANS})") | |
| print(f" Format: [#] Direction | PatternID | Ξ£ = Ο_0 Ο_1 β¦ Ο_37") | |
| print(SEP) | |
| for i in range(N_SAMPLES): | |
| d_label = "Bullish" if dir_bits[i] == 0 else "Bearish" | |
| pid = int(pat_ids[i]) | |
| seq_str = ' '.join(sym_matrix[i]) | |
| print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | Ξ£ = {seq_str}") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OUTPUT 3 β CHARTS | |
| # (a) Full matplotlib figure: 10Γ10 grid, saved to PNG | |
| # (b) ASCII CLI preview for first 10 patterns | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n{SEP}") | |
| print(f" OUTPUT 3 β DEVELOPING POC CHARTS (100 patterns)") | |
| print(SEP) | |
| # ββ x-axis: position 0=oldest C_{-(N-1)}, N-1=newest C_0 ββββββββββββββββββ | |
| x_pos = np.arange(N, dtype=np.float32) | |
| x_tick_pos = [0, 9, 19, 29, N-1] | |
| x_tick_lbl = [f'C_{{-{N-1}}}', f'C_{{-29}}', f'C_{{-19}}', f'C_{{-9}}', 'C_0'] | |
| ROWS, COLS = 10, 10 | |
| fig = plt.figure(figsize=(COLS * 3.2, ROWS * 2.0)) | |
| fig.suptitle( | |
| f"TopoDevPOC β 100 Random Developing POC Patterns " | |
| f"n={N} pre-market 3-min K-lines\n" | |
| f"Total pattern space: 2^{N} = {TOTAL:,} " | |
| f"(Bullish: {HALF:,} | Bearish: {HALF:,})", | |
| fontsize=10, y=1.005 | |
| ) | |
| for i in range(N_SAMPLES): | |
| ax = fig.add_subplot(ROWS, COLS, i + 1) | |
| poc = poc_disp[i] | |
| is_bull = (dir_bits[i] == 0) | |
| color = '#1a6eb5' if is_bull else '#c0392b' | |
| label = 'Bβ' if is_bull else 'Bβ' | |
| # Background shade | |
| ax.set_facecolor('#f7f9fc' if is_bull else '#fdf4f4') | |
| # POC line | |
| ax.plot(x_pos, poc, color=color, linewidth=1.2, zorder=3) | |
| # Mark strict-move positions (non-zero ternary in display coords) | |
| # Transition k corresponds to display segment [N-2-k, N-1-k] | |
| # Highlight the newer-side node at display index N-1-k = N-1-k | |
| strict_k = np.where(ternary[i] != 0)[0] # transition indices | |
| strict_disp = (N - 1 - strict_k).astype(int) # display x-positions | |
| if strict_disp.size > 0: | |
| ax.scatter(strict_disp, poc[strict_disp], | |
| color=color, s=5, zorder=5, linewidths=0) | |
| # Flat segments (equality) | |
| flat_k = np.where(ternary[i] == 0)[0] | |
| flat_disp = (N - 1 - flat_k).astype(int) | |
| if flat_disp.size > 0: | |
| ax.scatter(flat_disp, poc[flat_disp], | |
| color='gray', s=3, zorder=4, linewidths=0, alpha=0.5) | |
| n_strict = int(np.abs(ternary[i]).sum()) | |
| pid_short = int(pat_ids[i]) % 10**9 # last 9 digits for readability | |
| ax.set_title(f"#{i+1} {label} mv={n_strict} β¦{pid_short:09d}", | |
| fontsize=5.5, pad=2, color=color) | |
| ax.set_xlim(-0.5, N - 0.5) | |
| ax.set_xticks(x_tick_pos) | |
| ax.set_xticklabels(['βold', '', '', '', 'newβ'], fontsize=3.5) | |
| ax.tick_params(axis='y', labelsize=3.5) | |
| ax.yaxis.set_major_locator(mticker.MaxNLocator(4)) | |
| for sp in ('top', 'right'): | |
| ax.spines[sp].set_visible(False) | |
| ax.spines['left'].set_color(color) | |
| ax.spines['left'].set_linewidth(1.5) | |
| ax.spines['bottom'].set_color('#cccccc') | |
| plt.tight_layout(rect=[0, 0, 1, 1]) | |
| chart_path = "TopoDevPOC_n39_100samples.png" | |
| plt.savefig(chart_path, dpi=110, bbox_inches='tight') | |
| plt.close(fig) | |
| print(f" [SAVED] {chart_path}") | |
| # ββ ASCII CLI chart for first 10 patterns ββββββββββββββββββββββββββββββββββββ | |
| H = 7 # chart height in rows | |
| W = 39 # chart width = N | |
| print(f"\n ASCII CLI Charts β first 10 samples (right side = C_0 = newest)\n") | |
| for i in range(10): | |
| poc = poc_disp[i] | |
| d_lbl = "Bullish" if dir_bits[i] == 0 else "Bearish" | |
| pid = int(pat_ids[i]) | |
| n_mv = int(np.abs(ternary[i]).sum()) | |
| pmin, pmax = poc.min(), poc.max() | |
| span = pmax - pmin if pmax != pmin else 1.0 | |
| # Map each x-position to a row | |
| rows = (H - 1 - ((poc - pmin) / span * (H - 1))).round().astype(int) | |
| rows = np.clip(rows, 0, H - 1) | |
| grid = [[' '] * W for _ in range(H)] | |
| for j in range(W): | |
| r = rows[j] | |
| # Strict-move node in the pair ending at display j: | |
| # transition index k = N-1-j (if j < N-1) | |
| is_strict = (j < N - 1) and (ternary[i, N - 2 - j] != 0) | |
| grid[r][j] = 'β' if is_strict else 'Β·' | |
| # Connect with horizontal dash where poc is flat | |
| for row_idx in range(H): | |
| line = grid[row_idx] | |
| for j in range(1, W): | |
| if line[j] == ' ' and rows[j] == row_idx: | |
| line[j] = '-' | |
| print(f" [{i+1:2d}] {d_lbl:7s} | ID={pid} | strict_moves={n_mv}/{N_TRANS}") | |
| poc_hi = poc[N-1]; poc_lo = poc[0] | |
| print(f" POC range: oldest={poc_lo:.0f} β newest={poc_hi:.0f}") | |
| print(f" β{'β'*W}β") | |
| for row_idx in range(H): | |
| print(f" β{''.join(grid[row_idx])}β") | |
| print(f" β{'β'*W}β") | |
| sym_preview = ' '.join(sym_matrix[i, :12]) + ' β¦' | |
| print(f" Ξ£ (first 12): {sym_preview}\n") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FINAL SUMMARY | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| total_ms = (time.perf_counter() - t_sample) * 1e3 | |
| print(SEP) | |
| print(" SUMMARY") | |
| print(SEP) | |
| print(f" n (candles) = {N}") | |
| print(f" Transitions per pattern = {N_TRANS}") | |
| print(f" Total patterns [2^{N}] = {TOTAL:,}") | |
| print(f" Bullish [2^{N-1}] = {HALF:,}") | |
| print(f" Bearish [2^{N-1}] = {HALF:,}") | |
| print(f" Matrix B_n validated = {B_n:,} β") | |
| print(f" Samples generated = {N_SAMPLES}") | |
| print(f" Chart file = {chart_path}") | |
| print(f" Compute device = {DEVICE}") | |
| print(f" Wall-clock (sample+decode) = {total_ms:.2f} ms") | |
| print(SEP) | |
| print() | |
| print(" CONVERSION FORMULAS (tex Sec. V)") | |
| print(" Symbolic β Ternary:") | |
| print(" Bullish: '>' β +1, '=' β 0") | |
| print(" Bearish: '<' β -1, '=' β 0") | |
| print(" Ternary β Symbolic:") | |
| print(" +1 β '>', 0 β '=', -1 β '<'") | |
| print() | |
| print(" TEMPORAL CONVENTION:") | |
| print(f" Chart x-axis: left = C_{{-{N-1}}} (oldest) β right = C_0 (newest)") | |
| print(" Bullish pattern: POC non-increasing toward right (higher on left)") | |
| print(" Bearish pattern: POC non-decreasing toward right (lower on left)") | |
| print(SEP) | |