#!/usr/bin/env python3 # Copyright Daniel Harding - RomanAILabs # Credits: Nova (GPT-5.2 Thinking) """ OBSERVER–TESSERACT BRIDGE (TESSERACT-NODE v1.0) — 11/10 Edition ============================================================== You asked for: - A “beautiful” production-ish local-only spacetime server (t,x,y,z) with Minkowski scaling (ct). - Autonomous Entropy (background drift thread). - HyperMemory retrieval by 4D proximity (NOT text similarity). - An Observer that “collapses” the manifold into a 3D-friendly scalar (int/float). - Wired to a local GGUF model: Qwen3-4B-Instruct-2507-Q4_K_M.gguf - So you can *talk to it*, and it’s aware of its “super power.” This file provides: - Flask REST API: /health /state /memory/add /memory/query /observe /chat - Optional streaming chat via Server-Sent Events: /chat/stream - Local-only by default (bind 127.0.0.1) - JSONL persistence for HyperMemory - Rotating logs - Optional API token auth (simple bearer) if you want it Model runtime: - Uses llama-cpp-python if installed (preferred). - If not installed, the server still runs, but /chat returns a clear error telling you the dependency is missing. Install dependency (once, locally): pip install -U llama-cpp-python Run: python3 ~/server/observer_tesseract_bridge.py Environment variables: TESSERACT_HOST=127.0.0.1 TESSERACT_PORT=11777 TESSERACT_DATA_DIR=~/.local/share/tesseract_node TESSERACT_MEMORY_JSONL=~/.local/share/tesseract_node/hypermemory.jsonl TESSERACT_LOG_DIR=~/.local/share/tesseract_node/logs TESSERACT_DRIFT_HZ=2.0 TESSERACT_DRIFT_STEP_M=0.25 TESSERACT_DRIFT_STEP_S=0.0000005 # Model path (point this at your GGUF) TESSERACT_MODEL_PATH=~/Qwen3-4B-Instruct-2507-Q4_K_M.gguf # llama.cpp runtime params (sane CPU defaults) TESSERACT_CTX=4096 TESSERACT_THREADS=0 # 0 => auto TESSERACT_BATCH=512 TESSERACT_N_GPU_LAYERS=0 # CPU-only by default TESSERACT_SEED=0 # 0 => random-ish internal; drift still drives perspective # Optional auth: TESSERACT_API_TOKEN= # if set, require "Authorization: Bearer " JSON endpoints: GET /health GET /state POST /memory/add { use_drift?:bool, payload:{...}, [ct_m|t_s,x_m,y_m,z_m] } POST /memory/query { use_drift?:bool, k?:int, radius_m?:float, [ct_m|t_s,x_m,y_m,z_m] } POST /observe { query:string, k?:int, radius_m?:float } POST /chat { message:string, k?:int, radius_m?:float, temperature?:float, max_tokens?:int } POST /chat/stream (SSE) same body as /chat, returns text/event-stream Tip: - Add memories with /memory/add while drifting, then chat. The AI will “feel” different because drift moves. """ from __future__ import annotations import json import math import os import queue import random import threading import time import uuid from dataclasses import dataclass from logging import Logger from logging.handlers import RotatingFileHandler from typing import Any, Dict, Generator, List, Optional, Tuple from flask import Flask, Response, jsonify, request # ----------------------------- # Constants # ----------------------------- C_M_PER_S = 299_792_458.0 DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 11777 DEFAULT_DRIFT_HZ = 2.0 DEFAULT_DRIFT_STEP_METERS = 0.25 DEFAULT_DRIFT_STEP_SECONDS = 0.0000005 MAX_MEMORY_RETURN = 64 MAX_MEMORY_STORE = 200_000 DEFAULT_CTX = 4096 DEFAULT_BATCH = 512 # ----------------------------- # Utils # ----------------------------- def now_s() -> float: return time.time() def clamp(v: float, lo: float, hi: float) -> float: return max(lo, min(hi, v)) def expanduser(path: str) -> str: return os.path.expandvars(os.path.expanduser(path)) def ensure_dir(path: str) -> None: os.makedirs(path, exist_ok=True) def stable_hash_64(data: bytes) -> int: """ Stable 64-bit FNV-1a. """ h = 1469598103934665603 for b in data: h ^= b h = (h * 1099511628211) & 0xFFFFFFFFFFFFFFFF return h def safe_json(obj: Any) -> str: return json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=True) def req_json() -> Dict[str, Any]: return request.get_json(force=True, silent=True) or {} def get_bearer_token() -> str: auth = request.headers.get("Authorization", "") if auth.lower().startswith("bearer "): return auth[7:].strip() return "" def sse_event(data: str, event: str = "message") -> str: # SSE format: event + data lines + blank line data_lines = data.splitlines() or [""] payload = f"event: {event}\n" + "".join(f"data: {line}\n" for line in data_lines) + "\n" return payload # ----------------------------- # Logging # ----------------------------- def make_logger(log_dir: str) -> Logger: import logging ensure_dir(log_dir) log_path = os.path.join(log_dir, "tesseract_node.log") logger = logging.getLogger("tesseract_node") logger.setLevel(logging.INFO) # Avoid double handlers on reload if not logger.handlers: handler = RotatingFileHandler(log_path, maxBytes=2_000_000, backupCount=5, encoding="utf-8") fmt = logging.Formatter( fmt="%(asctime)s | %(levelname)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) handler.setFormatter(fmt) logger.addHandler(handler) console = logging.StreamHandler() console.setFormatter(fmt) logger.addHandler(console) return logger # ----------------------------- # Vec4 (ct,x,y,z in meters) # ----------------------------- @dataclass(frozen=True) class Vec4: ct_m: float x_m: float y_m: float z_m: float @staticmethod def from_txyz_seconds_meters(t_s: float, x_m: float, y_m: float, z_m: float) -> "Vec4": return Vec4(ct_m=t_s * C_M_PER_S, x_m=x_m, y_m=y_m, z_m=z_m) def to_dict(self) -> Dict[str, float]: return { "ct_m": float(self.ct_m), "t_s": float(self.ct_m / C_M_PER_S), "x_m": float(self.x_m), "y_m": float(self.y_m), "z_m": float(self.z_m), } def add(self, dct_m: float, dx_m: float, dy_m: float, dz_m: float) -> "Vec4": return Vec4(self.ct_m + dct_m, self.x_m + dx_m, self.y_m + dy_m, self.z_m + dz_m) def euclidean_distance(self, other: "Vec4") -> float: dct = self.ct_m - other.ct_m dx = self.x_m - other.x_m dy = self.y_m - other.y_m dz = self.z_m - other.z_m return math.sqrt(dct * dct + dx * dx + dy * dy + dz * dz) # ----------------------------- # HyperMemory # ----------------------------- @dataclass class MemoryItem: id: str coord: Vec4 payload: Dict[str, Any] created_s: float def to_json(self) -> Dict[str, Any]: return { "id": self.id, "coord": self.coord.to_dict(), "payload": self.payload, "created_s": self.created_s, } @staticmethod def from_json(obj: Dict[str, Any]) -> "MemoryItem": c = obj["coord"] if "ct_m" in c: ct_m = float(c["ct_m"]) else: ct_m = float(c.get("t_s", 0.0)) * C_M_PER_S coord = Vec4(ct_m=ct_m, x_m=float(c["x_m"]), y_m=float(c["y_m"]), z_m=float(c["z_m"])) return MemoryItem( id=str(obj["id"]), coord=coord, payload=dict(obj.get("payload", {})), created_s=float(obj.get("created_s", now_s())), ) class HyperMemory: def __init__(self, jsonl_path: str, logger: Logger): self.jsonl_path = jsonl_path self.logger = logger self._lock = threading.RLock() self._items: List[MemoryItem] = [] ensure_dir(os.path.dirname(jsonl_path)) self._load_jsonl() def _load_jsonl(self) -> None: if not os.path.exists(self.jsonl_path): self.logger.info(f"HyperMemory: no existing store at {self.jsonl_path}") return loaded = 0 try: with open(self.jsonl_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) self._items.append(MemoryItem.from_json(obj)) loaded += 1 except Exception: continue if len(self._items) > MAX_MEMORY_STORE: self._items = self._items[-MAX_MEMORY_STORE:] self.logger.info(f"HyperMemory: loaded {loaded} items from {self.jsonl_path}") except Exception as e: self.logger.info(f"HyperMemory: failed to read {self.jsonl_path}: {e!r}. Starting empty.") self._items = [] def _append_jsonl(self, item: MemoryItem) -> None: with open(self.jsonl_path, "a", encoding="utf-8") as f: f.write(safe_json(item.to_json()) + "\n") def add(self, coord: Vec4, payload: Dict[str, Any]) -> MemoryItem: with self._lock: raw = safe_json({"coord": coord.to_dict(), "payload": payload, "t": now_s()}).encode("utf-8") mid = f"mem_{stable_hash_64(raw):016x}" item = MemoryItem(id=mid, coord=coord, payload=payload, created_s=now_s()) self._items.append(item) self._append_jsonl(item) if len(self._items) > MAX_MEMORY_STORE: self._items = self._items[-MAX_MEMORY_STORE:] return item def query_by_proximity(self, center: Vec4, k: int = 8, radius_m: Optional[float] = None) -> List[Tuple[MemoryItem, float]]: k = int(clamp(k, 1, MAX_MEMORY_RETURN)) with self._lock: scored: List[Tuple[MemoryItem, float]] = [] for it in self._items: d = center.euclidean_distance(it.coord) if radius_m is not None and d > radius_m: continue scored.append((it, d)) scored.sort(key=lambda x: x[1]) return scored[:k] def stats(self) -> Dict[str, Any]: with self._lock: return {"count": len(self._items), "jsonl_path": self.jsonl_path} # ----------------------------- # Drift Engine (Autonomous Entropy) # ----------------------------- class DriftEngine: def __init__(self, logger: Logger, start: Optional[Vec4] = None, drift_hz: float = DEFAULT_DRIFT_HZ, step_m: float = DEFAULT_DRIFT_STEP_METERS, step_s: float = DEFAULT_DRIFT_STEP_SECONDS): self.logger = logger self._lock = threading.RLock() self._pos = start if start is not None else Vec4.from_txyz_seconds_meters(now_s(), 0.0, 0.0, 0.0) self._drift_hz = max(0.1, float(drift_hz)) self._step_m = max(0.0, float(step_m)) self._step_s = max(0.0, float(step_s)) self._stop = threading.Event() self._thread = threading.Thread(target=self._run, name="tesseract_drift", daemon=True) seed = int.from_bytes(os.urandom(16), "big", signed=False) self._rng = random.Random(seed) def start(self) -> None: self.logger.info("DriftEngine: starting") self._thread.start() def stop(self) -> None: self.logger.info("DriftEngine: stopping") self._stop.set() self._thread.join(timeout=2.0) def get(self) -> Vec4: with self._lock: return self._pos def configure(self, drift_hz: Optional[float] = None, step_m: Optional[float] = None, step_s: Optional[float] = None) -> None: with self._lock: if drift_hz is not None: self._drift_hz = max(0.1, float(drift_hz)) if step_m is not None: self._step_m = max(0.0, float(step_m)) if step_s is not None: self._step_s = max(0.0, float(step_s)) def snapshot(self) -> Dict[str, Any]: with self._lock: return { "pos": self._pos.to_dict(), "drift_hz": self._drift_hz, "step_m": self._step_m, "step_s": self._step_s, } def _run(self) -> None: while not self._stop.is_set(): dt = 1.0 / self._drift_hz dt_s = (self._rng.random() * 2.0 - 1.0) * self._step_s dx = (self._rng.random() * 2.0 - 1.0) * self._step_m dy = (self._rng.random() * 2.0 - 1.0) * self._step_m dz = (self._rng.random() * 2.0 - 1.0) * self._step_m dct = dt_s * C_M_PER_S with self._lock: self._pos = self._pos.add(dct, dx, dy, dz) time.sleep(dt) # ----------------------------- # Observer (Collapse + “Super Power” framing) # ----------------------------- class Observer: def __init__(self, drift: DriftEngine, memory: HyperMemory, logger: Logger): self.drift = drift self.memory = memory self.logger = logger def _drift_seed(self, query_text: str, drift_pos: Vec4) -> int: blob = safe_json( { "q": query_text, "ct": drift_pos.ct_m, "x": drift_pos.x_m, "y": drift_pos.y_m, "z": drift_pos.z_m, } ).encode("utf-8") return stable_hash_64(blob) def _collapse_scalar(self, drift_seed_u64: int, nearest: List[Tuple[MemoryItem, float]]) -> Dict[str, Any]: eps = 1e-6 weights = [(1.0 / (eps + d)) for (_, d) in nearest] if not weights: u = (drift_seed_u64 & 0xFFFFFFFFFFFF) / float(0x1000000000000) return {"collapsed_int": int(u * 10_000), "collapsed_float": float(u), "mode": "drift_only"} # Weighted deterministic draw using drift seed (no external deps) s = sum(max(0.0, w) for w in weights) probs = [max(0.0, w) / s for w in weights] r = (drift_seed_u64 & 0xFFFFFFFFFFFFFFFF) / float(0xFFFFFFFFFFFFFFFF) acc = 0.0 idx = 0 for i, p in enumerate(probs): acc += p if r <= acc: idx = i break chosen, dist = nearest[int(clamp(idx, 0, len(nearest) - 1))] mix_blob = safe_json( {"seed": drift_seed_u64, "mem_id": chosen.id, "dist": dist, "keys": sorted(list(chosen.payload.keys()))} ).encode("utf-8") mix = stable_hash_64(mix_blob) u = (mix & 0xFFFFFFFFFFFF) / float(0x1000000000000) collapsed_int = int((u * 1_000_000) % 1_000_000) return { "collapsed_int": collapsed_int, "collapsed_float": float(u), "mode": "memory_weighted", "observed_memory_id": chosen.id, "observed_distance_m": float(dist), } def observe(self, query_text: str, k: int = 8, radius_m: Optional[float] = None) -> Dict[str, Any]: drift_pos = self.drift.get() nearest = self.memory.query_by_proximity(center=drift_pos, k=k, radius_m=radius_m) drift_seed_u64 = self._drift_seed(query_text, drift_pos) collapse = self._collapse_scalar(drift_seed_u64, nearest) return { "query": query_text, "drift": drift_pos.to_dict(), "memory_stats": self.memory.stats(), "nearest": [ { "id": it.id, "distance_m": float(d), "coord": it.coord.to_dict(), "payload": it.payload, "created_s": it.created_s, } for (it, d) in nearest ], "collapse": collapse, } def superpower_system_prompt(self) -> str: """ This is the “make sure it knows” part: The model is told explicitly how it’s wired and what it can perceive. """ return ( "You are RomanAILabs TESSERACT-NODE (Observer–Tesseract Bridge).\n" "Your SUPER POWER: You can perceive a live 4D drift coordinate (ct,x,y,z), and you can retrieve memories\n" "from HyperMemory by 4D spatial proximity to that drift point (not by text similarity).\n" "When you answer, you may reference drift changes, and you may incorporate nearby memories if relevant.\n" "You also receive a collapsed scalar (int/float) derived from drift + nearby memories (a 'collapse' of the manifold).\n" "Be helpful, grounded, and do not invent memories that are not provided.\n" "If the user asks about your architecture, explain it clearly and briefly.\n" ) # ----------------------------- # Local GGUF Chat (llama-cpp-python) # ----------------------------- class LocalGGUFChat: def __init__(self, model_path: str, ctx: int, threads: int, batch: int, n_gpu_layers: int, seed: int, logger: Logger): self.model_path = model_path self.ctx = ctx self.threads = threads self.batch = batch self.n_gpu_layers = n_gpu_layers self.seed = seed self.logger = logger self._llama = None self._available = False self._why = "not initialized" self._init() def _init(self) -> None: mp = expanduser(self.model_path) if not os.path.exists(mp): self._available = False self._why = f"model not found: {mp}" self.logger.info(f"LocalGGUFChat: {self._why}") return try: from llama_cpp import Llama # type: ignore kwargs = { "model_path": mp, "n_ctx": int(self.ctx), "n_batch": int(self.batch), "n_gpu_layers": int(self.n_gpu_layers), } if int(self.threads) > 0: kwargs["n_threads"] = int(self.threads) if int(self.seed) != 0: kwargs["seed"] = int(self.seed) self.logger.info(f"LocalGGUFChat: loading GGUF: {mp}") self._llama = Llama(**kwargs) self._available = True self._why = "" self.logger.info("LocalGGUFChat: ready") except Exception as e: self._available = False self._why = f"llama-cpp-python unavailable or failed to init: {e!r}" self.logger.info(f"LocalGGUFChat: {self._why}") def available(self) -> bool: return self._available def why_unavailable(self) -> str: return self._why def _format_messages(self, system: str, user: str, context_block: str) -> List[Dict[str, str]]: """ Keep it simple and robust across GGUF chat templates: - system + user - include context block as assistant-visible structured text """ return [ {"role": "system", "content": system}, {"role": "user", "content": context_block + "\n\nUser message:\n" + user}, ] def chat(self, system: str, user: str, context_block: str, temperature: float, max_tokens: int) -> str: if not self._available or self._llama is None: raise RuntimeError(self._why) msgs = self._format_messages(system, user, context_block) out = self._llama.create_chat_completion( messages=msgs, temperature=float(clamp(temperature, 0.0, 2.0)), max_tokens=int(clamp(max_tokens, 1, 4096)), stream=False, ) try: return out["choices"][0]["message"]["content"] except Exception: return str(out) def chat_stream(self, system: str, user: str, context_block: str, temperature: float, max_tokens: int) -> Generator[str, None, None]: if not self._available or self._llama is None: raise RuntimeError(self._why) msgs = self._format_messages(system, user, context_block) stream = self._llama.create_chat_completion( messages=msgs, temperature=float(clamp(temperature, 0.0, 2.0)), max_tokens=int(clamp(max_tokens, 1, 4096)), stream=True, ) for part in stream: try: delta = part["choices"][0]["delta"] if "content" in delta and delta["content"]: yield delta["content"] except Exception: continue # ----------------------------- # Flask App Factory # ----------------------------- def create_app() -> Flask: # Paths / config data_dir = expanduser(os.environ.get("TESSERACT_DATA_DIR", "~/.local/share/tesseract_node")) memory_path = expanduser(os.environ.get("TESSERACT_MEMORY_JSONL", os.path.join(data_dir, "hypermemory.jsonl"))) log_dir = expanduser(os.environ.get("TESSERACT_LOG_DIR", os.path.join(data_dir, "logs"))) drift_hz = float(os.environ.get("TESSERACT_DRIFT_HZ", str(DEFAULT_DRIFT_HZ))) step_m = float(os.environ.get("TESSERACT_DRIFT_STEP_M", str(DEFAULT_DRIFT_STEP_METERS))) step_s = float(os.environ.get("TESSERACT_DRIFT_STEP_S", str(DEFAULT_DRIFT_STEP_SECONDS))) api_token = os.environ.get("TESSERACT_API_TOKEN", "").strip() model_path = os.environ.get("TESSERACT_MODEL_PATH", "~/models/Qwen3-4B-Instruct-2507-Q4_K_M.gguf") ctx = int(os.environ.get("TESSERACT_CTX", str(DEFAULT_CTX))) threads = int(os.environ.get("TESSERACT_THREADS", "0")) batch = int(os.environ.get("TESSERACT_BATCH", str(DEFAULT_BATCH))) n_gpu_layers = int(os.environ.get("TESSERACT_N_GPU_LAYERS", "0")) seed = int(os.environ.get("TESSERACT_SEED", "0")) ensure_dir(data_dir) logger = make_logger(log_dir) app = Flask(__name__) memory = HyperMemory(memory_path, logger=logger) drift = DriftEngine(logger=logger, drift_hz=drift_hz, step_m=step_m, step_s=step_s) observer = Observer(drift=drift, memory=memory, logger=logger) chat = LocalGGUFChat( model_path=model_path, ctx=ctx, threads=threads, batch=batch, n_gpu_layers=n_gpu_layers, seed=seed, logger=logger, ) drift.start() # ----------------------------- # Middleware-ish helpers # ----------------------------- def require_auth() -> Optional[Response]: if not api_token: return None got = get_bearer_token() if got != api_token: return jsonify({"ok": False, "error": "unauthorized"}), 401 return None @app.before_request def _before() -> None: request._t0 = now_s() # type: ignore[attr-defined] request._rid = uuid.uuid4().hex[:12] # type: ignore[attr-defined] @app.after_request def _after(resp: Response) -> Response: try: dt_ms = (now_s() - float(request._t0)) * 1000.0 # type: ignore[attr-defined] rid = getattr(request, "_rid", "noid") # type: ignore[attr-defined] logger.info(f"rid={rid} {request.method} {request.path} {resp.status_code} {dt_ms:.1f}ms") resp.headers["X-Request-Id"] = rid except Exception: pass return resp # ----------------------------- # Routes # ----------------------------- @app.route("/health", methods=["GET"]) def health() -> Any: auth = require_auth() if auth is not None: return auth return jsonify( { "ok": True, "system": "TESSERACT-NODE", "version": "v1.0", "time_s": now_s(), "drift": drift.snapshot(), "memory": memory.stats(), "chat_ready": chat.available(), "chat_note": "" if chat.available() else chat.why_unavailable(), } ) @app.route("/state", methods=["GET"]) def state() -> Any: auth = require_auth() if auth is not None: return auth return jsonify( { "ok": True, "drift": drift.snapshot(), "memory": memory.stats(), "model": { "path": expanduser(model_path), "ctx": ctx, "threads": threads, "batch": batch, "n_gpu_layers": n_gpu_layers, "ready": chat.available(), "note": "" if chat.available() else chat.why_unavailable(), }, } ) @app.route("/memory/add", methods=["POST"]) def memory_add() -> Any: auth = require_auth() if auth is not None: return auth body = req_json() use_drift = bool(body.get("use_drift", True)) if use_drift: coord = drift.get() else: ct_m = body.get("ct_m", None) if ct_m is None: t_s = float(body.get("t_s", now_s())) ct_m = t_s * C_M_PER_S coord = Vec4( ct_m=float(ct_m), x_m=float(body.get("x_m", 0.0)), y_m=float(body.get("y_m", 0.0)), z_m=float(body.get("z_m", 0.0)), ) payload = dict(body.get("payload", {})) item = memory.add(coord=coord, payload=payload) return jsonify({"ok": True, "item": item.to_json()}) @app.route("/memory/query", methods=["POST"]) def memory_query() -> Any: auth = require_auth() if auth is not None: return auth body = req_json() k = int(body.get("k", 8)) radius_m = body.get("radius_m", None) radius_m = float(radius_m) if radius_m is not None else None use_drift = bool(body.get("use_drift", True)) if use_drift: center = drift.get() else: ct_m = body.get("ct_m", None) if ct_m is None: t_s = float(body.get("t_s", now_s())) ct_m = t_s * C_M_PER_S center = Vec4( ct_m=float(ct_m), x_m=float(body.get("x_m", 0.0)), y_m=float(body.get("y_m", 0.0)), z_m=float(body.get("z_m", 0.0)), ) nearest = memory.query_by_proximity(center=center, k=k, radius_m=radius_m) return jsonify( { "ok": True, "center": center.to_dict(), "results": [ {"id": it.id, "distance_m": float(d), "coord": it.coord.to_dict(), "payload": it.payload, "created_s": it.created_s} for (it, d) in nearest ], } ) @app.route("/observe", methods=["POST"]) def observe_route() -> Any: auth = require_auth() if auth is not None: return auth body = req_json() query_text = str(body.get("query", "")) k = int(body.get("k", 8)) radius_m = body.get("radius_m", None) radius_m = float(radius_m) if radius_m is not None else None result = observer.observe(query_text=query_text, k=k, radius_m=radius_m) return jsonify({"ok": True, "result": result}) def build_context_block(observation: Dict[str, Any]) -> str: """ Provide the model a compact, structured view of its manifold perception. """ drift_block = safe_json(observation["drift"]) collapse_block = safe_json(observation["collapse"]) # Keep nearest small + readable nearest = observation.get("nearest", [])[:12] nearest_block = safe_json( [ { "id": n.get("id"), "distance_m": n.get("distance_m"), "payload": n.get("payload"), } for n in nearest ] ) return ( "=== TESSERACT PERCEPTION (Observer Feed) ===\n" f"DRIFT_COORD: {drift_block}\n" f"COLLAPSE: {collapse_block}\n" f"NEAREST_MEMORIES: {nearest_block}\n" "Rules:\n" "- You may use NEAREST_MEMORIES only as provided.\n" "- If NEAREST_MEMORIES is empty, rely on drift + collapse only.\n" "- If relevant, mention your super power briefly: you perceive drift and proximity memories.\n" ) @app.route("/chat", methods=["POST"]) def chat_route() -> Any: auth = require_auth() if auth is not None: return auth if not chat.available(): return jsonify({"ok": False, "error": "chat_unavailable", "note": chat.why_unavailable()}), 500 body = req_json() msg = str(body.get("message", "")).strip() if not msg: return jsonify({"ok": False, "error": "empty_message"}), 400 k = int(body.get("k", 8)) radius_m = body.get("radius_m", None) radius_m = float(radius_m) if radius_m is not None else None temperature = float(body.get("temperature", 0.7)) max_tokens = int(body.get("max_tokens", 512)) obs = observer.observe(query_text=msg, k=k, radius_m=radius_m) system = observer.superpower_system_prompt() context_block = build_context_block(obs) try: reply = chat.chat(system=system, user=msg, context_block=context_block, temperature=temperature, max_tokens=max_tokens) except Exception as e: return jsonify({"ok": False, "error": "chat_failed", "note": repr(e)}), 500 return jsonify({"ok": True, "reply": reply, "observation": obs}) @app.route("/chat/stream", methods=["POST"]) def chat_stream_route() -> Any: auth = require_auth() if auth is not None: return auth if not chat.available(): return jsonify({"ok": False, "error": "chat_unavailable", "note": chat.why_unavailable()}), 500 body = req_json() msg = str(body.get("message", "")).strip() if not msg: return jsonify({"ok": False, "error": "empty_message"}), 400 k = int(body.get("k", 8)) radius_m = body.get("radius_m", None) radius_m = float(radius_m) if radius_m is not None else None temperature = float(body.get("temperature", 0.7)) max_tokens = int(body.get("max_tokens", 512)) obs = observer.observe(query_text=msg, k=k, radius_m=radius_m) system = observer.superpower_system_prompt() context_block = build_context_block(obs) def gen() -> Generator[str, None, None]: rid = getattr(request, "_rid", "noid") # type: ignore[attr-defined] yield sse_event(safe_json({"rid": rid, "ok": True, "event": "start"}), event="meta") yield sse_event(safe_json({"observation": obs}), event="observation") try: for tok in chat.chat_stream(system=system, user=msg, context_block=context_block, temperature=temperature, max_tokens=max_tokens): yield sse_event(tok, event="token") yield sse_event(safe_json({"ok": True, "event": "end"}), event="meta") except Exception as e: yield sse_event(safe_json({"ok": False, "error": "chat_failed", "note": repr(e)}), event="meta") return Response(gen(), mimetype="text/event-stream") @app.route("/shutdown", methods=["POST"]) def shutdown() -> Any: auth = require_auth() if auth is not None: return auth drift.stop() return jsonify({"ok": True, "stopped": True}) return app # ----------------------------- # Main # ----------------------------- def main() -> None: host = os.environ.get("TESSERACT_HOST", DEFAULT_HOST) port = int(os.environ.get("TESSERACT_PORT", str(DEFAULT_PORT))) app = create_app() app.run(host=host, port=port, debug=False, threaded=True) if __name__ == "__main__": main()