#!/usr/bin/env python3
# Copyright Daniel Harding - RomanAILabs
# Credits: Nova (GPT-5.2 Thinking)
"""
4D_Quantum_Ollama_Termux.py

Offline, Termux-friendly, Ollama-like local server with a real 4D field substrate.

Key features (the "missing pieces"):
- /api/generate JSONL streaming (Ollama-ish); "stream": false returns one JSON object
- Field4D persistence (field_events.jsonl + field_nodes.json) + replay on boot
- Tool-call loop: model can emit TOOL_CALL blocks to query/mutate Field4D mid-generation
- /api/stop cancels a running generation
- Hard limits: request bytes, max tokens, max prompt bytes, basic rate limiting

Backends:
- stub4d (always works offline)
- llama_cpp (optional) via llama-cpp-python if installed (GGUF)

Run:
  python3 ~/4D_Quantum_Ollama_Termux.py --host 127.0.0.1 --port 11434

Try (stub backend):
  curl -N http://127.0.0.1:11434/api/generate \
    -H "Content-Type: application/json" \
    -d '{"model":"stub4d","prompt":"Hello 4D. Query the field and inject an event.","stream":true,"session":"daniel"}'

Stop:
  curl -s http://127.0.0.1:11434/api/stop -H "Content-Type: application/json" -d '{"session":"daniel"}'

Field summary:
  curl -s http://127.0.0.1:11434/field4d/summary

Tool-call format (model emits this EXACT block):
  [[TOOL_CALL]]{"tool":"field4d.query","args":{"center":{"x":0,"y":0,"z":0,"t":0},"radius":3}}[[/TOOL_CALL]]

Server executes and injects:
  [[TOOL_RESULT]]{...}[[/TOOL_RESULT]]
"""

from __future__ import annotations

import argparse
import dataclasses
import hashlib
import io
import json
import os
import queue
import random
import re
import signal
import threading
import time
import traceback
from http.server import BaseHTTPRequestHandler, HTTPServer, ThreadingHTTPServer
from typing import Any, Dict, Generator, List, Optional, Tuple

# -----------------------------
# Constants / Limits
# -----------------------------
DEFAULT_PORT = 11434
MAX_REQUEST_BYTES = 512 * 1024  # 512KB
MAX_PROMPT_BYTES = 256 * 1024  # 256KB
MAX_TOKENS_DEFAULT = 256  # generation cap used when the request gives no num_predict
MAX_STREAM_SECONDS = 120  # kill runaway streams
RATE_LIMIT_RPS = 6  # naive per-IP endpoint limiter

# Internal bounds (not part of the public surface).
_RATE_TABLE_MAX_IPS = 4096  # prune the limiter table once it tracks this many keys
_MAX_TOOL_LOOPS = 6  # tool calls executed per generate request
_MAX_TOOL_ARG_CHARS = 64_000  # serialized size cap for a single tool call's args

TOOL_CALL_OPEN = "[[TOOL_CALL]]"
TOOL_CALL_CLOSE = "[[/TOOL_CALL]]"
TOOL_RES_OPEN = "[[TOOL_RESULT]]"
TOOL_RES_CLOSE = "[[/TOOL_RESULT]]"


# -----------------------------
# Utils
# -----------------------------
def now_ms() -> int:
    """Return the current wall-clock time as integer milliseconds."""
    return int(time.time() * 1000)


def json_dumps(obj: Any) -> str:
    """Serialize *obj* to compact JSON, keeping non-ASCII characters as-is."""
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=False)


def clamp(x: float, lo: float, hi: float) -> float:
    """Limit *x* to the closed interval [lo, hi]."""
    return lo if x < lo else hi if x > hi else x


def ensure_dir(p: str) -> None:
    """Create directory *p* (and parents) if it does not exist yet."""
    os.makedirs(p, exist_ok=True)


