# TopoDevPOC / TopoDevPOC_n39.py
# algorembrant's picture
# Upload 5 files
# bfe8887 verified
# !pip -q install numpy matplotlib warp-lang torch
#!/usr/bin/env python3
"""
TopoDevPOC_n39.py
Topologically Unique Developing Point of Control Patterns
Pre-market K-Lines, n = 39 three-minute candlesticks
Paper : TopoDevPOC.tex (ConQ Research Team, Continual Quasars)
Compute: Vectorized NumPy + GPU Torch (T4) + Warp branchless ops
Architectural patterns from core_engine_v11.py (Hyper-Warp Edition)
Outputs (CLI):
1. Total combination count for n=39
2. Matrix state-transition validation
3. 100 random ternary matrices (1×38 each)
4. 100 random symbolic sequences (length-38 strings)
5. 100 developing_poc charts saved as PNG + ASCII CLI preview
Run on Google Colab T4:
!python TopoDevPOC_n39.py
"""
# ── stdlib ────────────────────────────────────────────────────────────────────
import os, sys, time, math
import numpy as np
# ── matplotlib (non-interactive for Colab CLI) ────────────────────────────────
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
# ── GPU setup (T4 Colab) ─────────────────────────────────────────────────────
# Probe for PyTorch.  When the import fails, every flag falls back to CPU and
# `torch` is bound to None so later `torch is not None` checks stay valid.
try:
    import torch
    HAS_CUDA = torch.cuda.is_available()
    DEVICE = 'cuda' if HAS_CUDA else 'cpu'
except ImportError:
    HAS_CUDA = False
    DEVICE = 'cpu'
    torch = None
# Optional Warp (core_engine_v11.py pattern) ─────────────────────────────────
# HAS_WARP becomes True only when Warp imports, initialises and reports at
# least one CUDA device.
HAS_WARP = False
try:
    import warp as wp
    wp.init()
    wp.set_module_options({"enable_backward": False, "fast_math": True, "max_unroll": 8})
    HAS_WARP = bool(wp.get_cuda_devices())
except Exception:
    # Deliberate best-effort: Warp is optional, so any failure here (missing
    # package, failed init, unsupported option) simply disables the Warp path.
    HAS_WARP = False
# ─────────────────────────────────────────────────────────────────────────────
# CONSTANTS (tex Section II)
# ─────────────────────────────────────────────────────────────────────────────
# Number of pre-market 3-min candles (C_0 … C_{-(n-1)}).
N = 39
# One relation per adjacent candle pair (38 for n = 39).
N_TRANS = N - 1
# Number of random patterns drawn and rendered.
N_SAMPLES = 100
# Wall-clock milliseconds, masked to a non-negative 31-bit integer.
SEED = 0x7FFFFFFF & int(1000 * time.time())
# Horizontal rule used between CLI sections.
SEP = 74 * "="
# ─────────────────────────────────────────────────────────────────────────────
# 0. HEADER
# ─────────────────────────────────────────────────────────────────────────────
# Startup banner: problem size plus whichever accelerators were detected.
print(SEP)
print(" TopoDevPOC — Developing POC Pattern Enumerator")
print(f" n = {N} candles | {N_TRANS} transitions | device = {DEVICE}")
# `torch` is None when the import failed, so guard before querying the GPU name.
if HAS_CUDA and torch:
    print(f" GPU = {torch.cuda.get_device_name(0)}")
if HAS_WARP:
    print(" Warp = enabled (branchless kernel path)")
