TopoDevPOC / TopoDevPOC_n39_save.py
algorembrant's picture
Upload 2 files
d64015b verified
#!/usr/bin/env python3
"""
TopoDevPOC_n39.py
Topologically Unique Developing Point of Control Patterns
Pre-market K-Lines, n = 39 three-minute candlesticks
Paper : TopoDevPOC.tex (ConQ Research Team, Continual Quasars)
Compute: Vectorized NumPy + GPU Torch (T4) + Warp branchless ops
Architectural patterns from core_engine_v11.py (Hyper-Warp Edition)
Outputs (CLI):
1. Total combination count for n=39
2. Matrix state-transition validation
3. 100 random ternary matrices (1×38 each)
4. 100 random symbolic sequences (length-38 strings)
5. 100 developing_poc charts saved as PNG + ASCII CLI preview
6. CSV export — bit-packed uint64 pattern_id (39-bit encoding, min bytes)
7. Binary export — raw .npy uint64 array (fastest machine read)
COMPRESSION SCHEME:
Each pattern encodes into a single uint64 (8 bytes):
bit 0 : direction (0=Bullish, 1=Bearish)
bits 1..38 : transitions (1=strict, 0=equality)
Decode: direction = id & 1; trans[k] = (id >> (k+1)) & 1
Full matrix + direction reconstructed from one integer in O(1).
Entire 2^39 space = integers [0, 2^39). No file required for enumeration.
Run on Google Colab T4:
!python TopoDevPOC_n39.py
"""
# ── stdlib ────────────────────────────────────────────────────────────────────
import os, sys, time, math, csv, struct
import numpy as np
# ── matplotlib (non-interactive for Colab CLI) ────────────────────────────────
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
# -- GPU setup (T4 Colab) -----------------------------------------------------
# Torch is optional. When it cannot be imported the script keeps running on
# the CPU with `torch` bound to None so later code can test for it.
torch = None
HAS_CUDA = False
try:
    import torch
    HAS_CUDA = torch.cuda.is_available()
except ImportError:
    # Reset both names so a half-finished import leaves a clean CPU-only state.
    torch = None
    HAS_CUDA = False
DEVICE = 'cuda' if HAS_CUDA else 'cpu'

# -- Optional Warp (core_engine_v11.py pattern) -------------------------------
# Best-effort probe: any failure while importing or initialising Warp simply
# leaves HAS_WARP False and the script falls back to Torch or NumPy.
HAS_WARP = False
try:
    import warp as wp
    wp.init()
    wp.set_module_options(
        dict(enable_backward=False, fast_math=True, max_unroll=8)
    )
    if wp.get_cuda_devices():
        HAS_WARP = True
except Exception:
    pass
# -----------------------------------------------------------------------------
# CONSTANTS (tex Section II)
# -----------------------------------------------------------------------------
N_SAMPLES = 100          # number of random patterns drawn for the demo outputs
N = 39                   # pre-market 3-min candles (C_0 ... C_{-(n-1)})
N_TRANS = N - 1          # 38 adjacent-pair relations
SEP = 74 * "="           # horizontal rule used by every CLI section
# Millisecond wall clock folded into 31 bits: a fresh seed on every run.
SEED = int(1000 * time.time()) & ((1 << 31) - 1)
# -----------------------------------------------------------------------------
# 0. HEADER
# -----------------------------------------------------------------------------
# Banner: run parameters first, then one optional line per available
# accelerator backend, closed by a rule.
print(
    SEP,
    " TopoDevPOC β€” Developing POC Pattern Enumerator",
    f" n = {N} candles | {N_TRANS} transitions | device = {DEVICE}",
    sep="\n",
)
if HAS_CUDA and torch:
    print(f" GPU = {torch.cuda.get_device_name(0)}")
if HAS_WARP:
    print(f" Warp = enabled (branchless kernel path)")