def safe_read_json(path: str, default: Any) -> Any:
    """Load JSON from *path*; return *default* if the file is missing or invalid."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return default


def safe_write_json(path: str, obj: Any) -> None:
    """Write *obj* as JSON to *path* via a temp file + rename."""
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)
    # os.replace swaps the file in one step so readers never see a partial write.
    os.replace(tmp, path)


def sha256_text(s: str) -> str:
    """Return the hex SHA-256 digest of *s* encoded as UTF-8."""
    return hashlib.sha256(s.encode("utf-8", errors="ignore")).hexdigest()


def client_ip(handler: BaseHTTPRequestHandler) -> str:
    """Return the client address used as the rate-limit key.

    The first entry of ``X-Forwarded-For`` wins when present, otherwise the
    socket peer address is used.
    """
    # NOTE(review): X-Forwarded-For is client-controlled, so a caller can pick
    # its own rate-limit key. Only rely on it behind a trusted reverse proxy.
    forwarded = handler.headers.get("X-Forwarded-For", "").split(",")[0].strip()
    return forwarded or handler.client_address[0]


# -----------------------------
# 4D Core
# -----------------------------
@dataclasses.dataclass
class Vec4:
    """A point in the field: three spatial coordinates plus ``t``."""

    x: float
    y: float
    z: float
    t: float

    def to_dict(self) -> Dict[str, float]:
        """Return the coordinates as a plain ``{"x","y","z","t"}`` dict of floats."""
        return {"x": float(self.x), "y": float(self.y), "z": float(self.z), "t": float(self.t)}

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "Vec4":
        """Build a Vec4 from a dict; missing keys default to 0.0.

        Raises:
            TypeError, ValueError: if a present value cannot be converted to float.
        """
        return Vec4(
            float(d.get("x", 0.0)),
            float(d.get("y", 0.0)),
            float(d.get("z", 0.0)),
            float(d.get("t", 0.0)),
        )

    def dist4(self, o: "Vec4") -> float:
        """Return the Euclidean distance to *o* over all four components."""
        dx = self.x - o.x
        dy = self.y - o.y
        dz = self.z - o.z
        dt = self.t - o.t
        return (dx * dx + dy * dy + dz * dz + dt * dt) ** 0.5


@dataclasses.dataclass
class FieldEvent:
    """A timestamped event placed in the field, with an exponentially decaying weight."""

    id: str
    ts_ms: int
    pos: Vec4
    type: str
    payload: Dict[str, Any]
    half_life_s: float = 900.0

    def weight(self, t_ms: Optional[int] = None) -> float:
        """Return the decay weight at *t_ms* (default: now).

        The weight halves every ``half_life_s`` seconds; a non-positive
        half-life means the event never decays (weight 1.0).
        """
        if t_ms is None:
            t_ms = now_ms()
        age_s = max(0.0, (t_ms - self.ts_ms) / 1000.0)
        if self.half_life_s <= 0:
            return 1.0
        return 2 ** (-age_s / self.half_life_s)


@dataclasses.dataclass
class FieldNode:
    """A named, persistent point in the field carrying free-form attributes."""

    id: str
    pos: Vec4
    attrs: Dict[str, Any]


class Field4D:
    """
    Practical 4D substrate with persistence.
    - events: append-only JSONL (field_events.jsonl)
    - nodes: field_nodes.json
    """

    def __init__(self, data_dir: str) -> None:
        """Open (or create) the store in *data_dir* and load persisted state."""
        self._lock = threading.RLock()
        self.data_dir = data_dir
        ensure_dir(self.data_dir)
        self.events_path = os.path.join(self.data_dir, "field_events.jsonl")
        self.nodes_path = os.path.join(self.data_dir, "field_nodes.json")
        self._events: List[FieldEvent] = []
        self._nodes: Dict[str, FieldNode] = {}
        self._load_nodes()
        self._replay_events(max_lines=5000)

    def _load_nodes(self) -> None:
        """Load nodes from disk, skipping entries that cannot be parsed."""
        raw = safe_read_json(self.nodes_path, default={})
        if not isinstance(raw, dict):
            raw = {}
        nodes: Dict[str, FieldNode] = {}
        for nid, nv in raw.items():
            entry = nv if isinstance(nv, dict) else {}
            try:
                pos_raw = entry.get("pos", {})
                pos = Vec4.from_dict(pos_raw)
            except (TypeError, ValueError, AttributeError):
                # Malformed position (non-numeric or not a dict): drop the node.
                continue
            attrs = entry.get("attrs", {})
            if not isinstance(attrs, dict):
                attrs = {"value": attrs}
            nodes[str(nid)] = FieldNode(id=str(nid), pos=pos, attrs=attrs)
        with self._lock:
            self._nodes = nodes

    def _save_nodes(self) -> None:
        """Persist all nodes to ``nodes_path``."""
        with self._lock:
            out = {nid: {"pos": n.pos.to_dict(), "attrs": n.attrs} for nid, n in self._nodes.items()}
            safe_write_json(self.nodes_path, out)

    def _replay_events(self, max_lines: int = 5000) -> None:
        """Rebuild the in-memory event list from the tail of the JSONL log.

        Only the last 2 MiB / *max_lines* lines are read to keep RAM bounded.
        Lines that fail to parse (including a first line cut by the tail seek)
        are skipped.
        """
        if not os.path.exists(self.events_path):
            return
        try:
            with open(self.events_path, "rb") as f:
                f.seek(0, os.SEEK_END)
                size = f.tell()
                read_size = min(size, 2 * 1024 * 1024)
                f.seek(size - read_size)
                tail = f.read().decode("utf-8", errors="ignore").splitlines()
        except OSError:
            return

        events: List[FieldEvent] = []
        for line in tail[-max_lines:]:
            line = line.strip()
            if not line:
                continue
            try:
                d = json.loads(line)
                pos_raw = d.get("pos", {})
                pos = Vec4.from_dict(pos_raw if isinstance(pos_raw, dict) else {})
                payload = d.get("payload", {})
                if not isinstance(payload, dict):
                    payload = {"value": payload}
                events.append(FieldEvent(
                    id=str(d.get("id", "")),
                    ts_ms=int(d.get("ts_ms", 0) or 0),
                    pos=pos,
                    type=str(d.get("type", "event")),
                    payload=payload,
                    half_life_s=float(d.get("half_life_s", 900.0)),
                ))
            except (ValueError, TypeError, AttributeError, OverflowError):
                # Corrupt or partial record: ignore it and keep replaying.
                continue
        with self._lock:
            self._events = events

    def summary(self) -> Dict[str, Any]:
        """Return event/node counts and the current timestamp."""
        with self._lock:
            return {"events": len(self._events), "nodes": len(self._nodes), "ts_ms": now_ms()}

    def inject_event(self, etype: str, pos: Vec4, payload: Dict[str, Any], half_life_s: float = 900.0) -> FieldEvent:
        """Create an event, keep it in memory and append it to the JSONL log.

        Persistence is best-effort: a failed disk write does not fail the call.

        Returns:
            The newly created FieldEvent.
        """
        with self._lock:
            eid = sha256_text(f"{etype}|{now_ms()}|{random.random()}")[:16]
            ev = FieldEvent(
                id=eid,
                ts_ms=now_ms(),
                pos=pos,
                type=str(etype),
                payload=dict(payload or {}),
                half_life_s=float(half_life_s),
            )
            self._events.append(ev)
            # Bound memory: once past 6000 events keep only the newest 4500.
            if len(self._events) > 6000:
                self._events = self._events[-4500:]
            try:
                with open(self.events_path, "a", encoding="utf-8") as f:
                    f.write(json_dumps({
                        "id": ev.id,
                        "ts_ms": ev.ts_ms,
                        "type": ev.type,
                        "pos": ev.pos.to_dict(),
                        "payload": ev.payload,
                        "half_life_s": ev.half_life_s,
                    }) + "\n")
            except (OSError, TypeError, ValueError):
                # Deliberate best-effort: the in-memory event is still valid
                # when the disk is full/read-only or the payload is unserializable.
                pass
            return ev

    def upsert_node(self, node_id: str, pos: Vec4, attrs: Dict[str, Any]) -> FieldNode:
        """Create or replace node *node_id* and persist the node table."""
        with self._lock:
            n = FieldNode(id=str(node_id), pos=pos, attrs=dict(attrs or {}))
            self._nodes[str(node_id)] = n
            self._save_nodes()
            return n

    def mutate_node(self, node_id: str, patch: Dict[str, Any]) -> Optional[FieldNode]:
        """Apply *patch* to an existing node.

        A dict under ``"pos"`` replaces the position; every other key is
        written into the node's attrs.

        Returns:
            The updated node, or None if *node_id* does not exist.
        """
        with self._lock:
            n = self._nodes.get(str(node_id))
            if n is None:
                return None
            if isinstance(patch, dict):
                if isinstance(patch.get("pos"), dict):
                    n.pos = Vec4.from_dict(patch["pos"])
                for k, v in patch.items():
                    if k != "pos":
                        n.attrs[k] = v
            self._save_nodes()
            return n

    def query_events(self, center: Vec4, radius: float, limit: int = 50,
                     types: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Return events within *radius* of *center*, heaviest first.

        Events whose decay weight has dropped below 0.01 are excluded. When
        *types* is non-empty only events of those types are returned.
        """
        rr = float(radius)
        lim = max(1, int(limit))
        tset = set(types or [])
        t_ms = now_ms()  # one timestamp so all weights in a query are comparable
        hits: List[Tuple[float, FieldEvent]] = []
        with self._lock:
            for ev in self._events:
                if tset and ev.type not in tset:
                    continue
                if ev.pos.dist4(center) <= rr:
                    w = ev.weight(t_ms)
                    if w >= 0.01:
                        hits.append((w, ev))
        hits.sort(key=lambda x: x[0], reverse=True)
        return [{
            "id": ev.id,
            "ts_ms": ev.ts_ms,
            "type": ev.type,
            "pos": ev.pos.to_dict(),
            "weight": w,
            "payload": ev.payload,
        } for (w, ev) in hits[:lim]]

    def query_nodes(self, center: Vec4, radius: float, limit: int = 50) -> List[Dict[str, Any]]:
        """Return nodes within *radius* of *center*, nearest first."""
        rr = float(radius)
        lim = max(1, int(limit))
        hits: List[Tuple[float, FieldNode]] = []
        with self._lock:
            for n in self._nodes.values():
                d = n.pos.dist4(center)
                if d <= rr:
                    hits.append((d, n))
        hits.sort(key=lambda x: x[0])
        return [{"id": n.id, "pos": n.pos.to_dict(), "attrs": n.attrs, "dist": d} for (d, n) in hits[:lim]]

    def sample_scalars(self, pos: Vec4, radius: float = 3.0) -> Dict[str, float]:
        """Derive density/entropy/curvature/potential/temperature around *pos*.

        All values are in [0, 1] and computed from up to 200 nearby events.
        """
        evs = self.query_events(pos, radius=radius, limit=200)
        density = min(1.0, len(evs) / 60.0)
        weight_sum = sum(float(e.get("weight", 0.0)) for e in evs)
        type_div = min(1.0, len({e.get("type") for e in evs}) / 12.0)
        entropy = clamp(0.12 + 0.55 * density + 0.33 * type_div, 0.0, 1.0)
        curvature = clamp(weight_sum / 14.0, 0.0, 1.0)
        potential = clamp(density * (0.5 + 0.5 * curvature), 0.0, 1.0)
        temperature = entropy
        return {
            "density": float(density),
            "entropy": float(entropy),
            "curvature": float(curvature),
            "potential": float(potential),
            "temperature": float(temperature),
        }