print(SEP)
# ─────────────────────────────────────────────────────────────────────────────
# SECTION 1 — COMBINATORIAL ENUMERATION (tex Theorem, Sec. III)
#
# Bullish: each of (n-1) transitions ∈ {>, =} → 2^(n-1) patterns
# Bearish: each of (n-1) transitions ∈ {<, =} → 2^(n-1) patterns
# Total : 2^(n-1) + 2^(n-1) = 2^n (disjoint families)
# n=39 : 2^39 = 549,755,813,888
# ─────────────────────────────────────────────────────────────────────────────
# Pattern counts as exact Python ints (arbitrary precision, no overflow).
HALF = 2 ** (N - 1)   # patterns per direction
TOTAL = 2 * HALF      # both directions together, i.e. 2^N
print("\n".join([
    f"\n[THEOREM] Total unique developing POC patterns for n={N}",
    f" Bullish (non-increasing) : 2^{N-1} = {HALF:,}",
    f" Bearish (non-decreasing) : 2^{N-1} = {HALF:,}",
    f" Total (2^{N}) : {TOTAL:,}",
]))
# ─────────────────────────────────────────────────────────────────────────────
# SECTION 2 — MATRIX STATE-TRANSITION VALIDATION (tex Sec. IV)
#
# States : S_0 (equality), S_± (strict move)
# A = [[1,1],[1,1]] (fully-connected 2-state digraph)
# v_0 = [1,1]^T (both states reachable initially)
#
# B_n = 1^T · A^(n-2) · v_0
# = 2^(n-3) · 1^T · A · 1 (using A^k = 2^(k-1)·A for k≥1)
# = 2^(n-3) · 4 = 2^(n-1) per direction
#
# Implementation: exact Python-int matrix exponentiation (no float rounding)
# ─────────────────────────────────────────────────────────────────────────────
def _mm2(A, B):
"""Exact 2Γ—2 matrix multiply with Python arbitrary-precision ints."""
return [
[A[0][0]*B[0][0] + A[0][1]*B[1][0], A[0][0]*B[0][1] + A[0][1]*B[1][1]],
[A[1][0]*B[0][0] + A[1][1]*B[1][0], A[1][0]*B[0][1] + A[1][1]*B[1][1]],
]
def _mpow2(M, k):
"""Fast 2Γ—2 matrix power, exact ints, O(log k)."""
if k == 0: return [[1,0],[0,1]]
if k == 1: return M
h = _mpow2(M, k >> 1)
s = _mm2(h, h)
return s if (k & 1) == 0 else _mm2(s, M)
# Evaluate B_n = 1^T · A^(n-2) · v_0 with exact integers and compare it with
# the closed-form counts HALF and TOTAL.
A_mat = [[1,1],[1,1]]
v0 = [1, 1]
A_pow = _mpow2(A_mat, N - 2)
# A^(n-2) · v_0, one component per state
Av0_0 = A_pow[0][0]*v0[0] + A_pow[0][1]*v0[1]
Av0_1 = A_pow[1][0]*v0[0] + A_pow[1][1]*v0[1]
B_n = Av0_0 + Av0_1 # = 1^T · (A^(n-2)·v0)
print(f"\n[MATRIX] A = [[1,1],[1,1]] | v_0 = [1,1]^T")
print(f" A^{N-2} = [[{A_pow[0][0]}, {A_pow[0][1]}],")
print(f" [{A_pow[1][0]}, {A_pow[1][1]}]]")
print(f" B_{N} = 1^T · A^{N-2} · v_0 = {B_n:,}")
print(f" Expected 2^{{n-1}} = {HALF:,}")
# Explicit raises instead of `assert`, so the validation still runs under
# `python -O` (where assert statements are stripped) before "[OK]" is printed.
if B_n != HALF:
    raise AssertionError(f"Matrix B_n mismatch: {B_n} ≠ {HALF}")
if 2 * B_n != TOTAL:
    raise AssertionError(f"Total mismatch: {2*B_n} ≠ {TOTAL}")
