#!/usr/bin/env python3
# Copyright Daniel Harding - RomanAILabs
# Credits: Nova (GPT-5.2 Thinking)
"""
tessellation_4d.py (Original-style + Qwen GGUF Chat) — Context-Safe Edition
==========================================================================

Fixes the crash you hit:
- "Requested tokens exceed context window of 4096"

How:
- The context block is now *summaries only* (no big arrays).
- HyperMemory payloads are compacted before sending to the model.
- max_tokens is automatically clamped so prompt+completion stays under n_ctx.

Requirements:
    pip install -U numpy llama-cpp-python

Run:
    python3 tessellation_4d.py

Commands:
    /help
    /perceive
    /drift
    /step [n]
    /mem [k]
    /clear
    /exit

Model:
- Qwen3-4B-Instruct-2507-Q4_K_M.gguf must be in the same folder as this script.

Optional env vars:
    TESSERACT_CTX=4096
    TESSERACT_THREADS=0
    TESSERACT_N_GPU_LAYERS=0
"""

import json
import math
import os
import time
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple

import numpy as np


# -------------------------------------------------------------------
# "sensorio" stubs (kept naming)
# -------------------------------------------------------------------
class Sensor:
    """Base interface for a sensor that yields an array of readings."""

    def readings(self) -> np.ndarray:
        """Return the latest readings; subclasses must override this."""
        raise NotImplementedError


class DepthSensor(Sensor):
    """Stub depth sensor: every call yields 10 fresh uniform values in [0, 1)."""

    def __init__(self):
        self._last = np.zeros((10,), dtype=np.float64)

    def readings(self) -> np.ndarray:
        """Draw a new random reading, remember it in ``_last`` and return it."""
        self._last = np.random.rand(10).astype(np.float64)
        return self._last


class InfraredSensor(Sensor):
    """Stub infrared sensor: every call yields 10 fresh uniform values in [0, 100)."""

    def __init__(self):
        self._last = np.zeros((10,), dtype=np.float64)

    def readings(self) -> np.ndarray:
        """Draw a new random reading, remember it in ``_last`` and return it."""
        self._last = np.random.uniform(0.0, 100.0, 10).astype(np.float64)
        return self._last


# -------------------------------------------------------------------
# "physicsio" stub
# -------------------------------------------------------------------
@dataclass
class PhysicalObject:
    """Minimal physical body; only its mass is modelled."""

    mass: float = 1.0


class Physics:
    """Stub physics model holding a constant gravity vector (0, -9.81, 0)."""

    def __init__(self):
        self.gravity = np.array([0.0, -9.81, 0.0], dtype=np.float64)

    def apply_forces(self, obj: PhysicalObject) -> np.ndarray:
        """Return the gravity vector scaled by ``obj.mass`` as a length-3 array."""
        return float(obj.mass) * self.gravity


# -------------------------------------------------------------------
# "neuralnet" stub (simple linear layers)
# -------------------------------------------------------------------
class NeuralNetwork:
    """Fixed-weight stack of bias-free linear layers, each followed by tanh.

    The weights are drawn once from a generator seeded with 1337, so two
    instances always produce the same predictions. The first layer expects
    4 input features; the layer widths are 10, 20 and 30.
    """

    def __init__(self):
        self.layers = [
            {"neurons": 10},
            {"neurons": 20},
            {"neurons": 30},
        ]
        self._weights = []
        in_dim = 4
        rng = np.random.default_rng(1337)
        for layer in self.layers:
            out_dim = int(layer["neurons"])
            w = rng.normal(0.0, 0.2, size=(in_dim, out_dim)).astype(np.float64)
            self._weights.append(w)
            # The next layer consumes this layer's output width.
            in_dim = out_dim

    def predict(self, data: np.ndarray) -> np.ndarray:
        """Run ``data`` through every layer and return the final activations.

        A 1-D input is treated as a single row. The trailing dimension must
        match the first weight matrix (4 features), otherwise the matrix
        product raises.
        """
        x = np.asarray(data, dtype=np.float64)
        if x.ndim == 1:
            x = x.reshape(1, -1)
        for w in self._weights:
            x = np.tanh(x @ w)
        return x