# -----------------------------
# Backends
# -----------------------------
class BackendError(Exception):
    """Raised when a model backend cannot be loaded or used."""


class ModelBackend:
    """Interface every text-generation backend implements."""

    name = "base"

    def load(self, model: str, **kwargs: Any) -> Dict[str, Any]:
        """Load *model* and return an info dict."""
        raise NotImplementedError

    def unload(self) -> None:
        """Release the loaded model."""
        raise NotImplementedError

    def stream(self, prompt: str, cancel: threading.Event, **kwargs: Any) -> Generator[str, None, None]:
        """Yield text chunks for *prompt* until done or *cancel* is set."""
        raise NotImplementedError


class Stub4DBackend(ModelBackend):
    """Offline backend that emits canned text (and one tool call) for testing."""

    name = "stub4d"

    def __init__(self) -> None:
        self.loaded = False

    def load(self, model: str, **kwargs: Any) -> Dict[str, Any]:
        """Mark the stub as loaded; no files are touched."""
        self.loaded = True
        return {"ok": True, "backend": self.name, "model": model, "note": "Offline stub mode"}

    def unload(self) -> None:
        """Mark the stub as unloaded."""
        self.loaded = False

    def stream(self, prompt: str, cancel: threading.Event, **kwargs: Any) -> Generator[str, None, None]:
        """Yield a fixed greeting, optionally one tool call, then a closing line."""
        if not self.loaded:
            self.load("stub4d")

        lowered = prompt.lower()
        # Emit the tool call once: after the server has injected a TOOL_RESULT
        # into the prompt, do not ask again (otherwise the loop never settles).
        wants = (
            any(word in lowered for word in ("query", "inject", "field"))
            and TOOL_RES_OPEN not in prompt
        )

        for ch in "4D stream: ":
            if cancel.is_set():
                return
            time.sleep(0.02)
            yield ch

        if wants and not cancel.is_set():
            # deterministic tool call
            tool = {
                "tool": "field4d.query",
                "args": {"center": {"x": 0, "y": 0, "z": 0, "t": 0}, "radius": 3, "limit": 6},
            }
            block = f"{TOOL_CALL_OPEN}{json_dumps(tool)}{TOOL_CALL_CLOSE}"
            for i in range(0, len(block), 24):
                if cancel.is_set():
                    return
                time.sleep(0.04)
                yield block[i:i + 24]

        msg = " resonance rising… curvature whispering… timeline steady… ✶\n"
        for i in range(0, len(msg), 8):
            if cancel.is_set():
                return
            time.sleep(0.05)
            yield msg[i:i + 8]