print(f" [OK] 2 × {B_n:,} = {TOTAL:,} ✓")
# ─────────────────────────────────────────────────────────────────────────────
# SECTION 3 β€” PATTERN ENCODING SCHEME
#
# Each of the 2^39 patterns maps bijectively to a 39-bit integer:
# bit 0 : direction (0 = Bullish, 1 = Bearish)
# bits 1 … N-1 : transitions (1 = strict move, 0 = equality)
#
# pattern_id ∈ [0, 2^39) uniquely identifies every valid pattern.
#
# Ternary matrix m ∈ {+1,0}^(n-1) for bullish,
# m ∈ {-1,0}^(n-1) for bearish.
# Bijection: m_k = sign(direction) × trans_bit_k
# ─────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────────────────────────────────────
# SECTION 4 — GPU-ACCELERATED RANDOM SAMPLING
# Generate (N_SAMPLES × N) binary matrix at once on T4 GPU.
# Inspired by core_engine_v11.py XOR-shift PRNG kernel design.
# bits[:,0] = direction flags
# bits[:,1:] = N_TRANS transition flags per pattern
#
# Exactly one of three back ends runs, chosen by HAS_WARP / HAS_CUDA; each one
# leaves its result in `bits_np`, reshaped or sized to (N_SAMPLES, N).
# ─────────────────────────────────────────────────────────────────────────────
# Timer start; the final summary also measures from this point.
t_sample = time.perf_counter()
if HAS_WARP:
    # ── Warp branchless path (core_engine_v11.py style) ─────────────────────
    # Launch one thread per sample; XOR-shift fills N bits per thread.
    _seed_wp = SEED
    # Kernel contract: thread `tid` writes N_cols values (each 0 or 1) into
    # the flat output slice out[tid*N_cols : (tid+1)*N_cols].
    @wp.kernel
    def k_rng_bits(seed: int, N_cols: int,
                   out: wp.array(dtype=wp.int8)):
        tid = wp.tid()
        # Per-thread state: the shared seed XOR-ed with the thread index.
        # NOTE(review): XOR-shift is linear, so the first emitted bit is a
        # fixed XOR of a few bits of (seed ^ tid); neighbouring threads may
        # therefore be correlated. Consider mixing `tid` before use — confirm.
        rng = wp.uint32(seed) ^ wp.uint32(tid)
        # An all-zero state would stay zero under the shifts below.
        if rng == wp.uint32(0):
            rng = wp.uint32(123456789)
        base = tid * N_cols
        for col in range(N_cols):
            # One 13/17/5 XOR-shift round per output bit.
            rng = rng ^ (rng << wp.uint32(13))
            rng = rng ^ (rng >> wp.uint32(17))
            rng = rng ^ (rng << wp.uint32(5))
            # Keep only the lowest bit of the state as the sample.
            out[base + col] = wp.int8(int(rng) & 1)
    out_wp = wp.zeros(N_SAMPLES * N, dtype=wp.int8, device='cuda')
    wp.launch(k_rng_bits, dim=N_SAMPLES, block_dim=128,
              inputs=[_seed_wp, N, out_wp], device='cuda')
    # Wait for the kernel before copying the buffer back to host memory.
    wp.synchronize()
    bits_np = out_wp.numpy().reshape(N_SAMPLES, N)
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled via Warp XOR-shift kernel")
elif HAS_CUDA and torch is not None:
    # ── Torch GPU path ───────────────────────────────────────────────────────
    # Seeded CUDA generator; integers are drawn from [0, 2), i.e. 0 or 1.
    gen = torch.Generator(device='cuda')
    gen.manual_seed(SEED)
    bits_t = torch.randint(0, 2, (N_SAMPLES, N), device='cuda',
                           generator=gen, dtype=torch.int8)
    bits_np = bits_t.cpu().numpy()
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on GPU (torch.randint)")
else:
    # ── CPU NumPy fallback ────────────────────────────────────────────────────
    rng_cpu = np.random.default_rng(SEED)
    bits_np = rng_cpu.integers(0, 2, size=(N_SAMPLES, N), dtype=np.int8)
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on CPU (NumPy)")
# Elapsed time for the sampling branch only, in milliseconds.
sample_ms = (time.perf_counter() - t_sample) * 1e3
print(f" Sampling time: {sample_ms:.2f} ms")
# ─────────────────────────────────────────────────────────────────────────────
# SECTION 5 — VECTORISED DECODING (NumPy, no Python loops over patterns)
#
# bits[:,0] → direction array (0=Bullish, 1=Bearish), shape (N_SAMPLES,)
# bits[:,1:] → trans array, shape (N_SAMPLES, N_TRANS)
# ternary_mat: +trans if Bullish, -trans if Bearish → shape (N_SAMPLES, N_TRANS)
# sign: Bullish→+1, Bearish→-1, broadcast multiply
# ─────────────────────────────────────────────────────────────────────────────
dir_bits = bits_np[:, 0].astype(np.int8) # 0 or 1
trans = bits_np[:, 1:].astype(np.int8) # values 0 or 1
signs = 1 - 2 * dir_bits # 0→+1 (bull), 1→-1 (bear)
ternary = (signs[:, None] * trans).astype(np.int8) # +1/0/-1
# Compact N-bit pattern IDs: column c of bits_np contributes bit c of the ID.
pows64 = (np.uint64(1) << np.arange(N, dtype=np.uint64))
pat_ids = (bits_np.astype(np.uint64) * pows64[None, :]).sum(axis=1)
# Symbolic sequences: vectorised char lookup
# Bullish (sign=+1): trans=1 → '>', trans=0 → '='
# Bearish (sign=-1): trans=1 → '<', trans=0 → '='
SYM_BULL = np.array(['=', '>'], dtype='<U1') # index by trans bit
SYM_BEAR = np.array(['=', '<'], dtype='<U1')
sym_matrix = np.where(dir_bits[:, None] == 0,
                      SYM_BULL[trans],
                      SYM_BEAR[trans])
