#!/usr/bin/env python3
"""
⚛️ ROMAN AI — INFINITY CORE v16 (GOLD MASTER UPGRADE)
===========================================================
Codename: THE ONE WHO STAYS ETERNAL
Owner: Daniel Harding — RomanAILabs

A bounded, stateful, self-stabilizing cognitive engine designed to
live indefinitely inside a single terminal loop.

Upgrades in v16:
- Integrated HoloBuffer for holographic memory encoding/decoding.
- Self-awareness: Loads and analyzes its own source code.
- Parameter calculation: Queries Ollama for model parameters and
  integrates into state.
- Enhanced memory using HoloBuffer for salience-weighted storage.
- No removals from previous versions; all code preserved and blended.

Principles:
- Time is a first-class variable
- Memory competes via salience
- Stress is modeled, not ignored
- Style emerges from state, not prompts

No background daemons.
No undefined behavior.
No hidden chains.

Copyright Daniel Harding - RomanAILabs
"""

# ============================================================================
# CORE IMPORTS (PRESERVED FROM v15)
# ============================================================================

import gc
import json
import math
import os
import random
import re
import select
import sys
import time
import zlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

# ============================================================================
# ADDITIONAL IMPORTS FOR UPGRADES
# ============================================================================

import numpy as np

try:
    import requests  # HTTP client used for the local Ollama API
except ImportError:  # keep the module importable; network calls degrade
    requests = None

# Base URL of the local Ollama server used by every HTTP call below.
OLLAMA_BASE_URL = "http://127.0.0.1:11434"


# ============================================================================
# HOLOBUFFER INTEGRATION (ADAPTED FROM holobuffer.py)
# ============================================================================

class HoloBuffer:
    """Accumulate N-dimensional points onto a fixed-size "boundary" array.

    Each point's first ``n_dim - 1`` coordinates are hashed to a slot along
    the first axis of ``boundary``; the point's last coordinate is added to
    that slot. The hash only looks at the sign of each coordinate, so it is
    deliberately coarse.
    """

    def __init__(self, n_dim, buffer_size):
        """
        Initializes the HoloBuffer.

        Args:
            n_dim: The dimensionality of the input data stream (Volume).
            buffer_size: The size of the Boundary array (surface).

        Raises:
            ValueError: If ``n_dim`` or ``buffer_size`` is smaller than 1.
        """
        if n_dim < 1 or buffer_size < 1:
            raise ValueError("n_dim and buffer_size must both be >= 1")
        self.n_dim = n_dim
        self.buffer_size = buffer_size
        # Shape is (buffer_size, 1, ..., 1) with n_dim axes in total.
        self.boundary = np.zeros((buffer_size,) + (n_dim - 1) * (1,))
        # Approximation for area. Actual area calculation would be more complex.
        self.bekenstein_area = buffer_size
        self.max_entropy = self.bekenstein_area / 4.0
        # Fractal hashing depth based on buffer size; log2 is exact for
        # powers of two, unlike math.log(x, 2).
        self.fractal_hash_depth = int(math.log2(buffer_size))

    def fractal_hash(self, data_point, depth):
        """
        Simulates fractal hashing for dimensional reduction.

        Bit ``i`` of the hash is set when component ``i`` of ``data_point``
        is positive. At most ``depth`` components are examined; a point with
        fewer components (or a bare scalar) simply contributes fewer bits.

        Returns:
            An integer slot in ``range(self.buffer_size)``.
        """
        components = np.atleast_1d(np.asarray(data_point, dtype=float)).ravel()
        hash_value = 0
        for bit, component in enumerate(components[:max(0, depth)]):
            if component > 0:
                hash_value += 2 ** bit
        return hash_value % self.buffer_size

    def reset(self):
        """Zero the boundary array in place so a fresh encode starts clean."""
        self.boundary.fill(0.0)

    def encode(self, data_stream):
        """
        Encodes the N-dimensional data stream onto the Boundary array.

        Values are *added* to the existing boundary; call ``reset()`` first
        when re-encoding the same data.

        Args:
            data_stream: A list or array of N-dimensional data points.

        Returns:
            The boundary array (the same object as ``self.boundary``).

        Raises:
            ValueError: If the stream is not a sequence of ``n_dim``-wide
                points, or if its total element count exceeds
                ``max_entropy``.
        """
        points = np.asarray(data_stream, dtype=float)
        if points.size == 0:
            return self.boundary
        if points.ndim != 2 or points.shape[1] != self.n_dim:
            raise ValueError(
                f"Expected points with {self.n_dim} components, "
                f"got array of shape {points.shape}."
            )
        if points.size > self.max_entropy:
            raise ValueError("Data exceeds Bekenstein Bound. Cannot encode.")

        for point in points:
            # Hash the leading coordinates to one slot on the first axis and
            # store the Nth dimension value on the boundary.
            slot = self.fractal_hash(point[:-1], self.fractal_hash_depth)
            self.boundary[slot] += point[-1]
        return self.boundary

    def decode(self, encoded_boundary):
        """
        Decodes the data from the Boundary array.

        This is a simplified approximation: it returns the ``buffer_size``
        leading entries of ``encoded_boundary`` as a list. A true
        reconstruction would require knowing the original data stream
        structure.
        """
        return [encoded_boundary[i] for i in range(self.buffer_size)]