# -------------------------------------------------------------------
# "geometrytransformations" stub
# -------------------------------------------------------------------
class Transformations:
    """Rotation helpers plus a 3D -> 4D lifting used by the perception pipeline."""

    def __init__(self):
        # Rotations are applied in list order by project_3d_to_4d.
        self.rotations = [
            self.Rotate_x(90),
            self.Rotate_y(45),
        ]

    def Rotate_x(self, deg: float):
        """Return a function rotating a 3-vector by ``deg`` degrees about the x axis."""
        rad = math.radians(float(deg))
        c, s = math.cos(rad), math.sin(rad)

        def _rot(v3: np.ndarray) -> np.ndarray:
            v3 = np.asarray(v3, dtype=np.float64).reshape(3)
            x, y, z = v3[0], v3[1], v3[2]
            return np.array([x, (y * c - z * s), (y * s + z * c)], dtype=np.float64)

        return _rot

    def Rotate_y(self, deg: float):
        """Return a function rotating a 3-vector by ``deg`` degrees about the y axis."""
        rad = math.radians(float(deg))
        c, s = math.cos(rad), math.sin(rad)

        def _rot(v3: np.ndarray) -> np.ndarray:
            v3 = np.asarray(v3, dtype=np.float64).reshape(3)
            x, y, z = v3[0], v3[1], v3[2]
            return np.array([(x * c + z * s), y, (-x * s + z * c)], dtype=np.float64)

        return _rot

    def project_3d_to_4d(self, points: np.ndarray, axes: np.ndarray) -> np.ndarray:
        """Rotate each 3D point and append a synthetic fourth coordinate.

        Args:
            points: One point (1-D) or a 2-D array of points. Rows with fewer
                than 3 columns are zero-padded; extra columns are dropped.
            axes: Weights for the fourth coordinate; only the first two
                values are used, missing ones default to 0.

        Returns:
            Array of shape ``(n_points, 4)`` holding ``[x, y, z, w]`` per
            point, where ``x, y, z`` are the rotated coordinates and
            ``w = x*axes[0] + y*axes[1] + tanh(z)``. An empty input yields an
            array of shape ``(0, 4)``.
        """
        p = np.asarray(points, dtype=np.float64)
        if p.ndim == 1:
            p = p.reshape(1, -1)
        if p.shape[1] < 3:
            p = np.pad(p, ((0, 0), (0, 3 - p.shape[1])), mode="constant", constant_values=0.0)
        p = p[:, :3]

        a = np.asarray(axes, dtype=np.float64).flatten()
        if a.size < 2:
            a = np.pad(a, (0, 2 - a.size), mode="constant", constant_values=0.0)
        ax0, ax1 = float(a[0]), float(a[1])

        out = []
        for row in p:
            v = row
            # Apply every configured rotation in order (x then y by default).
            for rotate in self.rotations:
                v = rotate(v)
            x, y, z = float(v[0]), float(v[1]), float(v[2])
            w = (x * ax0) + (y * ax1) + math.tanh(z)
            out.append([x, y, z, w])
        # reshape keeps the column count at 4 even when there were no points.
        return np.asarray(out, dtype=np.float64).reshape(-1, 4)


# -------------------------------------------------------------------
# "spatial reasoning" stub
# -------------------------------------------------------------------
class SpatialReasoning:
    """Stub spatial reasoning step: a 3-row moving average along axis 0."""

    def __init__(self):
        self.simplification = []

    def simplify(self, data: np.ndarray) -> np.ndarray:
        """Smooth ``data`` by averaging each row with its two neighbours.

        The first and last rows reuse themselves as the missing neighbour.
        A 1-D input is treated as a single row, and a single row is returned
        unchanged.
        """
        x = np.asarray(data, dtype=np.float64)
        if x.ndim == 1:
            x = x.reshape(1, -1)
        if x.shape[0] == 1:
            return x
        # Repeat the edge rows so every row has a previous and a next one.
        padded = np.concatenate((x[:1], x, x[-1:]), axis=0)
        return (padded[:-2] + padded[1:-1] + padded[2:]) / 3.0