class LlamaCppBackend(ModelBackend):
    """Backend that runs a GGUF model through llama-cpp-python (optional dependency)."""

    name = "llama_cpp"

    def __init__(self) -> None:
        self.llm = None
        self.model_path = None

    def load(self, model: str, **kwargs: Any) -> Dict[str, Any]:
        """Load the model file at *model*.

        Recognized kwargs: n_ctx, n_threads, seed, use_mmap, use_mlock.

        Raises:
            BackendError: if llama-cpp-python cannot be imported.
        """
        try:
            import llama_cpp  # type: ignore
        except Exception as e:
            # Import can fail with more than ImportError (e.g. a broken shared lib).
            raise BackendError(f"llama-cpp-python not available: {e}") from e

        n_ctx = int(kwargs.get("n_ctx", 2048))
        n_threads = int(kwargs.get("n_threads", max(1, os.cpu_count() or 2)))
        seed = int(kwargs.get("seed", 0) or 0)
        use_mmap = bool(kwargs.get("use_mmap", True))
        use_mlock = bool(kwargs.get("use_mlock", False))

        self.llm = llama_cpp.Llama(
            model_path=model,
            n_ctx=n_ctx,
            n_threads=n_threads,
            seed=seed,
            use_mmap=use_mmap,
            use_mlock=use_mlock,
            logits_all=False,
        )
        self.model_path = model
        return {"ok": True, "backend": self.name, "model": model, "n_ctx": n_ctx, "n_threads": n_threads}

    def unload(self) -> None:
        """Drop the reference to the loaded model."""
        self.llm = None
        self.model_path = None

    def stream(self, prompt: str, cancel: threading.Event, **kwargs: Any) -> Generator[str, None, None]:
        """Yield generated text chunks for *prompt*.

        Recognized kwargs: max_tokens, temperature, top_p, stop.

        Raises:
            BackendError: if no model has been loaded.
        """
        if not self.llm:
            raise BackendError("llama_cpp backend not loaded")

        max_tokens = int(kwargs.get("max_tokens", MAX_TOKENS_DEFAULT))
        temperature = float(kwargs.get("temperature", 0.7))
        top_p = float(kwargs.get("top_p", 0.95))
        stop = kwargs.get("stop", None)

        # llama_cpp yields dict chunks
        for ch in self.llm(prompt, max_tokens=max_tokens, temperature=temperature,
                           top_p=top_p, stop=stop, stream=True):
            if cancel.is_set():
                return
            try:
                yield ch["choices"][0]["text"]
            except (KeyError, IndexError, TypeError):
                # Chunk without text (unexpected shape): emit nothing for it.
                yield ""


# -----------------------------
# Runtime Core
# -----------------------------
@dataclasses.dataclass
class SessionState:
    """Per-session state: field cursor, cancel flag and bookkeeping."""

    session: str
    created_ms: int
    last_ms: int
    cursor: Vec4
    cancel: threading.Event
    backend: str
    model: str
    seed: int = 0