# POC price sequence.
# p_raw[i, k] = POC of candle C_{-k} for sample i (k=0 newest, k=N-1 oldest),
# with the recurrence p[k] = p[k+1] + ternary[k] * step and p[N-1] = BASE.
# Unrolling it gives p[k] = BASE + sum(ternary[k:]) * step, so a single cumsum
# over the transitions taken oldest-first replaces the per-column loop.  All
# values are small integers, hence exact in float32.
BASE = 50.0
step = 1.0
_steps_old_to_new = ternary[:, ::-1].astype(np.float32) * step
# Display order: oldest (column 0) → newest (column N-1).
poc_disp = np.empty((N_SAMPLES, N), dtype=np.float32)
poc_disp[:, 0] = BASE
poc_disp[:, 1:] = BASE + np.cumsum(_steps_old_to_new, axis=1, dtype=np.float32)
# Raw order (newest first) is the display order mirrored:
# p_raw[:, k] == poc_disp[:, N-1-k].
p_raw = poc_disp[:, ::-1].copy()
# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT 1 — TERNARY MATRICES (N_SAMPLES × N_TRANS)
# ─────────────────────────────────────────────────────────────────────────────
# One CLI line per sample: direction, integer pattern ID and the ternary row.
print(f"\n{SEP}")
print(f" OUTPUT 1 — TERNARY MATRICES (1×{N_TRANS}, values ∈ {{-1,0,+1}})")
print(f" Format: [#] Direction | PatternID | M = [m_0 … m_{N_TRANS - 1}]")
print(SEP)
for i in range(N_SAMPLES):
    d_label = "Bullish" if dir_bits[i] == 0 else "Bearish"
    pid = int(pat_ids[i])
    # Wide line limit plus newline stripping keeps each row on a single line.
    row_str = np.array2string(ternary[i], separator=',',
                              max_line_width=400).replace('\n','')
    print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | M={row_str}")
# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT 2 — SYMBOLIC SEQUENCES (length N_TRANS)
# ─────────────────────────────────────────────────────────────────────────────
# One CLI line per sample: direction, pattern ID and the space-separated
# symbols taken from sym_matrix.
print(f"\n{SEP}")
print(f" OUTPUT 2 — SYMBOLIC SEQUENCES (Σ, length {N_TRANS})")
print(f" Format: [#] Direction | PatternID | Σ = σ_0 σ_1 … σ_{N_TRANS - 1}")
print(SEP)
for i in range(N_SAMPLES):
    d_label = "Bullish" if dir_bits[i] == 0 else "Bearish"
    pid = int(pat_ids[i])
    seq_str = ' '.join(sym_matrix[i])
    print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | Σ = {seq_str}")
# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT 3 — CHARTS
# (a) Full matplotlib figure: grid of small multiples, saved to PNG
# (b) ASCII CLI preview for first 10 patterns
# ─────────────────────────────────────────────────────────────────────────────
print(f"\n{SEP}")
print(f" OUTPUT 3 — DEVELOPING POC CHARTS ({N_SAMPLES} patterns)")
print(SEP)
# ── x-axis: position 0=oldest C_{-(N-1)}, N-1=newest C_0 ──────────────────
x_pos = np.arange(N, dtype=np.float32)
x_tick_pos = [0, 9, 19, 29, N-1]
# Candle names for the tick positions; kept for reference, the axes below use
# the shorter old/new labels instead.
x_tick_lbl = [f'C_{{-{N-1}}}', f'C_{{-29}}', f'C_{{-19}}', f'C_{{-9}}', 'C_0']
# Grid shape follows the sample count (10×10 for the default 100 samples), so
# every sample always has a subplot slot.
COLS = 10
ROWS = max(1, math.ceil(N_SAMPLES / COLS))
fig = plt.figure(figsize=(COLS * 3.2, ROWS * 2.0))
fig.suptitle(
    f"TopoDevPOC — {N_SAMPLES} Random Developing POC Patterns "
    f"n={N} pre-market 3-min K-lines\n"
    f"Total pattern space: 2^{N} = {TOTAL:,} "
    f"(Bullish: {HALF:,} | Bearish: {HALF:,})",
    fontsize=10, y=1.005
)
for i in range(N_SAMPLES):
    ax = fig.add_subplot(ROWS, COLS, i + 1)
    poc = poc_disp[i]
    is_bull = (dir_bits[i] == 0)
    color = '#1a6eb5' if is_bull else '#c0392b'
    label = 'B↑' if is_bull else 'B↓'
    # Background shade
    ax.set_facecolor('#f7f9fc' if is_bull else '#fdf4f4')
    # POC line
    ax.plot(x_pos, poc, color=color, linewidth=1.2, zorder=3)
    # Mark strict-move positions (non-zero ternary in display coords).
    # Transition k corresponds to display segment [N-2-k, N-1-k]; the marker
    # goes on the newer-side node at display index N-1-k.
    strict_k = np.where(ternary[i] != 0)[0] # transition indices
    strict_disp = (N - 1 - strict_k).astype(int) # display x-positions
    if strict_disp.size > 0:
        ax.scatter(strict_disp, poc[strict_disp],
                   color=color, s=5, zorder=5, linewidths=0)
    # Flat segments (equality), drawn smaller and in gray
    flat_k = np.where(ternary[i] == 0)[0]
    flat_disp = (N - 1 - flat_k).astype(int)
    if flat_disp.size > 0:
        ax.scatter(flat_disp, poc[flat_disp],
                   color='gray', s=3, zorder=4, linewidths=0, alpha=0.5)
    n_strict = int(np.abs(ternary[i]).sum())
    pid_short = int(pat_ids[i]) % 10**9 # last 9 digits for readability
    ax.set_title(f"#{i+1} {label} mv={n_strict} …{pid_short:09d}",
                 fontsize=5.5, pad=2, color=color)
    ax.set_xlim(-0.5, N - 0.5)
    ax.set_xticks(x_tick_pos)
    ax.set_xticklabels(['←old', '', '', '', 'new→'], fontsize=3.5)
    ax.tick_params(axis='y', labelsize=3.5)
    ax.yaxis.set_major_locator(mticker.MaxNLocator(4))
    for sp in ('top', 'right'):
        ax.spines[sp].set_visible(False)
    ax.spines['left'].set_color(color)
    ax.spines['left'].set_linewidth(1.5)
    ax.spines['bottom'].set_color('#cccccc')