# -------------------------------------------------------------------
# Tesseract Node
# -------------------------------------------------------------------
class TesseractNode:
    """Pipeline tying sensors, 4D projection, smoothing and the network together."""

    def __init__(self):
        self.sensors = [DepthSensor(), InfraredSensor()]
        self.physics = Physics()
        self.neuralnetwork = NeuralNetwork()
        self.transformations = Transformations()
        self.spatial_reasoning = SpatialReasoning()

    def perceive(self) -> List[Tuple[np.ndarray, np.ndarray]]:
        """Read all sensors and return ``[(ir_4d, depth_4d)]``.

        Returns an empty list when either modality is missing.

        Raises:
            ValueError: If ``self.sensors`` contains an unsupported sensor type.
        """
        perceptions: List[Tuple[np.ndarray, np.ndarray]] = []

        depth_readings = None
        ir_readings = None
        for sensor in self.sensors:
            if isinstance(sensor, DepthSensor):
                depth_readings = sensor.readings()
            elif isinstance(sensor, InfraredSensor):
                ir_readings = sensor.readings()
            else:
                raise ValueError("TesseractNode can only handle DepthSensor and InfraredSensor")

        if depth_readings is None or ir_readings is None:
            return perceptions

        depth_points = self._to_points3(self.normalize(depth_readings))
        ir_points = self._to_points3(self.normalize(ir_readings))

        transformed_depth = self.transformations.project_3d_to_4d(depth_points, np.array([0.7, 0.2]))
        transformed_ir = self.transformations.project_3d_to_4d(ir_points, np.array([0.3, 0.1]))

        perceptions.append((transformed_ir, transformed_depth))
        return perceptions

    def process_perceptions(self, perceptions: List[Tuple[np.ndarray, np.ndarray]]) -> List[np.ndarray]:
        """Combine, smooth and run the network on each ``(ir_4d, depth_4d)`` pair."""
        processed_perception: List[np.ndarray] = []
        for perceived_ir_4d, perceived_depth_4d in perceptions:
            combined = self._combine_modalities(perceived_ir_4d, perceived_depth_4d)
            spatialized = self.spatial_reasoning.simplify(combined)
            processed_perception.append(self.neuralnetwork.predict(spatialized))
        return processed_perception

    def normalize(self, data: np.ndarray) -> np.ndarray:
        """Return ``data`` shifted to zero mean and scaled to unit std.

        When the standard deviation is (numerically) zero only the mean is
        removed. Empty input is returned as an empty float array.
        """
        x = np.asarray(data, dtype=np.float64)
        if x.size == 0:
            # mean/std of nothing would be NaN and emit a RuntimeWarning.
            return x
        mean = float(np.mean(x))
        std = float(np.std(x))
        if std <= 1e-12:
            return x - mean
        return (x - mean) / std

    def transform(self, normalized_data: np.ndarray) -> List[np.ndarray]:
        """Window ``normalized_data`` into 3D points and lift them to 4D with axes (0.5, 0.5)."""
        pts3 = self._to_points3(np.asarray(normalized_data, dtype=np.float64))
        rotated_then_4d = self.transformations.project_3d_to_4d(pts3, np.array([0.5, 0.5]))
        return [rotated_then_4d]

    def _to_points3(self, vec: np.ndarray) -> np.ndarray:
        """Turn a flat vector into overlapping triples ``[v[i], v[i+1], v[i+2]]``.

        Vectors shorter than 3 are zero-padded to length 3 first, so the
        result always has shape ``(max(len, 3) - 2, 3)``.
        """
        v = np.asarray(vec, dtype=np.float64).flatten()
        if v.size < 3:
            v = np.pad(v, (0, 3 - v.size), mode="constant", constant_values=0.0)
        # Sliding window of width 3 built from three shifted views.
        return np.column_stack((v[:-2], v[1:-1], v[2:]))

    def _combine_modalities(self, ir4d: np.ndarray, depth4d: np.ndarray) -> np.ndarray:
        """Average the two modalities row by row over their common length.

        Returns a single all-zero row of width 4 when either input is empty.
        """
        a = np.asarray(ir4d, dtype=np.float64)
        b = np.asarray(depth4d, dtype=np.float64)
        n = int(min(a.shape[0], b.shape[0]))
        if n <= 0:
            return np.zeros((1, 4), dtype=np.float64)
        return (a[:n] + b[:n]) / 2.0


# -------------------------------------------------------------------
# 4D Drift + HyperMemory
# -------------------------------------------------------------------
# Speed of light in metres per second; converts seconds to metres on the ct axis.
C_M_PER_S = 299_792_458.0