class QuantumCore:
    """Owns the field, the active backend and all sessions."""

    def __init__(self, models_dir: str, data_dir: str, default_backend: str = "stub4d") -> None:
        """Create directories, open the field store and select the backend."""
        self.lock = threading.RLock()
        # Serializes backend use: one generation at a time, and no load/unload
        # while a stream is running. /api/stop does not take this lock.
        self._gen_lock = threading.Lock()
        self.models_dir = models_dir
        ensure_dir(self.models_dir)
        self.data_dir = data_dir
        ensure_dir(self.data_dir)

        self.field = Field4D(data_dir=os.path.join(self.data_dir, "field4d"))
        self.sessions: Dict[str, SessionState] = {}

        self.backend = self._make_backend(default_backend)
        self.backend_name = self.backend.name
        self.loaded_model: Optional[str] = None

        self._rate: Dict[str, List[float]] = {}  # ip -> timestamps

        self.field.inject_event(
            "boot", Vec4(0, 0, 0, time.time()),
            {"msg": "server_online", "backend": self.backend_name},
            half_life_s=7200.0,
        )

    def _make_backend(self, name: str) -> ModelBackend:
        """Return a backend instance for *name*; unknown names fall back to the stub."""
        n = str(name or "").strip().lower()
        if n in ("llama", "llama_cpp", "llamacpp"):
            return LlamaCppBackend()
        # "stub", "stub4d", "offline", "4d" and anything unrecognized.
        return Stub4DBackend()

    def rate_limit_ok(self, ip: str) -> bool:
        """Record a request from *ip*; return False once it exceeds RATE_LIMIT_RPS per second."""
        now = time.time()
        win = 1.0
        with self.lock:
            recent = [t for t in self._rate.get(ip, ()) if (now - t) <= win]
            allowed = len(recent) < RATE_LIMIT_RPS
            if allowed:
                recent.append(now)
            self._rate[ip] = recent
            # Keys are client-supplied (see client_ip), so drop idle entries
            # instead of letting the table grow without bound.
            if len(self._rate) > _RATE_TABLE_MAX_IPS:
                self._rate = {
                    key: stamps for key, stamps in self._rate.items()
                    if stamps and (now - stamps[-1]) <= win
                }
            return allowed

    def health(self) -> Dict[str, Any]:
        """Return a status snapshot for the /health endpoint."""
        return {
            "ok": True,
            "ts_ms": now_ms(),
            "backend": self.backend_name,
            "loaded_model": self.loaded_model,
            "field4d": self.field.summary(),
            "sessions": len(self.sessions),
        }

    def list_models(self) -> Dict[str, Any]:
        """List model files (.gguf/.bin/.ggml) found under ``models_dir``."""
        out = []
        for root, _, files in os.walk(self.models_dir):
            for fn in files:
                if not fn.lower().endswith((".gguf", ".bin", ".ggml")):
                    continue
                p = os.path.join(root, fn)
                try:
                    st = os.stat(p)
                except OSError:
                    continue  # vanished or unreadable between walk and stat
                out.append({"name": os.path.relpath(p, self.models_dir), "path": p, "bytes": st.st_size})
        out.sort(key=lambda x: x["name"])
        return {"ok": True, "models": out}

    def load(self, model: str, backend: Optional[str] = None,
             kwargs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Load *model*, optionally switching to *backend* first.

        ``"stub4d"`` is loaded as-is; any other name is resolved against
        ``models_dir`` when relative and must exist on disk.

        Raises:
            BackendError: if the model file is missing or the backend fails to load.
        """
        kwargs = kwargs or {}
        with self._gen_lock, self.lock:
            if backend:
                self.backend = self._make_backend(backend)
                self.backend_name = self.backend.name

            model_path = model
            if model_path != "stub4d":
                if not os.path.isabs(model_path):
                    cand = os.path.join(self.models_dir, model_path)
                    if os.path.exists(cand):
                        model_path = cand
                if not os.path.exists(model_path):
                    raise BackendError(f"model not found: {model_path}")

            info = self.backend.load(model_path, **kwargs)
            self.loaded_model = model_path
            self.field.inject_event(
                "model_loaded", Vec4(0, 0, 0, time.time()),
                {"model": model_path, "backend": self.backend_name}, 1800.0,
            )
            return {"ok": True, "loaded": self.loaded_model, "backend": self.backend_name, "info": info}

    def unload(self) -> Dict[str, Any]:
        """Unload the current model and record the event."""
        with self._gen_lock, self.lock:
            old = self.loaded_model
            self.backend.unload()
            self.loaded_model = None
            self.field.inject_event(
                "model_unloaded", Vec4(0, 0, 0, time.time()),
                {"model": old, "backend": self.backend_name}, 900.0,
            )
            return {"ok": True, "unloaded": old}

    def _get_session(self, sid: str) -> SessionState:
        """Return the session *sid*, creating it on first use."""
        with self.lock:
            s = self.sessions.get(sid)
            if s is not None:
                s.last_ms = now_ms()
                return s
            s = SessionState(
                session=sid,
                created_ms=now_ms(),
                last_ms=now_ms(),
                cursor=Vec4(0.0, 0.0, 0.0, time.time()),
                cancel=threading.Event(),
                backend=self.backend_name,
                model=os.path.basename(self.loaded_model or "stub4d"),
                seed=0,
            )
            self.sessions[sid] = s
            self.field.inject_event("session_created", s.cursor, {"session": sid}, 1800.0)
            return s

    def stop(self, sid: str) -> Dict[str, Any]:
        """Signal cancellation for session *sid* (created if unknown)."""
        s = self._get_session(sid)
        s.cancel.set()
        self.field.inject_event("stop", s.cursor, {"session": sid}, 600.0)
        return {"ok": True, "stopped": sid}

    def _field_snapshot(self, s: SessionState) -> str:
        """Format the field scalars at the session cursor as a one-line prompt prefix."""
        sc = self.field.sample_scalars(s.cursor, radius=3.0)
        return (
            f"[FIELD4D] pos={s.cursor.to_dict()} "
            f"entropy={sc['entropy']:.3f} curvature={sc['curvature']:.3f} "
            f"density={sc['density']:.3f} potential={sc['potential']:.3f}"
        )

    def _evolve_cursor(self, s: SessionState, text: str) -> None:
        """Move the session cursor by a step derived from *text* and local field scalars."""
        sc = self.field.sample_scalars(s.cursor, radius=3.0)
        entropy = float(sc["entropy"])
        curvature = float(sc["curvature"])

        dt = 0.25 + 0.75 * entropy
        mag = (0.05 + 0.25 * entropy) * (0.6 + 0.4 * curvature)

        # Direction comes from the first three bytes of the text hash.
        h = hashlib.sha256(text.encode("utf-8", errors="ignore")).digest()
        dx = (h[0] - 128) / 128.0
        dy = (h[1] - 128) / 128.0
        dz = (h[2] - 128) / 128.0

        spin = text.count("!") - text.count(".")
        dx += 0.02 * spin
        dy -= 0.01 * spin

        norm = (dx * dx + dy * dy + dz * dz) ** 0.5
        if norm < 1e-6:
            dx, dy, dz, norm = 1.0, 0.0, 0.0, 1.0
        dx, dy, dz = dx / norm, dy / norm, dz / norm

        s.cursor = Vec4(s.cursor.x + dx * mag, s.cursor.y + dy * mag, s.cursor.z + dz * mag, s.cursor.t + dt)

    # ---- TOOL EXECUTION ----
    @staticmethod
    def _tool_pos(args: Dict[str, Any], key: str, session: SessionState) -> Vec4:
        """Read a position from ``args[key]``.

        A missing key yields the origin; a non-dict value falls back to the
        session cursor.
        """
        raw = args.get(key, {})
        return Vec4.from_dict(raw if isinstance(raw, dict) else session.cursor.to_dict())

    def _run_tool(self, t: str, a: Dict[str, Any], session: SessionState) -> Dict[str, Any]:
        """Dispatch one validated tool call; may raise on non-numeric arguments."""
        if t == "field4d.summary":
            return {"ok": True, "summary": self.field.summary()}

        if t == "field4d.query":
            center = self._tool_pos(a, "center", session)
            radius = float(a.get("radius", 3.0))
            limit = int(a.get("limit", 20))
            types = a.get("types", None)
            if types is not None and not isinstance(types, list):
                types = None
            evs = self.field.query_events(center, radius, limit=limit, types=types)
            nodes = self.field.query_nodes(center, radius, limit=limit)
            sc = self.field.sample_scalars(center, radius=radius)
            return {"ok": True, "center": center.to_dict(), "radius": radius,
                    "scalars": sc, "events": evs, "nodes": nodes}

        if t == "field4d.inject_event":
            etype = str(a.get("type", "event"))
            pos = self._tool_pos(a, "pos", session)
            payload = a.get("payload", {})
            if not isinstance(payload, dict):
                payload = {"value": payload}
            half = float(a.get("half_life_s", 900.0))
            ev = self.field.inject_event(etype, pos, payload, half_life_s=half)
            return {"ok": True, "event": {"id": ev.id, "ts_ms": ev.ts_ms, "type": ev.type,
                                          "pos": ev.pos.to_dict(), "payload": ev.payload}}

        if t == "field4d.upsert_node":
            node_id = str(a.get("id", ""))
            if not node_id:
                return {"ok": False, "error": "missing_node_id"}
            pos = self._tool_pos(a, "pos", session)
            attrs = a.get("attrs", {})
            if not isinstance(attrs, dict):
                attrs = {"value": attrs}
            n = self.field.upsert_node(node_id, pos, attrs)
            return {"ok": True, "node": {"id": n.id, "pos": n.pos.to_dict(), "attrs": n.attrs}}

        if t == "field4d.mutate_node":
            node_id = str(a.get("id", ""))
            patch = a.get("patch", {})
            if not isinstance(patch, dict):
                patch = {"value": patch}
            n = self.field.mutate_node(node_id, patch)
            if n is None:
                return {"ok": False, "error": "node_not_found"}
            return {"ok": True, "node": {"id": n.id, "pos": n.pos.to_dict(), "attrs": n.attrs}}

        return {"ok": False, "error": "unknown_tool", "tool": t}

    def tool_exec(self, tool: str, args: Dict[str, Any], session: SessionState) -> Dict[str, Any]:
        """Execute a model-issued tool call and return a JSON-serializable result.

        Never raises for malformed arguments: tool calls come from model output
        and run in the middle of a response stream, so bad input is reported as
        ``{"ok": False, "error": "bad_tool_args", ...}`` instead.
        """
        t = str(tool or "").strip()
        a = args if isinstance(args, dict) else {}
        try:
            # safety caps
            if len(json_dumps(a)) > _MAX_TOOL_ARG_CHARS:
                return {"ok": False, "error": "tool_args_too_large"}
            return self._run_tool(t, a, session)
        except (TypeError, ValueError, OverflowError) as e:
            return {"ok": False, "error": "bad_tool_args", "tool": t, "detail": str(e)}

    # ---- TOOL-CALL LOOP STREAMING ----
    def _extract_tool_calls(self, text: str) -> List[Dict[str, Any]]:
        """
        Extract complete TOOL_CALL blocks from a text buffer.
        Returns list of {"raw":..., "call":{tool,args}} for valid JSON blocks.
        """
        out: List[Dict[str, Any]] = []
        start = 0
        while True:
            i = text.find(TOOL_CALL_OPEN, start)
            if i < 0:
                break
            j = text.find(TOOL_CALL_CLOSE, i + len(TOOL_CALL_OPEN))
            if j < 0:
                break  # block still incomplete; wait for more text
            raw_json = text[i + len(TOOL_CALL_OPEN):j].strip()
            start = j + len(TOOL_CALL_CLOSE)
            try:
                call = json.loads(raw_json)
            except ValueError:
                continue
            if isinstance(call, dict) and "tool" in call:
                out.append({"raw": raw_json, "call": call})
        return out

    def generate_jsonl(
        self,
        model: str,
        prompt: str,
        session_id: str,
        stream: bool,
        options: Dict[str, Any],
    ) -> Generator[Dict[str, Any], None, None]:
        """
        Ollama-ish JSONL stream generator: yields dicts per line.

        Supports a tool-call loop: when the model output contains a complete
        TOOL_CALL block, the current backend stream is stopped, the tool is
        executed, its TOOL_RESULT is emitted and appended to the prompt, and
        generation restarts from the extended prompt. At most _MAX_TOOL_LOOPS
        tool calls are executed per request.

        *stream* is accepted for interface compatibility; callers that want a
        single response aggregate the yielded objects themselves.

        The final object always has ``"done": True``; it additionally carries
        ``"error"``, ``"stopped"``, or ``"session"``/``"cursor"``.
        """
        if not isinstance(options, dict):
            options = {}
        if len(prompt.encode("utf-8", errors="ignore")) > MAX_PROMPT_BYTES:
            yield {"error": "prompt_too_large", "done": True}
            return

        # Parse options up front so bad values become an error line rather than
        # an exception after the HTTP headers have been sent.
        try:
            seed = int(options.get("seed", 0) or 0)
            gen_kwargs: Dict[str, Any] = {
                "seed": seed,
                "max_tokens": int(options.get("num_predict", MAX_TOKENS_DEFAULT) or MAX_TOKENS_DEFAULT),
                "temperature": float(options.get("temperature", 0.7)),
                "top_p": float(options.get("top_p", 0.95)),
            }
        except (TypeError, ValueError, OverflowError):
            yield {"error": "invalid_options", "done": True}
            return

        stopped = {"response": "", "done": True, "stopped": True}
        timed_out = {"response": "", "done": True, "error": "stream_timeout"}

        with self._gen_lock:
            s = self._get_session(session_id)
            s.cancel.clear()
            s.seed = seed

            # soft field conditioning + instruct tool-call format (so it's actually used)
            sys_instr = (
                "You may interact with a 4D field. When you need it, emit EXACT tool-call blocks:\n"
                f"{TOOL_CALL_OPEN}"
                + '{"tool":"field4d.query","args":{"center":{"x":0,"y":0,"z":0,"t":0},"radius":3}}'
                + f"{TOOL_CALL_CLOSE}\n"
                "Wait for TOOL_RESULT, then continue.\n"
            )
            conditioned = prompt.rstrip() + "\n\n" + self._field_snapshot(s) + "\n\n" + sys_instr

            # log prompt into field
            self.field.inject_event("prompt", s.cursor, {"session": session_id, "chars": len(prompt)}, 1200.0)

            # model param here is informational; actual loaded model uses /load.
            backend = self.backend
            if self.loaded_model is None and backend.name == "stub4d":
                backend.load("stub4d")

            model_name = model or (self.loaded_model or "stub4d")
            t0 = time.time()
            tool_loops = 0
            emitted_chars = 0
            transcript: List[str] = []

            def emit_chunk(txt: str) -> Dict[str, Any]:
                nonlocal emitted_chars
                emitted_chars += len(txt)
                transcript.append(txt)
                return {"model": model_name, "created_at": now_ms(), "response": txt, "done": False}

            while True:
                buffer = ""  # text of the current pass, for detecting calls across chunks
                pending: List[Dict[str, Any]] = []
                chunks = backend.stream(conditioned, cancel=s.cancel, **gen_kwargs)
                try:
                    for chunk in chunks:
                        if s.cancel.is_set():
                            yield stopped
                            return
                        if (time.time() - t0) > MAX_STREAM_SECONDS:
                            yield timed_out
                            return
                        if not chunk:
                            continue
                        buffer += chunk
                        yield emit_chunk(chunk)
                        if tool_loops < _MAX_TOOL_LOOPS:
                            pending = self._extract_tool_calls(buffer)
                            if pending:
                                # Stop this pass; the model must see the result
                                # before it continues.
                                break
                finally:
                    # Release backend resources of an abandoned pass.
                    close = getattr(chunks, "close", None)
                    if close is not None:
                        close()

                if s.cancel.is_set():
                    # Backends end their stream silently on cancel; report it.
                    yield stopped
                    return
                if not pending:
                    break

                result_blocks: List[str] = []
                for item in pending:
                    if tool_loops >= _MAX_TOOL_LOOPS:
                        break
                    call = item["call"]
                    tool = str(call.get("tool", "")).strip()
                    args = call.get("args", {})
                    if not isinstance(args, dict):
                        args = {}
                    res = self.tool_exec(tool, args, session=s)
                    # persist a tool event
                    self.field.inject_event(
                        "tool", s.cursor,
                        {"session": session_id, "tool": tool, "ok": bool(res.get("ok", False))},
                        900.0,
                    )
                    # inject tool result back into prompt stream (this is the “interaction”)
                    block = f"\n{TOOL_RES_OPEN}{json_dumps(res)}{TOOL_RES_CLOSE}\n"
                    yield emit_chunk(block)
                    result_blocks.append(block)
                    tool_loops += 1

                # Continue from everything said so far plus the tool results.
                conditioned = conditioned + buffer + "".join(result_blocks)

            # finalize
            self.field.inject_event("output", s.cursor, {"session": session_id, "chars": emitted_chars}, 1200.0)
            self._evolve_cursor(s, "".join(transcript))
            yield {"response": "", "done": True, "session": session_id, "cursor": s.cursor.to_dict()}


# -----------------------------
# HTTP Server
# -----------------------------
class QuantumHTTP(ThreadingHTTPServer):
    """HTTP server that carries the shared QuantumCore.

    Threaded so that /api/stop (and health checks) are served while a
    generation is still streaming on another connection.
    """

    daemon_threads = True

    def __init__(self, addr: Tuple[str, int], handler, core: QuantumCore):
        super().__init__(addr, handler)
        self.core = core


class Handler(BaseHTTPRequestHandler):
    """Request handler exposing the Ollama-like and Field4D endpoints."""

    server_version = "RomanAILabs4D-Ollama/1.1"

    def _send_json(self, code: int, obj: Any) -> None:
        """Send *obj* as a complete JSON response with status *code*."""
        data = json_dumps(obj).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _read_body(self) -> bytes:
        """Read the request body.

        Raises:
            ValueError: if Content-Length exceeds MAX_REQUEST_BYTES.
        """
        try:
            ln = int(self.headers.get("Content-Length", "0") or "0")
        except ValueError:
            ln = 0
        if ln > MAX_REQUEST_BYTES:
            raise ValueError("request_too_large")
        return self.rfile.read(ln) if ln > 0 else b""

    def _read_json(self) -> Dict[str, Any]:
        """Parse the body as a JSON object; anything else yields ``{}``."""
        raw = self._read_body()
        if not raw:
            return {}
        try:
            obj = json.loads(raw.decode("utf-8", errors="ignore"))
        except (ValueError, RecursionError):
            return {}
        return obj if isinstance(obj, dict) else {}

    def _rate_limit(self) -> bool:
        """Return True if this client may proceed."""
        core: QuantumCore = self.server.core  # type: ignore
        return core.rate_limit_ok(client_ip(self))

    def _route(self) -> str:
        """Return the request path without any query string."""
        return self.path.split("?", 1)[0]

    # ---- GET ----
    def do_GET(self) -> None:
        """Serve the read-only endpoints."""
        core: QuantumCore = self.server.core  # type: ignore
        if not self._rate_limit():
            return self._send_json(429, {"ok": False, "error": "rate_limited"})

        route = self._route()
        if route == "/health":
            return self._send_json(200, core.health())
        if route == "/api/version":
            return self._send_json(200, {"version": "1.1", "name": "RomanAILabs 4D Quantum Offline Ollama Server"})
        if route == "/api/tags":
            # Ollama returns installed tags; we return models in models_dir + loaded indicator
            m = core.list_models()
            m["loaded_model"] = core.loaded_model
            m["backend"] = core.backend_name
            return self._send_json(200, m)
        if route == "/models":
            return self._send_json(200, core.list_models())
        if route == "/field4d/summary":
            return self._send_json(200, {"ok": True, "summary": core.field.summary()})
        return self._send_json(404, {"ok": False, "error": "not_found"})

    # ---- POST ----
    def do_POST(self) -> None:
        """Serve the mutating endpoints and /api/generate."""
        core: QuantumCore = self.server.core  # type: ignore
        if not self._rate_limit():
            return self._send_json(429, {"ok": False, "error": "rate_limited"})

        route = self._route()
        try:
            if route == "/load":
                return self._post_load(core)
            if route == "/unload":
                return self._send_json(200, core.unload())
            if route == "/api/stop":
                body = self._read_json()
                sid = str(body.get("session", "default"))
                return self._send_json(200, core.stop(sid))
            if route == "/field4d/inject_event":
                return self._post_inject_event(core)
            if route == "/field4d/query":
                return self._post_query(core)
            # Ollama-ish generate (JSONL streaming)
            if route == "/api/generate":
                return self._post_generate(core)
        except (ValueError, TypeError, OverflowError) as ve:
            # Oversized body or non-numeric request fields.
            return self._send_json(400, {"ok": False, "error": str(ve)})
        except Exception as e:
            return self._send_json(500, {"ok": False, "error": str(e), "trace": traceback.format_exc()})

        return self._send_json(404, {"ok": False, "error": "not_found"})

    def _post_load(self, core: QuantumCore) -> None:
        """Handle POST /load."""
        body = self._read_json()
        model = str(body.get("model", "stub4d"))
        backend = body.get("backend", None)
        if backend is not None:
            backend = str(backend)
        kwargs = body.get("kwargs", {})
        if not isinstance(kwargs, dict):
            kwargs = {}
        self._send_json(200, core.load(model=model, backend=backend, kwargs=kwargs))

    def _post_inject_event(self, core: QuantumCore) -> None:
        """Handle POST /field4d/inject_event."""
        body = self._read_json()
        etype = str(body.get("type", "event"))
        pos_raw = body.get("pos", {})
        pos = Vec4.from_dict(pos_raw if isinstance(pos_raw, dict) else {})
        payload = body.get("payload", {})
        if not isinstance(payload, dict):
            payload = {"value": payload}
        half = float(body.get("half_life_s", 900.0))
        ev = core.field.inject_event(etype, pos, payload, half_life_s=half)
        self._send_json(200, {"ok": True, "event": {
            "id": ev.id, "ts_ms": ev.ts_ms, "type": ev.type,
            "pos": ev.pos.to_dict(), "payload": ev.payload,
        }})

    def _post_query(self, core: QuantumCore) -> None:
        """Handle POST /field4d/query."""
        body = self._read_json()
        center_raw = body.get("center", {})
        center = Vec4.from_dict(center_raw if isinstance(center_raw, dict) else {})
        radius = float(body.get("radius", 3.0))
        limit = int(body.get("limit", 25))
        types = body.get("types", None)
        if types is not None and not isinstance(types, list):
            types = None
        evs = core.field.query_events(center, radius, limit=limit, types=types)
        nodes = core.field.query_nodes(center, radius, limit=limit)
        sc = core.field.sample_scalars(center, radius=radius)
        self._send_json(200, {"ok": True, "center": center.to_dict(), "radius": radius,
                              "scalars": sc, "events": evs, "nodes": nodes})

    def _post_generate(self, core: QuantumCore) -> None:
        """Handle POST /api/generate (NDJSON stream, or one JSON object if stream is false)."""
        body = self._read_json()
        model = str(body.get("model", core.loaded_model or "stub4d"))
        prompt = str(body.get("prompt", ""))
        stream = bool(body.get("stream", True))
        session_id = str(body.get("session", "default"))
        options = body.get("options", {})
        if not isinstance(options, dict):
            options = {}

        # enforce prompt cap before committing to a 200 response
        if len(prompt.encode("utf-8", errors="ignore")) > MAX_PROMPT_BYTES:
            return self._send_json(400, {"error": "prompt_too_large"})

        gen = core.generate_jsonl(model=model, prompt=prompt, session_id=session_id,
                                  stream=stream, options=options)

        if not stream:
            # Aggregate into a single object; errors here surface before any
            # header is written, so the normal 4xx/5xx paths still apply.
            parts: List[str] = []
            final: Dict[str, Any] = {"done": True}
            try:
                for obj in gen:
                    if obj.get("response"):
                        parts.append(obj["response"])
                    if obj.get("done"):
                        final = obj
            finally:
                gen.close()
            out = {"model": model, "created_at": now_ms()}
            out.update(final)
            out["response"] = "".join(parts)
            return self._send_json(200, out)

        # Start response headers for JSONL streaming. There is no
        # Content-Length, so end-of-stream is signalled by closing the
        # connection; "Connection: close" also sets close_connection.
        self.send_response(200)
        self.send_header("Content-Type", "application/x-ndjson; charset=utf-8")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "close")
        self.end_headers()

        try:
            for obj in gen:
                line = (json_dumps(obj) + "\n").encode("utf-8")
                try:
                    self.wfile.write(line)
                    self.wfile.flush()
                except (OSError, ValueError):
                    break  # client went away
        except Exception as e:
            # Headers are already out, so report the failure in-band instead
            # of attempting a second HTTP response.
            err = {"response": "", "done": True, "error": str(e)}
            try:
                self.wfile.write((json_dumps(err) + "\n").encode("utf-8"))
                self.wfile.flush()
            except (OSError, ValueError):
                pass
        finally:
            # Releases the generation lock even when the client disconnected.
            gen.close()


# -----------------------------
# Main
# -----------------------------
def main() -> int:
    """Parse CLI arguments and run the server until interrupted."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=DEFAULT_PORT)
    ap.add_argument("--models-dir", default=os.path.join(os.getcwd(), "models"))
    ap.add_argument("--data-dir", default=os.path.join(os.path.expanduser("~"), ".local", "share", "romanai_4d_server"))
    ap.add_argument("--backend", default="stub4d", help="stub4d or llama_cpp")
    args = ap.parse_args()

    core = QuantumCore(models_dir=args.models_dir, data_dir=args.data_dir, default_backend=args.backend)
    srv = QuantumHTTP((args.host, args.port), Handler, core=core)

    def shutdown(_sig=None, _frame=None):
        try:
            print("\n[4D] shutdown")
        except Exception:
            pass
        # Hard exit: events are flushed on every append, and this cannot hang
        # on a request thread that is still streaming.
        os._exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    print(f"[4D] listening: http://{args.host}:{args.port}")
    print(f"[4D] models_dir: {args.models_dir}")
    print(f"[4D] data_dir: {args.data_dir}")
    print(f"[4D] backend: {core.backend_name}")
    print("[4D] endpoints: /health, /api/version, /api/tags, /api/generate, /api/stop, /field4d/*, /load, /unload")

    srv.serve_forever()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())