print(SEP)
# -----------------------------------------------------------------------------
# SECTION 1 -- COMBINATORIAL ENUMERATION (tex Theorem, Sec. III)
#
# Bullish: each of the (n-1) transitions is '>' or '='  ->  2^(n-1) patterns
# Bearish: each of the (n-1) transitions is '<' or '='  ->  2^(n-1) patterns
# Total  : 2^(n-1) + 2^(n-1) = 2^n   (the two families are counted as disjoint)
# n=39   : 2^39 = 549,755,813,888
# -----------------------------------------------------------------------------
# Plain Python ints, so both counts are exact.
TOTAL = 1 << N
HALF = 1 << N_TRANS          # N_TRANS == N - 1: one family's share
print(
    f"\n[THEOREM] Total unique developing POC patterns for n={N}",
    f" Bullish (non-increasing) : 2^{N-1} = {HALF:,}",
    f" Bearish (non-decreasing) : 2^{N-1} = {HALF:,}",
    f" Total (2^{N}) : {TOTAL:,}",
    sep="\n",
)
# ─────────────────────────────────────────────────────────────────────────────
# SECTION 2 β€” MATRIX STATE-TRANSITION VALIDATION (tex Sec. IV)
#
# States : S_0 (equality), S_± (strict move)
# A = [[1,1],[1,1]] (fully-connected 2-state digraph)
# v_0 = [1,1]^T (both states reachable initially)
#
# B_n = 1^T · A^(n-2) · v_0
# = 2^(n-3) · 1^T · A · 1 (using A^k = 2^(k-1)·A for k≥1)
# = 2^(n-3) · 4 = 2^(n-1) per direction
#
# Implementation: exact Python-int matrix exponentiation (no float rounding)
# ─────────────────────────────────────────────────────────────────────────────
def _mm2(A, B):
    """Return the 2x2 product A @ B as nested lists.

    Entries are combined with ordinary ``*`` and ``+``, so Python ints stay
    exact at any magnitude. Only the top-left 2x2 corner of each operand is
    read.
    """
    return [
        [A[i][0] * B[0][j] + A[i][1] * B[1][j] for j in (0, 1)]
        for i in (0, 1)
    ]
def _mpow2(M, k):
    """Raise the 2x2 matrix M to the non-negative integer power k.

    Uses square-and-multiply, so the number of ``_mm2`` products grows with
    log2(k). Arithmetic is whatever ``_mm2`` performs on the entries (exact
    for Python ints).

    Args:
        M: 2x2 matrix as nested lists.
        k: exponent, an int >= 0.

    Returns:
        The identity for k == 0, M itself (same object) for k == 1, otherwise
        a new nested list holding M**k.

    Raises:
        ValueError: if k is negative. Without this guard the recursion never
            terminates, because ``-1 >> 1`` is still ``-1``.
    """
    if k < 0:
        raise ValueError(f"matrix exponent must be non-negative, got {k}")
    if k == 0:
        return [[1, 0], [0, 1]]
    if k == 1:
        return M
    half = _mpow2(M, k >> 1)
    squared = _mm2(half, half)
    # An odd exponent needs one extra factor of M after squaring.
    return _mm2(squared, M) if k & 1 else squared
# Transfer matrix of the 2-state digraph and the all-ones start vector.
A_mat = [[1, 1], [1, 1]]
v0 = [1, 1]
A_pow = _mpow2(A_mat, N - 2)
# 1^T . A^(n-2) . v0: take each row of A^(n-2) against v0, then add the rows.
Av0_0, Av0_1 = (row[0] * v0[0] + row[1] * v0[1] for row in A_pow)
B_n = Av0_0 + Av0_1
print(
    f"\n[MATRIX] A = [[1,1],[1,1]] | v_0 = [1,1]^T",
    f" A^{N-2} = [[{A_pow[0][0]}, {A_pow[0][1]}],",
    f" [{A_pow[1][0]}, {A_pow[1][1]}]]",
    f" B_{N} = 1^T Β· A^{N-2} Β· v_0 = {B_n:,}",
    f" Expected 2^{{n-1}} = {HALF:,}",
    sep="\n",
)
# Explicit raises instead of `assert`: assert statements are removed under
# `python -O`, which would let the "[OK]" line print without any check.
# AssertionError is kept so the failure type is unchanged.
if B_n != HALF:
    raise AssertionError(f"Matrix B_n mismatch: {B_n} β‰  {HALF}")
if 2 * B_n != TOTAL:
    raise AssertionError(f"Total mismatch: {2*B_n} β‰  {TOTAL}")