@dataclass(frozen=True)
class Vec4:
    """Immutable 4D coordinate ``(ct, x, y, z)`` with every component in metres."""

    ct_m: float
    x_m: float
    y_m: float
    z_m: float

    def to_dict(self) -> dict:
        """Return the components as plain floats, plus ``t_s`` (``ct_m`` converted to seconds)."""
        return {
            "ct_m": float(self.ct_m),
            "t_s": float(self.ct_m / C_M_PER_S),
            "x_m": float(self.x_m),
            "y_m": float(self.y_m),
            "z_m": float(self.z_m),
        }

    def dist(self, other: "Vec4") -> float:
        """Return the Euclidean (all-plus signature) distance to ``other`` in metres."""
        dct = self.ct_m - other.ct_m
        dx = self.x_m - other.x_m
        dy = self.y_m - other.y_m
        dz = self.z_m - other.z_m
        return math.sqrt(dct * dct + dx * dx + dy * dy + dz * dz)


@dataclass
class MemoryItem:
    """One stored memory: where it was recorded, what it holds, and when."""

    coord: Vec4
    payload: dict
    created_s: float  # wall-clock time.time() at insertion


class DriftEngine:
    """Random walk through 4D space starting at the current wall-clock time.

    NOTE(review): ``ct_m`` starts at ``time.time() * C_M_PER_S``, a very large
    float; the time-axis term presumably dominates ``Vec4.dist`` — confirm
    this is intended.
    """

    def __init__(self, step_m: float = 0.25, step_s: float = 0.0000005):
        # Negative step sizes are clamped to 0 (no motion on that axis).
        self.step_m = max(0.0, float(step_m))
        self.step_s = max(0.0, float(step_s))
        self.rng = np.random.default_rng(int.from_bytes(os.urandom(8), "big", signed=False))
        self.pos = Vec4(ct_m=time.time() * C_M_PER_S, x_m=0.0, y_m=0.0, z_m=0.0)

    def step(self, n: int = 1) -> Vec4:
        """Advance the walk by ``n`` uniform random steps and return the new position.

        At least one step is always taken, even for ``n <= 0``.
        """
        for _ in range(max(1, int(n))):
            dt_s = float(self.rng.uniform(-self.step_s, self.step_s))
            dx = float(self.rng.uniform(-self.step_m, self.step_m))
            dy = float(self.rng.uniform(-self.step_m, self.step_m))
            dz = float(self.rng.uniform(-self.step_m, self.step_m))
            self.pos = Vec4(
                ct_m=self.pos.ct_m + dt_s * C_M_PER_S,
                x_m=self.pos.x_m + dx,
                y_m=self.pos.y_m + dy,
                z_m=self.pos.z_m + dz,
            )
        return self.pos


class HyperMemory:
    """In-RAM store of ``MemoryItem`` objects searchable by 4D distance."""

    def __init__(self):
        self.items: List[MemoryItem] = []

    def add(self, coord: Vec4, payload: dict) -> None:
        """Append a memory at ``coord``; only the newest 2000 items are kept."""
        self.items.append(MemoryItem(coord=coord, payload=payload, created_s=time.time()))
        if len(self.items) > 2000:
            self.items = self.items[-2000:]

    def nearest(self, center: Vec4, k: int = 6) -> List[Tuple[MemoryItem, float]]:
        """Return up to ``k`` ``(item, distance)`` pairs, closest first.

        ``k`` is clamped to the range 1..32.
        """
        scored = [(it, center.dist(it.coord)) for it in self.items]
        scored.sort(key=lambda pair: pair[1])
        return scored[:max(1, min(int(k), 32))]


