| |
| """ |
| TopoDevPOC_n39.py |
| Topologically Unique Developing Point of Control Patterns |
| Pre-market K-Lines, n = 39 three-minute candlesticks |
| |
| Paper : TopoDevPOC.tex (ConQ Research Team, Continual Quasars) |
| Compute: Vectorized NumPy + GPU Torch (T4) + Warp branchless ops |
| Architectural patterns from core_engine_v11.py (Hyper-Warp Edition) |
| |
| Outputs (CLI): |
| 1. Total combination count for n=39 |
| 2. Matrix state-transition validation |
| 3. 100 random ternary matrices (1Γ38 each) |
| 4. 100 random symbolic sequences (length-38 strings) |
| 5. 100 developing_poc charts saved as PNG + ASCII CLI preview |
| 6. CSV export β bit-packed uint64 pattern_id (39-bit encoding, min bytes) |
| 7. Binary export β raw .npy uint64 array (fastest machine read) |
| |
| COMPRESSION SCHEME: |
| Each pattern encodes into a single uint64 (8 bytes): |
| bit 0 : direction (0=Bullish, 1=Bearish) |
| bits 1..38 : transitions (1=strict, 0=equality) |
| Decode: direction = id & 1; trans[k] = (id >> (k+1)) & 1 |
| Full matrix + direction reconstructed from one integer in O(1). |
| Entire 2^39 space = integers [0, 2^39). No file required for enumeration. |
| |
| Run on Google Colab T4: |
| !python TopoDevPOC_n39.py |
| """ |
|
|
| |
| import os, sys, time, math, csv, struct |
| import numpy as np |
|
|
| |
| import matplotlib |
| matplotlib.use('Agg') |
| import matplotlib.pyplot as plt |
| import matplotlib.ticker as mticker |
|
|
| |
| try: |
| import torch |
| HAS_CUDA = torch.cuda.is_available() |
| DEVICE = 'cuda' if HAS_CUDA else 'cpu' |
| except ImportError: |
| HAS_CUDA = False |
| DEVICE = 'cpu' |
| torch = None |
|
|
| |
| HAS_WARP = False |
| try: |
| import warp as wp |
| wp.init() |
| wp.set_module_options({"enable_backward": False, "fast_math": True, "max_unroll": 8}) |
| HAS_WARP = bool(wp.get_cuda_devices()) |
| except Exception: |
| pass |
|
|
| |
| |
| |
| N = 39 |
| N_TRANS = N - 1 |
| N_SAMPLES = 100 |
| SEED = int(time.time() * 1000) & 0x7FFFFFFF |
|
|
| SEP = "=" * 74 |
|
|
| |
| |
| |
| print(SEP) |
| print(" TopoDevPOC β Developing POC Pattern Enumerator") |
| print(f" n = {N} candles | {N_TRANS} transitions | device = {DEVICE}") |
| if HAS_CUDA and torch: |
| print(f" GPU = {torch.cuda.get_device_name(0)}") |
| if HAS_WARP: |
| print(f" Warp = enabled (branchless kernel path)") |
| print(SEP) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| TOTAL = 1 << N |
| HALF = 1 << (N - 1) |
|
|
| print(f"\n[THEOREM] Total unique developing POC patterns for n={N}") |
| print(f" Bullish (non-increasing) : 2^{N-1} = {HALF:,}") |
| print(f" Bearish (non-decreasing) : 2^{N-1} = {HALF:,}") |
| print(f" Total (2^{N}) : {TOTAL:,}") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| def _mm2(A, B): |
| """Exact 2Γ2 matrix multiply with Python arbitrary-precision ints.""" |
| return [ |
| [A[0][0]*B[0][0] + A[0][1]*B[1][0], A[0][0]*B[0][1] + A[0][1]*B[1][1]], |
| [A[1][0]*B[0][0] + A[1][1]*B[1][0], A[1][0]*B[0][1] + A[1][1]*B[1][1]], |
| ] |
|
|
| def _mpow2(M, k): |
| """Fast 2Γ2 matrix power, exact ints, O(log k).""" |
| if k == 0: return [[1,0],[0,1]] |
| if k == 1: return M |
| h = _mpow2(M, k >> 1) |
| s = _mm2(h, h) |
| return s if (k & 1) == 0 else _mm2(s, M) |
|
|
| A_mat = [[1,1],[1,1]] |
| v0 = [1, 1] |
| A_pow = _mpow2(A_mat, N - 2) |
| |
| Av0_0 = A_pow[0][0]*v0[0] + A_pow[0][1]*v0[1] |
| Av0_1 = A_pow[1][0]*v0[0] + A_pow[1][1]*v0[1] |
| B_n = Av0_0 + Av0_1 |
|
|
| print(f"\n[MATRIX] A = [[1,1],[1,1]] | v_0 = [1,1]^T") |
| print(f" A^{N-2} = [[{A_pow[0][0]}, {A_pow[0][1]}],") |
| print(f" [{A_pow[1][0]}, {A_pow[1][1]}]]") |
| print(f" B_{N} = 1^T Β· A^{N-2} Β· v_0 = {B_n:,}") |
| print(f" Expected 2^{{n-1}} = {HALF:,}") |
| assert B_n == HALF, f"Matrix B_n mismatch: {B_n} β {HALF}" |
| assert 2 * B_n == TOTAL, f"Total mismatch: {2*B_n} β {TOTAL}" |
| print(f" [OK] 2 Γ {B_n:,} = {TOTAL:,} β") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| t_sample = time.perf_counter() |
|
|
| if HAS_WARP: |
| |
| |
| _seed_wp = SEED |
|
|
| @wp.kernel |
| def k_rng_bits(seed: int, N_cols: int, |
| out: wp.array(dtype=wp.int8)): |
| tid = wp.tid() |
| rng = wp.uint32(seed) ^ wp.uint32(tid) |
| if rng == wp.uint32(0): |
| rng = wp.uint32(123456789) |
| base = tid * N_cols |
| for col in range(N_cols): |
| rng = rng ^ (rng << wp.uint32(13)) |
| rng = rng ^ (rng >> wp.uint32(17)) |
| rng = rng ^ (rng << wp.uint32(5)) |
| out[base + col] = wp.int8(int(rng) & 1) |
|
|
| out_wp = wp.zeros(N_SAMPLES * N, dtype=wp.int8, device='cuda') |
| wp.launch(k_rng_bits, dim=N_SAMPLES, block_dim=128, |
| inputs=[_seed_wp, N, out_wp], device='cuda') |
| wp.synchronize() |
| bits_np = out_wp.numpy().reshape(N_SAMPLES, N) |
| print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled via Warp XOR-shift kernel") |
|
|
| elif HAS_CUDA and torch is not None: |
| |
| gen = torch.Generator(device='cuda') |
| gen.manual_seed(SEED) |
| bits_t = torch.randint(0, 2, (N_SAMPLES, N), device='cuda', |
| generator=gen, dtype=torch.int8) |
| bits_np = bits_t.cpu().numpy() |
| print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on GPU (torch.randint)") |
|
|
| else: |
| |
| rng_cpu = np.random.default_rng(SEED) |
| bits_np = rng_cpu.integers(0, 2, size=(N_SAMPLES, N), dtype=np.int8) |
| print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on CPU (NumPy)") |
|
|
| sample_ms = (time.perf_counter() - t_sample) * 1e3 |
| print(f" Sampling time: {sample_ms:.2f} ms") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| dir_bits = bits_np[:, 0].astype(np.int8) |
| trans = bits_np[:, 1:].astype(np.int8) |
| signs = 1 - 2 * dir_bits |
| ternary = (signs[:, None] * trans).astype(np.int8) |
|
|
| |
| pows64 = (np.uint64(1) << np.arange(N, dtype=np.uint64)) |
| pat_ids = (bits_np.astype(np.uint64) * pows64[None, :]).sum(axis=1) |
|
|
| |
| |
| |
| SYM_BULL = np.array(['=', '>'], dtype='<U1') |
| SYM_BEAR = np.array(['=', '<'], dtype='<U1') |
| sym_matrix = np.where(dir_bits[:, None] == 0, |
| SYM_BULL[trans], |
| SYM_BEAR[trans]) |
|
|
| |
| |
| |
| |
| BASE = 50.0 |
| step = 1.0 |
| p_raw = np.zeros((N_SAMPLES, N), dtype=np.float32) |
| p_raw[:, N-1] = BASE |
| |
| for k in range(N-2, -1, -1): |
| p_raw[:, k] = p_raw[:, k+1] + ternary[:, k].astype(np.float32) * step |
|
|
| |
| |
| poc_disp = p_raw[:, ::-1].copy() |
|
|
| |
| |
| |
| print(f"\n{SEP}") |
| print(f" OUTPUT 1 β TERNARY MATRICES (1Γ{N_TRANS}, values β {{-1,0,+1}})") |
| print(f" Format: [#] Direction | PatternID | M = [m_0 β¦ m_37]") |
| print(SEP) |
|
|
| for i in range(N_SAMPLES): |
| d_label = "Bullish" if dir_bits[i] == 0 else "Bearish" |
| pid = int(pat_ids[i]) |
| row_str = np.array2string(ternary[i], separator=',', |
| max_line_width=400).replace('\n','') |
| print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | M={row_str}") |
|
|
| |
| |
| |
| print(f"\n{SEP}") |
| print(f" OUTPUT 2 β SYMBOLIC SEQUENCES (Ξ£, length {N_TRANS})") |
| print(f" Format: [#] Direction | PatternID | Ξ£ = Ο_0 Ο_1 β¦ Ο_37") |
| print(SEP) |
|
|
| for i in range(N_SAMPLES): |
| d_label = "Bullish" if dir_bits[i] == 0 else "Bearish" |
| pid = int(pat_ids[i]) |
| seq_str = ' '.join(sym_matrix[i]) |
| print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | Ξ£ = {seq_str}") |
|
|
| |
| |
| |
| |
| |
| print(f"\n{SEP}") |
| print(f" OUTPUT 3 β DEVELOPING POC CHARTS (100 patterns)") |
| print(SEP) |
|
|
| |
| x_pos = np.arange(N, dtype=np.float32) |
| x_tick_pos = [0, 9, 19, 29, N-1] |
| x_tick_lbl = [f'C_{{-{N-1}}}', f'C_{{-29}}', f'C_{{-19}}', f'C_{{-9}}', 'C_0'] |
|
|
| ROWS, COLS = 10, 10 |
| fig = plt.figure(figsize=(COLS * 3.2, ROWS * 2.0)) |
| fig.suptitle( |
| f"TopoDevPOC β 100 Random Developing POC Patterns " |
| f"n={N} pre-market 3-min K-lines\n" |
| f"Total pattern space: 2^{N} = {TOTAL:,} " |
| f"(Bullish: {HALF:,} | Bearish: {HALF:,})", |
| fontsize=10, y=1.005 |
| ) |
|
|
| for i in range(N_SAMPLES): |
| ax = fig.add_subplot(ROWS, COLS, i + 1) |
| poc = poc_disp[i] |
| is_bull = (dir_bits[i] == 0) |
| color = '#1a6eb5' if is_bull else '#c0392b' |
| label = 'Bβ' if is_bull else 'Bβ' |
|
|
| |
| ax.set_facecolor('#f7f9fc' if is_bull else '#fdf4f4') |
|
|
| |
| ax.plot(x_pos, poc, color=color, linewidth=1.2, zorder=3) |
|
|
| |
| |
| |
| strict_k = np.where(ternary[i] != 0)[0] |
| strict_disp = (N - 1 - strict_k).astype(int) |
| if strict_disp.size > 0: |
| ax.scatter(strict_disp, poc[strict_disp], |
| color=color, s=5, zorder=5, linewidths=0) |
|
|
| |
| flat_k = np.where(ternary[i] == 0)[0] |
| flat_disp = (N - 1 - flat_k).astype(int) |
| if flat_disp.size > 0: |
| ax.scatter(flat_disp, poc[flat_disp], |
| color='gray', s=3, zorder=4, linewidths=0, alpha=0.5) |
|
|
| n_strict = int(np.abs(ternary[i]).sum()) |
| pid_short = int(pat_ids[i]) % 10**9 |
| ax.set_title(f"#{i+1} {label} mv={n_strict} β¦{pid_short:09d}", |
| fontsize=5.5, pad=2, color=color) |
|
|
| ax.set_xlim(-0.5, N - 0.5) |
| ax.set_xticks(x_tick_pos) |
| ax.set_xticklabels(['βold', '', '', '', 'newβ'], fontsize=3.5) |
| ax.tick_params(axis='y', labelsize=3.5) |
| ax.yaxis.set_major_locator(mticker.MaxNLocator(4)) |
| for sp in ('top', 'right'): |
| ax.spines[sp].set_visible(False) |
| ax.spines['left'].set_color(color) |
| ax.spines['left'].set_linewidth(1.5) |
| ax.spines['bottom'].set_color('#cccccc') |
|
|
| plt.tight_layout(rect=[0, 0, 1, 1]) |
| chart_path = "TopoDevPOC_n39_100samples.png" |
| plt.savefig(chart_path, dpi=110, bbox_inches='tight') |
| plt.close(fig) |
| print(f" [SAVED] {chart_path}") |
|
|
| |
| H = 7 |
| W = 39 |
|
|
| print(f"\n ASCII CLI Charts β first 10 samples (right side = C_0 = newest)\n") |
| for i in range(10): |
| poc = poc_disp[i] |
| d_lbl = "Bullish" if dir_bits[i] == 0 else "Bearish" |
| pid = int(pat_ids[i]) |
| n_mv = int(np.abs(ternary[i]).sum()) |
| pmin, pmax = poc.min(), poc.max() |
| span = pmax - pmin if pmax != pmin else 1.0 |
|
|
| |
| rows = (H - 1 - ((poc - pmin) / span * (H - 1))).round().astype(int) |
| rows = np.clip(rows, 0, H - 1) |
|
|
| grid = [[' '] * W for _ in range(H)] |
| for j in range(W): |
| r = rows[j] |
| |
| |
| is_strict = (j < N - 1) and (ternary[i, N - 2 - j] != 0) |
| grid[r][j] = 'β' if is_strict else 'Β·' |
| |
| for row_idx in range(H): |
| line = grid[row_idx] |
| for j in range(1, W): |
| if line[j] == ' ' and rows[j] == row_idx: |
| line[j] = '-' |
|
|
| print(f" [{i+1:2d}] {d_lbl:7s} | ID={pid} | strict_moves={n_mv}/{N_TRANS}") |
| poc_hi = poc[N-1]; poc_lo = poc[0] |
| print(f" POC range: oldest={poc_lo:.0f} β newest={poc_hi:.0f}") |
| print(f" β{'β'*W}β") |
| for row_idx in range(H): |
| print(f" β{''.join(grid[row_idx])}β") |
| print(f" β{'β'*W}β") |
| sym_preview = ' '.join(sym_matrix[i, :12]) + ' β¦' |
| print(f" Ξ£ (first 12): {sym_preview}\n") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| print(f"\n{SEP}") |
| print(f" OUTPUT 4 β COMPRESSED EXPORT") |
| print(SEP) |
|
|
| |
| def encode_pattern_id(dir_bit, trans_bits): |
| """Pack direction + 38 transition bits into one uint64.""" |
| pid = np.uint64(dir_bit) |
| for k, t in enumerate(trans_bits): |
| pid |= (np.uint64(int(t)) << np.uint64(k + 1)) |
| return int(pid) |
|
|
| def decode_pattern_id(pid, n_trans=38): |
| """Unpack uint64 β direction + ternary matrix. O(1) per pattern.""" |
| pid = int(pid) |
| dir_bit = pid & 1 |
| direction = "Bullish" if dir_bit == 0 else "Bearish" |
| sign = 1 if dir_bit == 0 else -1 |
| trans = np.array([(pid >> (k + 1)) & 1 for k in range(n_trans)], dtype=np.int8) |
| ternary = (sign * trans).astype(np.int8) |
| return direction, trans, ternary |
|
|
| |
| for i in range(N_SAMPLES): |
| pid_ref = int(pat_ids[i]) |
| pid_enc = encode_pattern_id(int(dir_bits[i]), trans[i]) |
| assert pid_ref == pid_enc, f"Encode mismatch at sample {i}" |
| d2, t2, m2 = decode_pattern_id(pid_enc) |
| assert (d2 == ("Bullish" if dir_bits[i] == 0 else "Bearish")), "Direction mismatch" |
| assert np.array_equal(m2, ternary[i]), f"Ternary mismatch at sample {i}" |
| print(" [OK] 100-sample encode/decode round-trip verified") |
|
|
| |
| |
| |
| CSV_PATH = "TopoDevPOC_n39_samples.csv" |
| with open(CSV_PATH, 'w', newline='') as f: |
| writer = csv.writer(f) |
| |
| writer.writerow([ |
| "seq_id", |
| "pattern_id_uint64", |
| "pattern_id_dec", |
| "direction", |
| "n_strict_moves", |
| ]) |
| for i in range(N_SAMPLES): |
| pid = int(pat_ids[i]) |
| d_char = "B+" if dir_bits[i] == 0 else "B-" |
| n_mv = int(np.abs(ternary[i]).sum()) |
| writer.writerow([ |
| i + 1, |
| f"0x{pid:016X}", |
| pid, |
| d_char, |
| n_mv, |
| ]) |
|
|
| csv_bytes = os.path.getsize(CSV_PATH) |
| print(f" [CSV] {CSV_PATH} β {csv_bytes:,} bytes ({csv_bytes/N_SAMPLES:.1f} bytes/row)") |
|
|
| |
| NPY_PATH = "TopoDevPOC_n39_samples.npy" |
| np.save(NPY_PATH, pat_ids.astype(np.uint64)) |
| npy_bytes = os.path.getsize(NPY_PATH) |
| print(f" [NPY] {NPY_PATH} β {npy_bytes:,} bytes ({(npy_bytes-128)/N_SAMPLES:.1f} bytes/row payload)") |
|
|
| |
| |
| |
| BIN_PATH = "TopoDevPOC_n39_samples.bin" |
| BYTES_PER = 5 |
| with open(BIN_PATH, 'wb') as f: |
| |
| f.write(b'TPOC3900') |
| f.write(struct.pack('<IB', N_SAMPLES, 39)) |
| for i in range(N_SAMPLES): |
| pid = int(pat_ids[i]) |
| f.write(pid.to_bytes(BYTES_PER, byteorder='little')) |
| bin_bytes = os.path.getsize(BIN_PATH) |
| payload = N_SAMPLES * BYTES_PER |
| print(f" [BIN] {BIN_PATH} β {bin_bytes:,} bytes (13 hdr + {payload} payload = {BYTES_PER} bytes/pattern)") |
|
|
| |
| print() |
| print(f" SIZE COMPARISON (100 samples, n=39, {N_TRANS} transitions):") |
| print(f" Naive text (direction + 38 ints): ~{100 * 110:,} bytes") |
| print(f" CSV hex uint64 (this file): {csv_bytes:,} bytes ({100*110//csv_bytes:.1f}Γ smaller)") |
| print(f" NumPy .npy uint64: {npy_bytes:,} bytes") |
| print(f" Raw bit-packed .bin (39 bits each): {bin_bytes:,} bytes ({100*110//bin_bytes:.1f}Γ smaller)") |
| print() |
| print(f" SCALING TO ALL 2^{N} = {TOTAL:,} PATTERNS:") |
| full_npy_tb = (TOTAL * 8) / 1e12 |
| full_bin_tb = (TOTAL * BYTES_PER) / 1e12 |
| full_text_tb = (TOTAL * 110) / 1e12 |
| print(f" Naive text: ~{full_text_tb:.1f} TB (infeasible)") |
| print(f" uint64 binary (.npy): ~{full_npy_tb:.2f} TB (infeasible to store)") |
| print(f" 5-byte bit-packed: ~{full_bin_tb:.2f} TB (infeasible to store)") |
| print(f" Optimal: 0 bytes β pattern k = integer k, range [0, 2^{N})") |
| print(f" Use streaming generator below for on-demand enumeration.") |
|
|
| |
| def stream_all_patterns(n=N): |
| """ |
| Yields (pattern_id, direction, ternary_matrix) for all 2^n patterns. |
| Zero memory beyond one pattern at a time. Works for any n. |
| Pattern_id k: bit0=direction, bits1..n-1=transitions. |
| """ |
| sign_map = {0: 1, 1: -1} |
| for k in range(1 << n): |
| dir_bit = k & 1 |
| sign = sign_map[dir_bit] |
| direction = "Bullish" if dir_bit == 0 else "Bearish" |
| ternary_k = np.array( |
| [sign * ((k >> (j + 1)) & 1) for j in range(n - 1)], |
| dtype=np.int8 |
| ) |
| yield k, direction, ternary_k |
|
|
| |
| print() |
| print(f" STREAMING GENERATOR β first 4 / last 4 of all {TOTAL:,} patterns:") |
| gen = stream_all_patterns(N) |
| for _ in range(4): |
| pid_s, d_s, _ = next(gen) |
| print(f" pattern_id={pid_s:>3d} direction={d_s}") |
| print(f" ... ({TOTAL - 8:,} patterns omitted) ...") |
| |
| for pid_s in range(TOTAL - 4, TOTAL): |
| d_s = "Bullish" if (pid_s & 1) == 0 else "Bearish" |
| print(f" pattern_id={pid_s} direction={d_s}") |
|
|
| |
| |
| |
| total_ms = (time.perf_counter() - t_sample) * 1e3 |
| print(SEP) |
| print(" SUMMARY") |
| print(SEP) |
| print(f" n (candles) = {N}") |
| print(f" Transitions per pattern = {N_TRANS}") |
| print(f" Total patterns [2^{N}] = {TOTAL:,}") |
| print(f" Bullish [2^{N-1}] = {HALF:,}") |
| print(f" Bearish [2^{N-1}] = {HALF:,}") |
| print(f" Matrix B_n validated = {B_n:,} β") |
| print(f" Samples generated = {N_SAMPLES}") |
| print(f" Chart file = {chart_path}") |
| print(f" CSV export (hex uint64) = {CSV_PATH} ({csv_bytes:,} bytes)") |
| print(f" Binary export (.npy) = {NPY_PATH} ({npy_bytes:,} bytes)") |
| print(f" Bit-packed export (.bin) = {BIN_PATH} ({bin_bytes:,} bytes)") |
| print(f" Compute device = {DEVICE}") |
| print(f" Wall-clock (sample+decode) = {total_ms:.2f} ms") |
| print(SEP) |
| print() |
| print(" CONVERSION FORMULAS (tex Sec. V)") |
| print(" Symbolic β Ternary:") |
| print(" Bullish: '>' β +1, '=' β 0") |
| print(" Bearish: '<' β -1, '=' β 0") |
| print(" Ternary β Symbolic:") |
| print(" +1 β '>', 0 β '=', -1 β '<'") |
| print() |
| print(" TEMPORAL CONVENTION:") |
| print(f" Chart x-axis: left = C_{{-{N-1}}} (oldest) β right = C_0 (newest)") |
| print(" Bullish pattern: POC non-increasing toward right (higher on left)") |
| print(" Bearish pattern: POC non-decreasing toward right (lower on left)") |
| print(SEP) |
|
|