print(f" [OK] 2 Γ— {B_n:,} = {TOTAL:,} βœ“")
# ─────────────────────────────────────────────────────────────────────────────
# SECTION 3 β€” PATTERN ENCODING SCHEME
#
# Each of the 2^39 patterns maps bijectively to a 39-bit integer:
# bit 0 : direction (0 = Bullish, 1 = Bearish)
# bits 1 … N-1 : transitions (1 = strict move, 0 = equality)
#
# pattern_id ∈ [0, 2^39) uniquely identifies every valid pattern.
#
# Ternary matrix m ∈ {+1,0}^(n-1) for bullish,
# m ∈ {-1,0}^(n-1) for bearish.
# Bijection: m_k = sign(direction) × trans_bit_k
# ─────────────────────────────────────────────────────────────────────────────
# -----------------------------------------------------------------------------
# SECTION 4 -- GPU-ACCELERATED RANDOM SAMPLING
# Generate the whole (N_SAMPLES x N) 0/1 matrix in one shot.
# Backend order: Warp kernel, then Torch on CUDA, then NumPy on the CPU.
#   bits[:, 0]  = direction flags
#   bits[:, 1:] = N_TRANS transition flags per pattern
# -----------------------------------------------------------------------------
t_sample = time.perf_counter()
if HAS_WARP:
    # -- Warp path: one thread per sample, N_cols bits written per thread ----
    _seed_wp = SEED

    @wp.kernel
    def k_rng_bits(seed: int, N_cols: int,
                   out: wp.array(dtype=wp.int8)):
        tid = wp.tid()
        # Derive the per-thread state with Warp's hashed initialiser rather
        # than `seed ^ tid`. The first xorshift output bit is bit0 ^ bit17 of
        # the state, so with the plain XOR the direction flag just alternated
        # with the parity of tid and neighbouring rows were near-copies.
        rng = wp.rand_init(seed, tid)
        # xorshift has an all-zero fixed point; step off it if the hash lands
        # there.
        if rng == wp.uint32(0):
            rng = wp.uint32(123456789)
        base = tid * N_cols
        for col in range(N_cols):
            rng = rng ^ (rng << wp.uint32(13))
            rng = rng ^ (rng >> wp.uint32(17))
            rng = rng ^ (rng << wp.uint32(5))
            out[base + col] = wp.int8(int(rng) & 1)

    out_wp = wp.zeros(N_SAMPLES * N, dtype=wp.int8, device='cuda')
    wp.launch(k_rng_bits, dim=N_SAMPLES, block_dim=128,
              inputs=[_seed_wp, N, out_wp], device='cuda')
    wp.synchronize()
    # The flat device buffer is row-major: one row of N bits per sample.
    bits_np = out_wp.numpy().reshape(N_SAMPLES, N)
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled via Warp XOR-shift kernel")
elif HAS_CUDA and torch is not None:
    # -- Torch GPU path --------------------------------------------------------
    gen = torch.Generator(device='cuda')
    gen.manual_seed(SEED)
    bits_t = torch.randint(0, 2, (N_SAMPLES, N), device='cuda',
                           generator=gen, dtype=torch.int8)
    bits_np = bits_t.cpu().numpy()
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on GPU (torch.randint)")
else:
    # -- CPU NumPy fallback ----------------------------------------------------
    rng_cpu = np.random.default_rng(SEED)
    bits_np = rng_cpu.integers(0, 2, size=(N_SAMPLES, N), dtype=np.int8)
    print(f"\n[SAMPLE] {N_SAMPLES} patterns sampled on CPU (NumPy)")
sample_ms = (time.perf_counter() - t_sample) * 1e3
print(f" Sampling time: {sample_ms:.2f} ms")
# -----------------------------------------------------------------------------
# SECTION 5 -- VECTORISED DECODING (NumPy, no Python loops over patterns)
#
#   bits[:, 0]  -> direction per sample (0 = Bullish, 1 = Bearish)
#   bits[:, 1:] -> one 0/1 transition flag per adjacent candle pair
#   ternary     -> +trans for Bullish rows, -trans for Bearish rows
# -----------------------------------------------------------------------------
dir_bits = bits_np[:, 0].astype(np.int8)            # 0 or 1
trans = bits_np[:, 1:].astype(np.int8)              # values 0 or 1
signs = 1 - 2 * dir_bits                            # 0 -> +1 (bull), 1 -> -1 (bear)
ternary = (trans * signs[:, None]).astype(np.int8)  # entries in {+1, 0, -1}

# Compact pattern IDs: column c of bits_np supplies bit c of the integer.
pows64 = np.uint64(1) << np.arange(N, dtype=np.uint64)
pat_ids = (pows64 * bits_np.astype(np.uint64)).sum(axis=1)