# -------------------------------------------------------------------
# GGUF Chat Runtime
# -------------------------------------------------------------------
class GGUFChat:
    """Thin wrapper around a llama-cpp-python model loaded from a GGUF file.

    Construction never raises: when the file is missing or the library fails
    to load, ``ready()`` is False and ``why()`` explains the reason.
    """

    def __init__(self, model_path: str, ctx: int = 4096, threads: int = 0, n_gpu_layers: int = 0):
        self.model_path = model_path
        self.ctx = int(ctx)
        self.threads = int(threads)
        self.n_gpu_layers = int(n_gpu_layers)
        self.llm = None
        self._why = ""
        self._init()

    def _init(self) -> None:
        """Try to load the model, recording any failure reason in ``_why``."""
        if not os.path.exists(self.model_path):
            self._why = f"GGUF not found at: {self.model_path}"
            return
        try:
            # Imported lazily so the rest of the module works without llama-cpp-python.
            from llama_cpp import Llama  # type: ignore

            kwargs = {
                "model_path": self.model_path,
                "n_ctx": self.ctx,
                "n_gpu_layers": self.n_gpu_layers,
            }
            # n_threads is only passed when explicitly configured (> 0).
            if self.threads > 0:
                kwargs["n_threads"] = self.threads
            self.llm = Llama(**kwargs)
        except Exception as e:
            # Deliberately broad: import errors and model-load errors are both
            # reported to the user through why() instead of crashing startup.
            self._why = f"Failed to init llama-cpp-python: {e!r}"

    def ready(self) -> bool:
        """Return True when the model was loaded successfully."""
        return self.llm is not None

    def why(self) -> str:
        """Return the recorded initialisation failure reason ('' when none)."""
        return self._why

    def chat(self, system: str, user: str, context_block: str, temperature: float, max_tokens: int) -> str:
        """Run one non-streaming chat completion and return the reply text.

        The context block is prepended to the user message. ``temperature``
        is clamped to [0, 2] and ``max_tokens`` to [1, 2048].

        Raises:
            RuntimeError: If the model is not initialised.
        """
        if not self.llm:
            raise RuntimeError(self._why or "LLM not initialized")

        messages = [
            {"role": "system", "content": system},
            {"role": "user", "content": context_block + "\n\nUser:\n" + user},
        ]

        out = self.llm.create_chat_completion(
            messages=messages,
            temperature=float(max(0.0, min(2.0, temperature))),
            max_tokens=int(max(1, min(2048, max_tokens))),
            stream=False,
        )
        # Guard against a null content field so callers can always .strip().
        return out["choices"][0]["message"]["content"] or ""


# -------------------------------------------------------------------
# Prompting (“super power” awareness)
# -------------------------------------------------------------------
def system_prompt_superpower() -> str:
    """Return the fixed system prompt describing the 4D drift / HyperMemory persona."""
    return (
        "You are RomanAILabs TESSERACT-NODE (tessellation_4d prototype) running locally.\n"
        "SUPER POWER (4th dimension): you perceive a live 4D drift coordinate (ct,x,y,z) that shifts every turn.\n"
        "You can access HyperMemory items retrieved by 4D proximity to the current drift coordinate.\n"
        "When asked to 'do things in the 4th dimension', interpret that as drifting, retrieving nearby memories,\n"
        "and reasoning about how drift changes what you can recall.\n"
        "Be grounded: only use the provided memory/perception summaries; do not invent sensor data.\n"
    )


# -------------------------------------------------------------------
# Token-safe context block (SUMMARY ONLY)
# -------------------------------------------------------------------
def _np_summary(arr: np.ndarray) -> dict:
    """Summarise an array as shape plus mean/std/min/max.

    For an empty array the four statistics are ``None`` (they are undefined
    and ``np.min``/``np.max`` would raise).
    """
    x = np.asarray(arr, dtype=np.float64)
    if x.size == 0:
        return {"shape": list(x.shape), "mean": None, "std": None, "min": None, "max": None}
    return {
        "shape": list(x.shape),
        "mean": float(np.mean(x)),
        "std": float(np.std(x)),
        "min": float(np.min(x)),
        "max": float(np.max(x)),
    }


def _compact_snapshot(snapshot: Optional[dict]) -> Optional[dict]:
    """
    Convert a big snapshot into a small one.

    Only the first perception (shapes only) and the first processed entry
    (prediction summary only) are kept. Returns None for a None or
    malformed snapshot.
    """
    if snapshot is None:
        return None
    out = {"perceptions": [], "processed": []}
    try:
        for p in snapshot.get("perceptions", [])[:1]:
            out["perceptions"].append({
                "ir4d_shape": p.get("ir4d_shape"),
                "depth4d_shape": p.get("depth4d_shape"),
            })
        for pr in snapshot.get("processed", [])[:1]:
            out["processed"].append({
                "prediction_summary": pr.get("prediction_summary"),
            })
    except (AttributeError, TypeError, KeyError):
        # Snapshot (or an entry) is not dict/list shaped: treat as unusable.
        return None
    return out


def _compact_memory_payload(payload: dict) -> dict:
    """
    Prevent large payloads from exploding context.

    Perception snapshots are reduced via ``_compact_snapshot``; any other
    payload keeps its type plus the string fields ``text``/``note``/``tag``
    truncated to 200 characters.
    """
    if not isinstance(payload, dict):
        return {"type": "unknown"}

    t = payload.get("type", "unknown")
    if t == "perception_snapshot":
        # store only summary here
        snap = payload.get("snapshot", None)
        return {"type": "perception_snapshot", "snapshot": _compact_snapshot(snap)}

    # generic
    small = {"type": t}
    # include tiny bits if they exist
    for k in ("text", "note", "tag"):
        if k in payload and isinstance(payload[k], str):
            small[k] = payload[k][:200]
    return small