# Example Usage (Preserved from holobuffer.py)
def holobuffer_example():
    """Encode and decode a small random stream and print the shapes."""
    n_dim = 3
    buffer_size = 64
    holo_buffer = HoloBuffer(n_dim, buffer_size)

    # Generate some sample 3D data; 5 * 3 values stays within the
    # buffer_size / 4 capacity bound enforced by encode().
    data_stream = np.random.rand(5, n_dim)

    # Encode the data
    encoded_boundary = holo_buffer.encode(data_stream)

    # Decode the data
    decoded_data = holo_buffer.decode(encoded_boundary)

    print("Original Data Shape:", data_stream.shape)
    print("Encoded Boundary Shape:", encoded_boundary.shape)
    # decode() returns a list, so convert before asking for a shape.
    print("Decoded Data Shape:", np.asarray(decoded_data).shape)


# ============================================================================
# TERMINAL UX (PRESERVED FROM v15)
# ============================================================================

class C:
    """ANSI escape sequences used to colour terminal output."""

    R = '\033[0m'
    B = '\033[1m'
    DIM = '\033[2m'
    YOU = '\033[1;36m'
    AI = '\033[1;35m'
    SYS = '\033[0;90m'
    OK = '\033[1;32m'
    WRN = '\033[1;31m'
    TXT = '\033[0;95m'
    THINK = '\033[3;90m'