# Symbolic sequences via table lookup on the transition bit:
#   Bullish rows: 1 -> '>', 0 -> '='
#   Bearish rows: 1 -> '<', 0 -> '='
SYM_BULL = np.array(['=', '>'], dtype='<U1')
SYM_BEAR = np.array(['=', '<'], dtype='<U1')
sym_matrix = np.where((dir_bits == 0)[:, None],
                      SYM_BULL[trans],
                      SYM_BEAR[trans])

# POC price sequence, indexed right-to-left: p_raw[i, 0] is the newest candle
# C_0 and p_raw[i, N-1] the oldest. The recurrence is
#     p[k] = p[k+1] + ternary[k] * step,   with p[N-1] = BASE,
# so column k equals BASE plus the sum of the scaled ternary entries from k
# onward (a suffix sum).
BASE = 50.0
step = 1.0
p_raw = np.zeros((N_SAMPLES, N), dtype=np.float32)
p_raw[:, N-1] = BASE
# Suffix sum = cumulative sum over reversed columns, flipped back.
p_raw[:, :N-1] = BASE + np.cumsum(
    ternary[:, ::-1].astype(np.float32) * step, axis=1
)[:, ::-1]

# Display order is oldest (column 0) to newest (column N-1).
poc_disp = p_raw[:, ::-1].copy()
# -----------------------------------------------------------------------------
# OUTPUT 1 -- TERNARY MATRICES (one row of N_TRANS entries per sample)
# -----------------------------------------------------------------------------
print(
    f"\n{SEP}",
    f" OUTPUT 1 β€” TERNARY MATRICES (1Γ—{N_TRANS}, values ∈ {{-1,0,+1}})",
    f" Format: [#] Direction | PatternID | M = [m_0 … m_37]",
    SEP,
    sep="\n",
)
for i, (dir_flag, raw_id, tern_row) in enumerate(zip(dir_bits, pat_ids, ternary)):
    pid = int(raw_id)
    d_label = "Bearish" if dir_flag != 0 else "Bullish"
    # The wide line limit plus the newline strip keeps each matrix on one line.
    row_str = np.array2string(
        tern_row, separator=',', max_line_width=400
    ).replace('\n', '')
    print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | M={row_str}")
# -----------------------------------------------------------------------------
# OUTPUT 2 -- SYMBOLIC SEQUENCES (one symbol per transition)
# -----------------------------------------------------------------------------
print(
    f"\n{SEP}",
    f" OUTPUT 2 β€” SYMBOLIC SEQUENCES (Ξ£, length {N_TRANS})",
    f" Format: [#] Direction | PatternID | Ξ£ = Οƒ_0 Οƒ_1 … Οƒ_37",
    SEP,
    sep="\n",
)
for i, (dir_flag, raw_id, sym_row) in enumerate(zip(dir_bits, pat_ids, sym_matrix)):
    pid = int(raw_id)
    d_label = "Bearish" if dir_flag != 0 else "Bullish"
    seq_str = ' '.join(sym_row)
    print(f" [{i+1:3d}] {d_label:7s} | ID={pid:>15d} | Ξ£ = {seq_str}")