def build_context_block(drift: Vec4, nearest: List[Tuple[MemoryItem, float]], last_snapshot: Optional[dict]) -> str:
    """Render the compact, JSON-based context text sent ahead of the user message.

    Args:
        drift: Current drift coordinate.
        nearest: ``(item, distance)`` pairs; only the first 6 are included.
        last_snapshot: Most recent perception snapshot, or None.
    """
    drift_block = json.dumps(drift.to_dict(), ensure_ascii=False, separators=(",", ":"))

    now = time.time()  # one timestamp so all ages are measured consistently
    mem_block = [
        {
            "distance_m": float(d),
            "coord": it.coord.to_dict(),
            "payload": _compact_memory_payload(it.payload),
            "age_s": float(now - it.created_s),
        }
        for it, d in nearest[:6]
    ]

    mem_json = json.dumps(mem_block, ensure_ascii=False, separators=(",", ":"))
    snap_json = json.dumps(_compact_snapshot(last_snapshot), ensure_ascii=False, separators=(",", ":"))

    return (
        "=== TESSERACT PERCEPTION FEED (SUMMARY) ===\n"
        f"DRIFT:{drift_block}\n"
        f"NEAREST_4D_MEMORIES:{mem_json}\n"
        f"LAST_PERCEPTION_SNAPSHOT:{snap_json}\n"
        "Rules:\n"
        "- Use only the summaries above.\n"
        "- If memories are empty, say so.\n"
    )


# -------------------------------------------------------------------
# CLI Chat Loop
# -------------------------------------------------------------------
HELP_TEXT = """\
Commands:
  /help            show this help
  /perceive        run sensors -> 4D transform -> spatial reasoning -> prediction; store snapshot in 4D memory
  /drift           show current drift coordinate
  /step [n]        advance drift n steps (default 1)
  /mem [k]         show nearest k memories (default 6)
  /clear           clear in-RAM memory
  /exit            quit
"""


def snapshot_from_pipeline(node: "TesseractNode") -> dict:
    """Run ``node``'s perceive/process pipeline and return a summary-only snapshot.

    The result has two parallel lists, ``perceptions`` (shapes and statistics
    of the 4D arrays) and ``processed`` (statistics of each prediction).
    """
    perceptions = node.perceive()
    processed = node.process_perceptions(perceptions)

    snap = {"perceptions": [], "processed": []}
    for (ir4d, depth4d), outcome in zip(perceptions, processed):
        ir4d = np.asarray(ir4d, dtype=np.float64)
        depth4d = np.asarray(depth4d, dtype=np.float64)
        out = np.asarray(outcome, dtype=np.float64)

        snap["perceptions"].append({
            "ir4d_shape": list(ir4d.shape),
            "depth4d_shape": list(depth4d.shape),
            "ir4d_summary": _np_summary(ir4d),
            "depth4d_summary": _np_summary(depth4d),
        })
        snap["processed"].append({
            "prediction_summary": _np_summary(out),
        })
    return snap


def clamp_completion_tokens(n_ctx: int, context_block: str, user_text: str, desired: int,
                            system_text: str = "") -> int:
    """
    Crude but effective:
    - assume ~4 chars per token average
    - keep a safety margin

    Args:
        n_ctx: Context window size in tokens.
        context_block: Context text that is sent with the user message.
        user_text: The user's message.
        desired: Completion length the caller would like.
        system_text: Optional system prompt, counted towards the prompt size.

    Returns:
        ``desired`` reduced to the estimated remaining budget, never below 1.
        (A higher floor would request more tokens than remain once the
        prompt nearly fills the window.)
    """
    # +800 chars for chat template overhead
    prompt_chars = len(context_block) + len(user_text) + len(system_text) + 800
    approx_prompt_tokens = prompt_chars // 4
    budget = int(n_ctx) - approx_prompt_tokens - 64  # keep margin
    return int(max(1, min(int(desired), budget)))