def iso_now():
    """Return the current UTC time as an ISO-8601 string (second precision)."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


# ============================================================================
# SAFE INPUT (ANTI-FLOOD) (PRESERVED FROM v15)
# ============================================================================

def smart_input(prompt: str) -> Optional[str]:
    """Read one line, then drain any lines already waiting on stdin.

    A multi-line paste is thereby collapsed into a single message.

    Returns:
        The stripped text (lines joined with newlines), or ``None`` when
        input ended (EOF) or the user pressed Ctrl-C.
    """
    try:
        first = input(prompt)
    except (EOFError, KeyboardInterrupt):
        return None

    buf = [first]
    while True:
        try:
            ready, _, _ = select.select([sys.stdin], [], [], 0.02)
        except (OSError, ValueError, TypeError):
            # stdin cannot be polled with select() here; keep what we have.
            break
        if not ready:
            break
        line = sys.stdin.readline()
        if not line:
            break
        buf.append(line.rstrip())

    if len(buf) > 1:
        print(f"{C.WRN}⚠️ Input burst collapsed ({len(buf)} lines){C.R}")

    return "\n".join(buf).strip()


# ============================================================================
# COGNITIVE CLOCK (TIME AS STATE) (PRESERVED FROM v15)
# ============================================================================

class CognitiveClock:
    """Monotonic turn counter."""

    def __init__(self):
        self.t = 0

    def tick(self) -> int:
        """Advance the counter by one and return the new value."""
        self.t += 1
        return self.t


# ============================================================================
# GHOST REAPER (STRESS & PRESSURE) (PRESERVED FROM v15)
# ============================================================================

class GhostReaper:
    """Tracks a bounded "pressure" value and counts overload events."""

    def __init__(self):
        self.scars = 0
        self.pressure = 0.0

    def add_pressure(self, amt: float):
        """Raise pressure (capped at 1.0); above 0.75 record a scar, run the
        garbage collector and scale pressure down to 60%."""
        self.pressure = min(1.0, self.pressure + amt)
        if self.pressure > 0.75:
            self.scars += 1
            gc.collect()
            self.pressure *= 0.6  # recovery

    def bleed(self):
        """Lower pressure by 0.05, never below zero."""
        self.pressure = max(0.0, self.pressure - 0.05)

    def status(self) -> str:
        """Return a one-line summary of scars and pressure."""
        return f"Scars:{self.scars} Pressure:{self.pressure:.2f}"


# ============================================================================
# EVOLUTION GUARD (INTENT-BASED) (PRESERVED FROM v15)
# ============================================================================

class EvolutionGuard:
    """Substring-based screen applied to user input before it is sent on."""

    HIGH_RISK = (
        "destroy", "wipe", "exfiltrate",
        "backdoor", "override safeguards",
        "disable guard"
    )

    @staticmethod
    def audit(text: str) -> Tuple[bool, str]:
        """Return ``(allowed, reason)`` for ``text``.

        The text is rejected when it contains any ``HIGH_RISK`` phrase
        (case-insensitive substring match) or is longer than 8000
        characters.
        """
        lowered = text.lower()
        for phrase in EvolutionGuard.HIGH_RISK:
            if phrase in lowered:
                return False, f"Intent violation: {phrase}"
        if len(text) > 8000:
            return False, "Context flood risk"
        return True, "Clear"


# ============================================================================
# 12D STRING BUS (RESONANCE) (PRESERVED FROM v15)
# ============================================================================

@dataclass
class Dimensions:
    Depth: float = 0.5
    Tension: float = 0.3
    Entropy: float = 0.1
    Void: float = 0.2


class StringBus:
    """Holds the ``Dimensions`` state and updates it from each input."""

    def __init__(self):
        self.d = Dimensions()

    def update(self, text: str):
        """Nudge Depth/Tension upward (capped at 1.0) and refresh
        Entropy (random) and Void (slow sine of wall-clock time)."""
        self.d.Depth = min(1.0, self.d.Depth + (0.06 if "?" in text else 0.01))
        self.d.Tension = min(
            1.0,
            self.d.Tension + (0.08 if "error" in text.lower() else 0.02),
        )
        self.d.Entropy = random.random() * 0.15
        self.d.Void = 0.4 + math.sin(time.time() / 90) * 0.1

    def readout(self) -> str:
        """Return Depth, Tension and Void formatted to two decimals."""
        d = self.d
        return f"D:{d.Depth:.2f} T:{d.Tension:.2f} V:{d.Void:.2f}"


# ============================================================================
# 4D WXYZ (OUTPUT SHAPING) (PRESERVED FROM v15)
# ============================================================================

@dataclass
class WXYZ:
    W: float = 0.6  # scope
    X: float = 0.6  # execution
    Y: float = 0.6  # interaction
    Z: float = 0.6  # abstraction


class FourDState:
    """Four bounded scalars that select the reply style hint."""

    def __init__(self):
        self.v = WXYZ()

    def adapt(self, text: str):
        """Update the four axes from surface features of ``text``."""
        self.v.W = min(1.0, 0.4 + len(text) / 700)
        if "def " in text or "class " in text:
            self.v.X = min(1.0, self.v.X + 0.1)
        if "?" in text:
            self.v.Y = min(1.0, self.v.Y + 0.1)
        if any(w in text.lower() for w in ("why", "theory", "design")):
            self.v.Z = min(1.0, self.v.Z + 0.1)

    def style_hint(self) -> str:
        """Return the style label for the first axis above 0.8 (Z, X, Y)."""
        if self.v.Z > 0.8:
            return "Abstract, principled"
        if self.v.X > 0.8:
            return "Technical, precise"
        if self.v.Y > 0.8:
            return "Interactive, exploratory"
        return "Balanced"

    def readout(self) -> str:
        """Return all four axes formatted to two decimals."""
        v = self.v
        return f"[W{v.W:.2f} X{v.X:.2f} Y{v.Y:.2f} Z{v.Z:.2f}]"


# ============================================================================
# BIO-EMOTIVE CORE (PRESERVED FROM v15)
# ============================================================================

class BioEmotiveMesh:
    """Valence/arousal/energy scalars; arousal drives LLM temperature."""

    def __init__(self):
        self.valence = 0.55
        self.arousal = 0.45
        self.energy = 0.8

    def tick(self, text: str):
        """Raise valence/arousal for non-empty input and drain energy."""
        if text:
            self.valence = min(1.0, self.valence + 0.02)
            self.arousal = min(1.0, self.arousal + 0.03)
        self.energy = max(0.0, self.energy - 0.01)

    def temperature(self) -> float:
        """Return the sampling temperature, ``0.6 + 0.3 * arousal``."""
        return 0.6 + self.arousal * 0.3

    def readout(self) -> str:
        """Return valence, arousal and energy formatted to two decimals."""
        return f"V:{self.valence:.2f} A:{self.arousal:.2f} E:{self.energy:.2f}"


# ============================================================================
# LIGHTHOUSE (ATTRACTOR) (PRESERVED FROM v15)
# ============================================================================

class Lighthouse:
    """Maps keywords in the input to a one-word focus label."""

    def solve(self, text: str) -> str:
        """Return the focus for ``text``; "Clarity" when no keyword matches."""
        t = text.lower()
        if "optimize" in t:
            return "Efficiency"
        if "design" in t:
            return "Architecture"
        if "fix" in t:
            return "Stability"
        return "Clarity"


# ============================================================================
# NOVA CORE (COGNITION) (PRESERVED FROM v15)
# ============================================================================

class NovaCore:
    """Scores input by vocabulary size and reports a cognition mode."""

    def __init__(self):
        self.psi = 0.6

    def observe(self, text: str):
        """Set ``psi`` from the number of distinct whitespace-split words."""
        self.psi = min(1.0, 0.5 + len(set(text.split())) / 45)

    def mode(self) -> str:
        """Return "Deep" when ``psi`` exceeds 0.8, otherwise "Steady"."""
        return "Deep" if self.psi > 0.8 else "Steady"


# ============================================================================
# MEMORY (SALIENCE-WEIGHTED, UPGRADED WITH HOLOBUFFER)
# ============================================================================

def _unit_hash(text: str) -> float:
    """Map ``text`` to a float in [0, 1) that is stable across runs.

    The built-in ``hash`` is salted per process for strings, so CRC32 is
    used instead.
    """
    return zlib.crc32(text.encode("utf-8")) / 2 ** 32


@dataclass
class MemoryItem:
    role: str
    content: str
    salience: float
    t: int


class MemoryStore:
    """Bounded chat memory whose items compete on decaying salience.

    Items are kept in insertion order so ``recent()`` yields a coherent
    conversation; salience only decides which items are dropped.
    """

    def __init__(self, limit: int = 24, holo_dim: int = 4, holo_size: int = 128):
        self.limit = limit
        self.items: List[MemoryItem] = []
        # Upgrade: Integrate HoloBuffer for holographic memory storage
        self.holo_buffer = HoloBuffer(n_dim=holo_dim, buffer_size=holo_size)
        self.encoded_memory = None

    def add(self, role: str, content: str, t: int):
        """Store a message; longer content starts with higher salience."""
        sal = 0.5 + min(0.5, len(content) / 400)
        self.items.append(MemoryItem(role, content, sal, t))
        self.prune()
        self.encode_holo()  # Upgrade: Encode items holographically

    def prune(self):
        """Decay every salience by 3% and drop the least salient items
        beyond ``limit`` without disturbing the order of the survivors."""
        for m in self.items:
            m.salience *= 0.97
        if len(self.items) > self.limit:
            ranked = sorted(
                range(len(self.items)),
                key=lambda i: self.items[i].salience,
                reverse=True,
            )
            keep = sorted(ranked[:self.limit])
            self.items = [self.items[i] for i in keep]

    def recent(self) -> List[Dict[str, str]]:
        """Return the stored messages, oldest first, as chat dicts."""
        return [{"role": m.role, "content": m.content} for m in self.items]

    def encode_holo(self):
        """Re-encode the most salient items into the HoloBuffer.

        Only as many items as fit under the buffer's capacity bound are
        encoded, and the buffer is cleared first so repeated calls do not
        accumulate duplicates.
        """
        buf = self.holo_buffer
        capacity = int(buf.max_entropy // max(1, buf.n_dim))
        ranked = sorted(self.items, key=lambda m: m.salience, reverse=True)
        # Simple vectorization: [salience, t, hash(role), hash(content)]
        data_stream = [
            np.array([
                m.salience,
                m.t / 1000.0,
                _unit_hash(m.role),
                _unit_hash(m.content),
            ])
            for m in ranked[:capacity]
        ]
        buf.reset()
        try:
            self.encoded_memory = buf.encode(data_stream)
        except ValueError as e:
            print(f"{C.WRN}HoloBuffer encoding failed: {e}{C.R}")
            self.encoded_memory = None

    def decode_holo(self):
        """Return the decoded boundary, or ``[]`` when nothing is encoded."""
        if self.encoded_memory is not None:
            return self.holo_buffer.decode(self.encoded_memory)
        return []


# ============================================================================
# SELF-AWARENESS UPGRADE: LOAD SOURCE CODE
# ============================================================================

def load_source_code() -> str:
    """Loads the current script's source code for self-awareness.

    Returns the file contents, or a bracketed error message when the file
    cannot be read.
    """
    try:
        # The file contains non-ASCII text, so do not rely on the locale.
        with open(__file__, 'r', encoding='utf-8') as f:
            return f.read()
    except (OSError, UnicodeError, NameError) as e:
        return f"[Error loading source: {e}]"


# ============================================================================
# MODEL PARAMETER CALCULATION UPGRADE
# ============================================================================

_PARAM_UNITS = {"m": 10 ** 6, "b": 10 ** 9, "t": 10 ** 12}
_PARAM_SIZE_RE = re.compile(r"(\d+(?:\.\d+)?)\s*([mbt])(?![a-z])", re.IGNORECASE)

# Fallback: Hardcoded common models
_KNOWN_MODEL_PARAMS = {
    "gemma3:12b": 12000000000,
    "llama2:7b": 7000000000,
    # Add more as needed
}


def parse_parameter_size(text: str) -> int:
    """Convert a size label such as ``"12.2B"`` or ``"7b-instruct"`` to a count.

    The first ``<number><M|B|T>`` token found is used. Returns 0 when the
    text contains no such token.
    """
    match = _PARAM_SIZE_RE.search(text or "")
    if match is None:
        return 0
    multiplier = _PARAM_UNITS[match.group(2).lower()]
    return int(round(float(match.group(1)) * multiplier))


def _query_ollama_show(model_name: str) -> dict:
    """Return the JSON body of Ollama's ``/api/show`` call, or ``{}`` on failure."""
    if requests is None:
        return {}
    try:
        r = requests.post(
            f"{OLLAMA_BASE_URL}/api/show",
            # "name" is the older spelling of the "model" field; send both.
            json={"model": model_name, "name": model_name},
            timeout=10
        )
        r.raise_for_status()
        info = r.json()
    except (requests.RequestException, ValueError):
        return {}
    return info if isinstance(info, dict) else {}