# -----------------------------------------------------------------------------
# OUTPUT 3 -- CHARTS
#   (a) matplotlib figure, 10 x 10 grid of small panels, saved to PNG
#   (b) ASCII CLI preview (handled by the code that follows this section)
# -----------------------------------------------------------------------------
print(
    f"\n{SEP}",
    f" OUTPUT 3 β€” DEVELOPING POC CHARTS (100 patterns)",
    SEP,
    sep="\n",
)
# x-axis: position 0 is the oldest candle C_{-(N-1)}, position N-1 is C_0.
x_pos = np.arange(N, dtype=np.float32)
x_tick_pos = [0, 9, 19, 29, N-1]
x_tick_lbl = [f'C_{{-{N-1}}}', f'C_{{-29}}', f'C_{{-19}}', f'C_{{-9}}', 'C_0']
ROWS, COLS = 10, 10
fig = plt.figure(figsize=(COLS * 3.2, ROWS * 2.0))
fig.suptitle(
    f"TopoDevPOC β€” 100 Random Developing POC Patterns "
    f"n={N} pre-market 3-min K-lines\n"
    f"Total pattern space: 2^{N} = {TOTAL:,} "
    f"(Bullish: {HALF:,} | Bearish: {HALF:,})",
    fontsize=10, y=1.005
)
for i in range(N_SAMPLES):
    ax = fig.add_subplot(ROWS, COLS, i + 1)
    poc = poc_disp[i]
    is_bull = (dir_bits[i] == 0)
    # Line colour, title tag and panel background all follow the direction.
    if is_bull:
        color, label, face = '#1a6eb5', 'B↑', '#f7f9fc'
    else:
        color, label, face = '#c0392b', 'B↓', '#fdf4f4'
    ax.set_facecolor(face)
    ax.plot(x_pos, poc, color=color, linewidth=1.2, zorder=3)

    # Transition k joins display positions N-2-k and N-1-k; the marker goes on
    # the newer node, N-1-k. Strict moves are drawn first in the line colour,
    # equalities second as faint grey dots.
    marker_specs = (
        (ternary[i] != 0, dict(color=color, s=5, zorder=5, linewidths=0)),
        (ternary[i] == 0, dict(color='gray', s=3, zorder=4, linewidths=0,
                               alpha=0.5)),
    )
    for mask, scatter_kw in marker_specs:
        disp_idx = (N - 1 - np.where(mask)[0]).astype(int)
        if disp_idx.size > 0:
            ax.scatter(disp_idx, poc[disp_idx], **scatter_kw)

    n_strict = int(np.abs(ternary[i]).sum())
    pid_short = int(pat_ids[i]) % 10**9  # last 9 decimal digits only
    ax.set_title(f"#{i+1} {label} mv={n_strict} …{pid_short:09d}",
                 fontsize=5.5, pad=2, color=color)
    ax.set_xlim(-0.5, N - 0.5)
    ax.set_xticks(x_tick_pos)
    ax.set_xticklabels(['←old', '', '', '', 'newβ†’'], fontsize=3.5)
    ax.tick_params(axis='y', labelsize=3.5)
    ax.yaxis.set_major_locator(mticker.MaxNLocator(4))
    for sp in ('top', 'right'):
        ax.spines[sp].set_visible(False)
    left_spine = ax.spines['left']
    left_spine.set_color(color)
    left_spine.set_linewidth(1.5)
    ax.spines['bottom'].set_color('#cccccc')
plt.tight_layout(rect=[0, 0, 1, 1])
chart_path = "TopoDevPOC_n39_100samples.png"
plt.savefig(chart_path, dpi=110, bbox_inches='tight')
plt.close(fig)
print(f" [SAVED] {chart_path}")
# -- ASCII CLI chart for the first (up to) 10 patterns ------------------------
H = 7  # chart height in text rows
W = N  # chart width: one column per candle
print(f"\n ASCII CLI Charts β€” first 10 samples (right side = C_0 = newest)\n")
# Never index past the available samples when fewer than 10 were drawn.
for i in range(min(10, N_SAMPLES)):
    poc = poc_disp[i]
    d_lbl = "Bullish" if dir_bits[i] == 0 else "Bearish"
    pid = int(pat_ids[i])
    n_mv = int(np.abs(ternary[i]).sum())
    pmin, pmax = poc.min(), poc.max()
    # A completely flat series would divide by zero; use a unit span instead.
    span = pmax - pmin if pmax != pmin else 1.0
    # Map each price to a text row; row 0 is the top line, so highest = 0.
    rows = (H - 1 - ((poc - pmin) / span * (H - 1))).round().astype(int)
    rows = np.clip(rows, 0, H - 1)
    grid = [[' '] * W for _ in range(H)]
    for j in range(W):
        # Transition k joins display columns N-2-k and N-1-k. The marker goes
        # on the NEWER column (k = N-1-j), the same node the PNG chart
        # highlights. The oldest column (j == 0) ends no transition, so it is
        # never strict.
        is_strict = (j >= 1) and (ternary[i, N - 1 - j] != 0)
        grid[rows[j]][j] = '●' if is_strict else 'Β·'
    # Every column already holds exactly one glyph on its own row, so no
    # separate pass is needed to join flat runs.
    print(f" [{i+1:2d}] {d_lbl:7s} | ID={pid} | strict_moves={n_mv}/{N_TRANS}")
    # poc_lo / poc_hi are the oldest / newest values, not a min / max.
    poc_hi = poc[N-1]
    poc_lo = poc[0]
    print(f" POC range: oldest={poc_lo:.0f} β†’ newest={poc_hi:.0f}")
    print(f" β”Œ{'─'*W}┐")
    for row_idx in range(H):
        print(f" β”‚{''.join(grid[row_idx])}β”‚")
    print(f" β””{'─'*W}β”˜")
    sym_preview = ' '.join(sym_matrix[i, :12]) + ' …'
    print(f" Ξ£ (first 12): {sym_preview}\n")