def _env_int(name: str, default: int) -> int:
    """Read integer env var ``name``; warn and use ``default`` when unset or malformed."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        print(f"WARNING: ignoring non-integer {name}={raw!r}; using {default}")
        return default


def _int_arg(parts: List[str], default: int) -> int:
    """Return ``parts[1]`` parsed as int, or ``default`` when absent or not a number."""
    if len(parts) > 1:
        try:
            return int(parts[1])
        except ValueError:
            pass
    return default


def main() -> None:
    """Interactive CLI: slash commands drive the pipeline, anything else is a chat turn."""
    script_dir = os.path.dirname(os.path.abspath(__file__))
    gguf_path = os.path.join(script_dir, "Qwen3-4B-Instruct-2507-Q4_K_M.gguf")

    # Runtime config (env overridable)
    n_ctx = _env_int("TESSERACT_CTX", 4096)
    n_threads = _env_int("TESSERACT_THREADS", 0)
    n_gpu_layers = _env_int("TESSERACT_N_GPU_LAYERS", 0)

    node = TesseractNode()
    drift = DriftEngine(step_m=0.25, step_s=0.0000005)
    mem = HyperMemory()
    last_snapshot: Optional[dict] = None

    chat = GGUFChat(model_path=gguf_path, ctx=n_ctx, threads=n_threads, n_gpu_layers=n_gpu_layers)

    print("TESSERACT-NODE: tessellation_4d + Qwen GGUF chat (Context-Safe)")
    print(f"GGUF path: {gguf_path}")
    print(f"n_ctx={n_ctx} threads={n_threads} n_gpu_layers={n_gpu_layers}")
    if not chat.ready():
        print("ERROR: LLM not ready.")
        print("Reason:", chat.why())
        print("\nFix (inside your venv): pip install -U llama-cpp-python\n")
        return

    print("\nType /help for commands. Then just chat normally.\n")

    while True:
        try:
            user = input("You> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nBye.")
            return

        if not user:
            continue

        if user.startswith("/"):
            parts = user.split()
            cmd = parts[0].lower()

            if cmd == "/help":
                print(HELP_TEXT)
                continue

            if cmd == "/exit":
                print("Bye.")
                return

            if cmd == "/drift":
                print(json.dumps(drift.pos.to_dict(), indent=2))
                continue

            if cmd == "/step":
                # DriftEngine.step always takes at least one step, so clamp
                # here too and report the number of steps actually taken.
                n = max(1, _int_arg(parts, 1))
                drift.step(n)
                print(f"Drift stepped {n}. Current:", json.dumps(drift.pos.to_dict()))
                continue

            if cmd == "/clear":
                mem.items = []
                last_snapshot = None
                print("Memory cleared (RAM only).")
                continue

            if cmd == "/mem":
                k = _int_arg(parts, 6)
                nearest = mem.nearest(drift.pos, k=k)
                if not nearest:
                    print("No memories yet. Use /perceive to create one.")
                else:
                    for i, (it, d) in enumerate(nearest, 1):
                        payload = _compact_memory_payload(it.payload)
                        print(f"[{i}] dist_m={d:.3f} coord={it.coord.to_dict()} payload={payload}")
                continue

            if cmd == "/perceive":
                last_snapshot = snapshot_from_pipeline(node)
                mem.add(drift.pos, {"type": "perception_snapshot", "snapshot": last_snapshot})
                print("Perception captured + stored in HyperMemory at current drift.")
                drift.step(1)
                continue

            print("Unknown command. Type /help.")
            continue

        # Chat turn: drift moves each turn
        drift.step(1)

        nearest = mem.nearest(drift.pos, k=6)
        ctx_block = build_context_block(drift.pos, nearest, last_snapshot)
        system = system_prompt_superpower()

        # Clamp completion tokens so we don't exceed n_ctx
        desired_out = 512
        max_tokens = clamp_completion_tokens(
            n_ctx=n_ctx,
            context_block=ctx_block,
            user_text=user,
            desired=desired_out,
            system_text=system,
        )

        try:
            reply = chat.chat(
                system=system,
                user=user,
                context_block=ctx_block,
                temperature=0.7,
                max_tokens=max_tokens,
            )
        except Exception as e:
            # Top-level boundary: report the failure and keep the REPL alive.
            print("LLM ERROR:", repr(e))
            print("Tip: If you want a larger context window, try:")
            print("  export TESSERACT_CTX=8192  (only if your GGUF supports it)")
            continue

        print("\nQwen4D> " + reply.strip() + "\n")


if __name__ == "__main__":
    main()