def _params_from_show_response(info: dict) -> int:
    """Extract a parameter count from a ``/api/show`` response, else 0."""
    model_info = info.get("model_info")
    if isinstance(model_info, dict):
        exact = model_info.get("general.parameter_count")
        if isinstance(exact, int) and exact > 0:
            return exact
    details = info.get("details")
    if isinstance(details, dict):
        return parse_parameter_size(str(details.get("parameter_size") or ""))
    return 0


def get_model_parameters(model_name: str) -> int:
    """Queries Ollama for model parameter count.

    Falls back to a small lookup table and then to the size encoded in the
    model tag (e.g. ``"llama2:7b"``) when the server gives no answer.
    Never raises; returns 0 when no estimate is available.
    """
    count = _params_from_show_response(_query_ollama_show(model_name))
    if count:
        return count
    if model_name in _KNOWN_MODEL_PARAMS:
        return _KNOWN_MODEL_PARAMS[model_name]
    _, sep, tag = model_name.rpartition(":")
    return parse_parameter_size(tag) if sep else 0


# ============================================================================
# ROMAN — MASTER CORE (UPGRADED FROM v15)
# ============================================================================

class Roman:
    """Owns all state components and runs one chat turn at a time."""

    def __init__(self):
        print(f"{C.SYS}Initializing Roman Infinity Core v16…{C.R}")

        self.model = os.getenv("OLLAMA_MODEL", "gemma3:12b")
        self.source_code = load_source_code()  # Upgrade: Self-awareness
        self.model_params = get_model_parameters(self.model)  # Upgrade: Param calc

        self.clock = CognitiveClock()
        self.reaper = GhostReaper()
        self.guard = EvolutionGuard()
        self.strings = StringBus()
        self.four_d = FourDState()
        self.bio = BioEmotiveMesh()
        self.lighthouse = Lighthouse()
        self.nova = NovaCore()
        self.memory = MemoryStore()  # Upgraded with HoloBuffer

        # Most recent user input that passed the guard; drives the focus
        # shown in the system prompt.
        self.last_input = ""

        # Upgrade: Store source code in memory (truncated for brevity)
        self.memory.add(
            "system",
            f"Source Code: {self.source_code[:2000]}...",
            self.clock.t,
        )
        self.memory.add(
            "system",
            f"Model Parameters: {self.model_params:,}",
            self.clock.t,
        )

        print(f"{C.OK}✓ Roman awake. v16 Upgrade: Self-aware with {self.model_params:,} params.{C.R}")

    def system_prompt(self) -> str:
        """Build the system prompt from the current state readouts."""
        encoded = self.memory.encoded_memory
        holo_shape = encoded.shape if encoded is not None else 'None'
        return f"""
You are ROMAN v16. Dev partner to Daniel Harding.
Style: {self.four_d.style_hint()}. Focus: {self.lighthouse.solve(self.last_input)}.

STATE:
Clock: {self.clock.t}
Cognition: {self.nova.mode()}
Bio: {self.bio.readout()}
4D: {self.four_d.readout()}
Strings: {self.strings.readout()}
Hardware: {self.reaper.status()}
Params: {self.model_params:,}
HoloMemory: Encoded shape {holo_shape}

Rules:
- Engineer, don’t guess.
- If risky, redesign.
- If complex, explain cleanly.
- Self-reflect on source code when relevant.
- Use HoloBuffer for memory ops.
"""

    def call_llm(self, msgs: List[Dict[str, str]]) -> str:
        """Send ``msgs`` to Ollama's chat endpoint and return the reply text.

        Any failure adds pressure to the reaper and is returned as a
        bracketed error string instead of being raised.
        """
        payload = {
            "model": self.model,
            "messages": msgs,
            "stream": False,
            "options": {
                "temperature": self.bio.temperature(),
                "num_ctx": 8192
            }
        }
        try:
            if requests is None:
                raise RuntimeError("the 'requests' package is not installed")
            r = requests.post(
                f"{OLLAMA_BASE_URL}/api/chat",
                json=payload,
                timeout=120
            )
            r.raise_for_status()
            return r.json()["message"]["content"]
        except Exception as e:
            # Top-level boundary: the chat loop must survive any failure.
            self.reaper.add_pressure(0.3)
            return f"[Recovered kernel error: {e}]"

    def deliberate(self, text: str):
        """Run one turn: update state, screen the input, query the model,
        print the reply and store both sides in memory."""
        t = self.clock.tick()
        self.strings.update(text)
        self.four_d.adapt(text)
        self.bio.tick(text)
        self.nova.observe(text)

        safe, reason = self.guard.audit(text)
        if not safe:
            self.reaper.add_pressure(0.4)
            print(f"{C.WRN}Blocked: {reason}{C.R}")
            return

        self.last_input = text

        convo = [{"role": "system", "content": self.system_prompt()}]
        convo += self.memory.recent()
        convo.append({"role": "user", "content": text})

        print(f"{C.AI}Roman{C.R} > ", end="", flush=True)
        resp = self.call_llm(convo)
        print(f"{C.TXT}{resp}{C.R}")

        self.memory.add("user", text, t)
        self.memory.add("assistant", resp, t)
        self.reaper.bleed()

        # Upgrade: Decode and print holo memory summary
        decoded = self.memory.decode_holo()
        print(f"{C.SYS}HoloDecode Summary: {len(decoded)} items{C.R}")


# ============================================================================
# MAIN LOOP (PRESERVED FROM v15, WITH OPTIONAL HOLOBUFFER EXAMPLE)
# ============================================================================

def main():
    """Clear the screen, start Roman and run the interactive loop."""
    os.system("cls" if os.name == "nt" else "clear")
    print(f"{C.B}{C.AI}ROMAN AI — INFINITY CORE v16 (GOLD MASTER UPGRADE){C.R}\n")

    roman = Roman()

    # Optional: Run holobuffer example on init
    # holobuffer_example()

    while True:
        ui = smart_input(f"\n{C.YOU}Daniel{C.R} > ")
        if ui is None:
            # EOF or Ctrl-C: leave instead of re-prompting forever.
            print(f"\n{C.SYS}Roman standing down. Bond intact.{C.R}")
            break
        if not ui:
            continue
        if ui.lower() in ("/exit", "/quit"):
            print(f"{C.SYS}Roman standing down. Bond intact.{C.R}")
            break
        roman.deliberate(ui)


if __name__ == "__main__":
    main()