# -----------------------------------------------------------------------------
# OUTPUT 4 -- COMPRESSED CSV EXPORT (bit-packed uint64 pattern_id)
#
# COMPRESSION LOGIC:
#   Naive CSV text per row (direction label + 38 integers): ~110 bytes/row
#   Compressed: a single uint64 hex string per row:          ~18 bytes/row
#   Reduction: ~6x on text CSV; binary .npy = 8 bytes/row (machine-optimal)
#
# ENCODING (39-bit integer stored in a uint64):
#   pattern_id = dir_bit | (trans[0]<<1) | (trans[1]<<2) | ... | (trans[37]<<38)
#   bit 0      = direction (0 = Bullish, 1 = Bearish)
#   bits 1..38 = transition flags
#
# DECODE:
#   direction  = "Bullish" if (pid & 1) == 0 else "Bearish"
#   trans[k]   = (pid >> (k+1)) & 1        for k in 0..37
#   ternary[k] = trans[k] if Bullish else -trans[k]
#
# NOTE ON "ALL PATTERNS":
#   2^39 = 549,755,813,888 patterns. At 8 bytes/pattern -> 4.4 TB.
#   At the minimum of 39 bits/pattern (bit-packed)      -> 2.68 TB.
#   The pattern space IS [0, 2^39): pattern k = integer k, so no file is needed.
#   Use the streaming generator defined later for on-demand enumeration.
# -----------------------------------------------------------------------------
print(
    f"\n{SEP}",
    f" OUTPUT 4 β€” COMPRESSED EXPORT",
    SEP,
    sep="\n",
)
# ── Verify encoding round-trip before writing ─────────────────────────────
def encode_pattern_id(dir_bit, trans_bits):
    """Pack a direction flag and its transition flags into one integer.

    Bit 0 of the result is ``dir_bit``; transition ``k`` lands in bit
    ``k + 1``. The packing is done in ``np.uint64`` and the result is returned
    as a plain Python int.
    """
    packed = np.uint64(dir_bit)
    for shift, flag in enumerate(trans_bits, start=1):
        packed = packed | (np.uint64(int(flag)) << np.uint64(shift))
    return int(packed)
def decode_pattern_id(pid, n_trans=38):
    """Unpack a pattern id into direction label, flags and ternary row.

    Bit 0 selects the direction (0 = Bullish, 1 = Bearish); bits
    1..n_trans are the transition flags. Work is linear in ``n_trans``.

    Returns:
        (direction, trans, ternary): the label string, the 0/1 flags as an
        int8 array, and the signed int8 row (+flags for Bullish, -flags for
        Bearish).
    """
    value = int(pid)
    bearish = (value & 1) == 1
    flags = [(value >> shift) & 1 for shift in range(1, n_trans + 1)]
    trans = np.array(flags, dtype=np.int8)
    ternary = (-trans if bearish else trans).astype(np.int8)
    direction = "Bearish" if bearish else "Bullish"
    return direction, trans, ternary
# Round-trip verification over every sample: the vectorised ID must match the
# scalar encoder, and decoding must give back the direction and ternary row.
# Explicit raises (not `assert`) keep the checks active under `python -O`, so
# the "[OK]" line below is never printed unverified. AssertionError is kept so
# the failure type is unchanged.
for i in range(N_SAMPLES):
    pid_ref = int(pat_ids[i])
    pid_enc = encode_pattern_id(int(dir_bits[i]), trans[i])
    if pid_ref != pid_enc:
        raise AssertionError(f"Encode mismatch at sample {i}")
    # Pass the width explicitly so the check follows N instead of relying on
    # the decoder's default of 38.
    d2, t2, m2 = decode_pattern_id(pid_enc, N_TRANS)
    if d2 != ("Bullish" if dir_bits[i] == 0 else "Bearish"):
        raise AssertionError("Direction mismatch")
    if not np.array_equal(m2, ternary[i]):
        raise AssertionError(f"Ternary mismatch at sample {i}")