plt.tight_layout(rect=[0, 0, 1, 1])
chart_path = "TopoDevPOC_n39_100samples.png"
plt.savefig(chart_path, dpi=110, bbox_inches='tight')
# Release the figure's memory once it has been written to disk.
plt.close(fig)
print(f" [SAVED] {chart_path}")
# ── ASCII CLI chart for first 10 patterns ────────────────────────────────────
H = 7 # chart height in rows
W = N # chart width: one column per candle
print(f"\n ASCII CLI Charts — first 10 samples (right side = C_0 = newest)\n")
# Never index past the available samples if N_SAMPLES is lowered below 10.
for i in range(min(10, N_SAMPLES)):
    poc = poc_disp[i]
    d_lbl = "Bullish" if dir_bits[i] == 0 else "Bearish"
    pid = int(pat_ids[i])
    n_mv = int(np.abs(ternary[i]).sum())
    pmin, pmax = poc.min(), poc.max()
    # A completely flat series would give a zero span; use 1.0 to avoid 0/0.
    span = pmax - pmin if pmax != pmin else 1.0
    # Map each x-position to a text row (row 0 = highest price).
    rows = (H - 1 - ((poc - pmin) / span * (H - 1))).round().astype(int)
    rows = np.clip(rows, 0, H - 1)
    grid = [[' '] * W for _ in range(H)]
    for j in range(W):
        # Display column j is the newer-side node of transition k = N-1-j,
        # which exists for j >= 1 (the oldest candle has no incoming
        # transition).  This is the same node the PNG chart highlights.
        is_strict = (j >= 1) and (ternary[i, N - 1 - j] != 0)
        grid[rows[j]][j] = '●' if is_strict else '·'
    # Every column already holds exactly one glyph on its own row, so no
    # separate dash-fill pass is needed (the earlier one could never fire).
    print(f" [{i+1:2d}] {d_lbl:7s} | ID={pid} | strict_moves={n_mv}/{N_TRANS}")
    # poc_hi / poc_lo hold the newest / oldest values, not the max / min.
    poc_hi = poc[N-1]; poc_lo = poc[0]
    print(f" POC range: oldest={poc_lo:.0f} → newest={poc_hi:.0f}")
    print(f" ┌{'─'*W}┐")
    for row_idx in range(H):
        print(f" │{''.join(grid[row_idx])}│")
    print(f" └{'─'*W}┘")
    sym_preview = ' '.join(sym_matrix[i, :12]) + ' …'
    print(f" Σ (first 12): {sym_preview}\n")
# ─────────────────────────────────────────────────────────────────────────────
# FINAL SUMMARY
# ─────────────────────────────────────────────────────────────────────────────
# NOTE(review): measured from t_sample, so this also includes decoding, all
# CLI output and chart rendering, not only "sample+decode" as the label below
# says — confirm which interval is intended.
total_ms = (time.perf_counter() - t_sample) * 1e3
print(SEP)
print(" SUMMARY")
print(SEP)
print(f" n (candles) = {N}")
print(f" Transitions per pattern = {N_TRANS}")
print(f" Total patterns [2^{N}] = {TOTAL:,}")
print(f" Bullish [2^{N-1}] = {HALF:,}")
print(f" Bearish [2^{N-1}] = {HALF:,}")
print(f" Matrix B_n validated = {B_n:,} ✓")
print(f" Samples generated = {N_SAMPLES}")
print(f" Chart file = {chart_path}")
print(f" Compute device = {DEVICE}")
print(f" Wall-clock (sample+decode) = {total_ms:.2f} ms")
print(SEP)
print()
print(" CONVERSION FORMULAS (tex Sec. V)")
print(" Symbolic → Ternary:")
print(" Bullish: '>' → +1, '=' → 0")
print(" Bearish: '<' → -1, '=' → 0")
print(" Ternary → Symbolic:")
print(" +1 → '>', 0 → '=', -1 → '<'")
print()
print(" TEMPORAL CONVENTION:")
print(f" Chart x-axis: left = C_{{-{N-1}}} (oldest) → right = C_0 (newest)")
# Bullish transitions are +1 or 0 and each newer candle equals the older one
# plus its transition, so a bullish series rises (or stays flat) toward the
# right-hand, newest side of the chart; bearish is the mirror image.
print(" Bullish pattern: POC non-decreasing toward right (lower on left)")
print(" Bearish pattern: POC non-increasing toward right (higher on left)")
print(SEP)