print(" [OK] 100-sample encode/decode round-trip verified")
# -- CSV: one bit-packed pattern id per row ------------------------------------
# Columns: 1-based sequence number, the id as 0x-prefixed 16-digit hex, the
# same id in decimal, a two-character direction tag, and the strict-move count.
CSV_PATH = "TopoDevPOC_n39_samples.csv"
csv_header = [
    "seq_id",             # 1-based sample index
    "pattern_id_uint64",  # hex string; parse with int(x, 16)
    "pattern_id_dec",     # decimal form for inspection
    "direction",          # "B+" (bullish) or "B-" (bearish)
    "n_strict_moves",     # number of non-zero transitions
]
csv_rows = []
for i in range(N_SAMPLES):
    pid = int(pat_ids[i])
    csv_rows.append([
        i + 1,
        f"0x{pid:016X}",
        pid,
        "B+" if dir_bits[i] == 0 else "B-",
        int(np.abs(ternary[i]).sum()),
    ])
# newline='' lets the csv module control line endings itself.
with open(CSV_PATH, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(csv_header)
    writer.writerows(csv_rows)
csv_bytes = os.path.getsize(CSV_PATH)
print(f" [CSV] {CSV_PATH} β†’ {csv_bytes:,} bytes ({csv_bytes/N_SAMPLES:.1f} bytes/row)")
# -- Binary .npy: the ids as a uint64 array, 8 payload bytes per pattern -------
NPY_PATH = "TopoDevPOC_n39_samples.npy"
ids_u64 = pat_ids.astype(np.uint64)
np.save(NPY_PATH, ids_u64)
npy_bytes = os.path.getsize(NPY_PATH)
# The per-row figure subtracts 128 bytes, the .npy header size assumed here.
print(f" [NPY] {NPY_PATH} β†’ {npy_bytes:,} bytes ({(npy_bytes-128)/N_SAMPLES:.1f} bytes/row payload)")
# -- Raw .bin: each 39-bit id stored in ceil(39/8) = 5 little-endian bytes -----
# Denser packing (8 patterns x 39 bits = 39 bytes) is possible; this writer
# uses the simpler whole-byte record per pattern.
BIN_PATH = "TopoDevPOC_n39_samples.bin"
BYTES_PER = 5  # bytes per stored pattern id
with open(BIN_PATH, 'wb') as f:
    # 13-byte header: 8-byte magic, uint32 pattern count, uint8 bits/pattern.
    f.write(b'TPOC3900' + struct.pack('<IB', N_SAMPLES, 39))
    f.writelines(
        int(pat_ids[i]).to_bytes(BYTES_PER, byteorder='little')
        for i in range(N_SAMPLES)
    )
bin_bytes = os.path.getsize(BIN_PATH)
payload = N_SAMPLES * BYTES_PER
print(f" [BIN] {BIN_PATH} β†’ {bin_bytes:,} bytes (13 hdr + {payload} payload = {BYTES_PER} bytes/pattern)")
# -- Print size comparison -----------------------------------------------------
# Baseline: roughly 110 text bytes per expanded row, for the samples written.
naive_bytes = N_SAMPLES * 110
print()
print(f" SIZE COMPARISON (100 samples, n=39, {N_TRANS} transitions):")
print(f" Naive text (direction + 38 ints): ~{naive_bytes:,} bytes")
# True division: the previous floor division always printed a whole number
# through the .1f format (for example 2.0 instead of 2.4).
print(f" CSV hex uint64 (this file): {csv_bytes:,} bytes ({naive_bytes / csv_bytes:.1f}Γ— smaller)")
print(f" NumPy .npy uint64: {npy_bytes:,} bytes")
print(f" Raw bit-packed .bin (39 bits each): {bin_bytes:,} bytes ({naive_bytes / bin_bytes:.1f}Γ— smaller)")
print()
print(f" SCALING TO ALL 2^{N} = {TOTAL:,} PATTERNS:")
# Storage needed for the whole pattern space, in decimal terabytes.
full_npy_tb = (TOTAL * 8) / 1e12
full_bin_tb = (TOTAL * BYTES_PER) / 1e12
full_text_tb = (TOTAL * 110) / 1e12
print(f" Naive text: ~{full_text_tb:.1f} TB (infeasible)")
print(f" uint64 binary (.npy): ~{full_npy_tb:.2f} TB (infeasible to store)")
print(f" 5-byte bit-packed: ~{full_bin_tb:.2f} TB (infeasible to store)")
print(f" Optimal: 0 bytes β€” pattern k = integer k, range [0, 2^{N})")
print(f" Use streaming generator below for on-demand enumeration.")
# ── Streaming generator (no memory, yields all 2^39 on-demand) ───────────
def stream_all_patterns(n=N):
    """Lazily enumerate every pattern id in ``range(2**n)``.

    Yields ``(pattern_id, direction, ternary_row)`` one pattern at a time.
    Bit 0 of the id is the direction (0 = Bullish, 1 = Bearish) and bits
    1..n-1 are the transition flags; ``ternary_row`` is an int8 array of
    length n - 1 holding +flag for Bullish ids and -flag for Bearish ids.
    """
    shifts = range(1, n)  # bit positions of the n - 1 transition flags
    for k in range(1 << n):
        if k & 1:
            sign, direction = -1, "Bearish"
        else:
            sign, direction = 1, "Bullish"
        ternary_k = np.array(
            [sign * ((k >> s) & 1) for s in shifts],
            dtype=np.int8
        )
        yield k, direction, ternary_k
# Show the first four ids from the generator and the last four directly, to
# confirm the ordering without walking the whole space.
print()
print(f" STREAMING GENERATOR β€” first 4 / last 4 of all {TOTAL:,} patterns:")
gen = stream_all_patterns(N)
# range() comes first in the zip so exactly four items are pulled from gen.
for _, (pid_s, d_s, _row) in zip(range(4), gen):
    print(f" pattern_id={pid_s:>3d} direction={d_s}")
print(f" ... ({TOTAL - 8:,} patterns omitted) ...")
# The tail is decoded straight from the known ids: bit 0 gives the direction.
for pid_s in range(TOTAL - 4, TOTAL):
    d_s = "Bearish" if pid_s & 1 else "Bullish"
    print(f" pattern_id={pid_s} direction={d_s}")
# -----------------------------------------------------------------------------
# FINAL SUMMARY
# -----------------------------------------------------------------------------
# Elapsed time since sampling began. It spans decoding, chart rendering and
# every export, so the label below says so instead of "sample+decode".
total_ms = (time.perf_counter() - t_sample) * 1e3
print(SEP)
print(" SUMMARY")
print(SEP)
print(f" n (candles) = {N}")
print(f" Transitions per pattern = {N_TRANS}")
print(f" Total patterns [2^{N}] = {TOTAL:,}")
print(f" Bullish [2^{N-1}] = {HALF:,}")
print(f" Bearish [2^{N-1}] = {HALF:,}")
print(f" Matrix B_n validated = {B_n:,} βœ“")
print(f" Samples generated = {N_SAMPLES}")
print(f" Chart file = {chart_path}")
print(f" CSV export (hex uint64) = {CSV_PATH} ({csv_bytes:,} bytes)")
print(f" Binary export (.npy) = {NPY_PATH} ({npy_bytes:,} bytes)")
print(f" Bit-packed export (.bin) = {BIN_PATH} ({bin_bytes:,} bytes)")
print(f" Compute device = {DEVICE}")
print(f" Wall-clock (since sampling) = {total_ms:.2f} ms")
print(SEP)
print()
print(" CONVERSION FORMULAS (tex Sec. V)")
print(" Symbolic β†’ Ternary:")
print(" Bullish: '>' β†’ +1, '=' β†’ 0")
print(" Bearish: '<' β†’ -1, '=' β†’ 0")
print(" Ternary β†’ Symbolic:")
print(" +1 β†’ '>', 0 β†’ '=', -1 β†’ '<'")
print()
print(" TEMPORAL CONVENTION:")
print(f" Chart x-axis: left = C_{{-{N-1}}} (oldest) β†’ right = C_0 (newest)")
# A bullish row adds +1 per strict move when stepping from an older candle to
# a newer one, so on the oldest-to-newest chart it climbs toward the right.
# Bearish rows mirror that. The earlier wording had the two swapped.
print(" Bullish pattern: POC non-decreasing toward right (lower on left)")
print(" Bearish pattern: POC non-increasing toward right (higher on left)")
print(SEP)