#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright Daniel Harding - RomanAILabs
# Credits: OpenAI GPT-5.2 Thinking (Nova)
"""
RomanAILabs 4D Standard Module — 4DMS v2 (11/10)
================================================

Purpose
-------
Make ANY .4dllm feel like a "4th dimensional AI" by standardizing:
- Time-axis memory (timeline + strata compression)
- Structured state (goals / constraints / decisions / facts / open questions / prefs)
- WXYZ steering vector (Width / Execution / Yield / Zenith)
- Delta-memory (what changed this turn) so continuity stays crisp
- Token-budgeted capsule injection (won't bloat prompts forever)
- Optional validation hooks (format contracts / contradiction flags)

Two Ways Users Install This Into 4DLLM
--------------------------------------
(1) Recommended (safe + universal): "Modules folder" -> Build
    - Drop this file into your Studio folder:
        ./modules/romanailabs_4dms_v2.py
    - In your 4DLLM Studio GUI, click "Load Modules" and build.
      (Your GUI loads scripts from modules/ folder.) ✅

(2) Optional (advanced): Inject into an existing .4dllm (best-effort)
    - This file includes a TOC-based injector that:
        - copies file data up to TOC offset
        - appends a new Python Script section (this module)
        - replaces/creates Script Config section enabling 4DMS
        - writes a fresh TOC at the end
    - Run:
        python3 romanailabs_4dms_v2.py inject --in model.4dllm --out model_4d.4dllm

Runner Integration Contract (tiny)
----------------------------------
Keep a FourDMS session object alive while chatting:

    from romanailabs_4dms_v2 import FourDMS, FourDConfig
    fourd = FourDMS.from_script_config(script_config, metadata)

    wrapped = fourd.wrap_user_prompt(user_text, extra={"backend":"llama.cpp"})
    # send wrapped to model -> get assistant_text
    fourd.observe_assistant(assistant_text)

Optional:
    ok, issues, repair = fourd.validate_response(assistant_text)
    if not ok: (runner can retry once with repair)

Script Config (SECTION_SCRIPT_CONFIG) Example
---------------------------------------------
{
  "four_dms_v2": {
    "enabled": true,
    "mode": "balanced",
    "token_budget": {
      "system_prefix_tokens": 512,
      "capsule_tokens": 900,
      "facts_tokens": 280,
      "events_tokens": 260
    },
    "memory": {
      "persist": true,
      "state_path": "~/.romanailabs/4dms_v2_state.json",
      "max_events": 80,
      "compress_after": 26,
      "max_facts": 80
    },
    "validation": {
      "enabled": true,
      "contract": "auto",
      "max_issues": 6
    },
    "safety": {
      "enable_risk_flags": true,
      "risk_action": "warn"
    }
  }
}

Signature
---------
[4D:RomanAILabs-DHR]
"""

from __future__ import annotations

import argparse
import base64
import dataclasses
import hashlib
import json
import os
import re
import struct
import time
import zlib
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ======================================================================================
# Small utils (no deps)
# ======================================================================================

# Marker appended by _clamp() when text had to be cut short.
_CLIP_MARKER = " …(clipped)"

# Patterns and tables hoisted to module level so they are built once, not per call.
_KEYWORD_RE = re.compile(r"[a-z0-9][a-z0-9_\-]{2,}")
_QUESTION_WORD_RE = re.compile(r"\b(how|what|why|when|where|who)\b")
_STOP_WORDS = frozenset({
    "the", "and", "for", "with", "that", "this", "you", "your", "are", "but", "not", "can", "what",
    "how", "when", "where", "why", "who", "from", "into", "about", "like", "make", "build", "want",
    "please", "just", "also", "more", "less", "very", "have", "has", "had", "will", "would", "should",
    "then", "than", "them", "they", "their", "there", "here", "able", "module", "script", "file",
})


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _safe_expanduser(path: str) -> str:
    """Expand a leading ``~`` in *path*; falsy input is returned unchanged."""
    return os.path.expanduser(path) if path else path


def _sha1(s: str) -> str:
    """Return the hex SHA-1 digest of *s* (UTF-8; unencodable characters are ignored)."""
    return hashlib.sha1(s.encode("utf-8", "ignore")).hexdigest()


def _clamp(s: str, n: int) -> str:
    """
    Limit *s* to at most *n* characters.

    Text that fits is returned untouched. Longer text is cut and suffixed with a
    clip marker. When *n* is too small to hold the marker, the text is
    hard-truncated instead so the result never exceeds *n* characters.
    """
    if n <= 0:
        return ""
    if len(s) <= n:
        return s
    if n < len(_CLIP_MARKER):
        # The marker alone would overshoot the limit; plain truncation keeps the bound.
        return s[:n]
    return s[: max(0, n - 12)] + _CLIP_MARKER


def _tokenish(s: str) -> int:
    """Cheap token-ish estimate: number of whitespace-separated chunks, never below 1."""
    return max(1, len(re.findall(r"\S+", s)))


def _top_keywords(text: str, limit: int = 12) -> List[str]:
    """
    Return up to *limit* most frequent keywords of *text*.

    Keywords are lowercase alphanumeric runs of 3+ characters that are not stop
    words. Ties are broken alphabetically so the result is deterministic.
    """
    freq: Dict[str, int] = {}
    for word in _KEYWORD_RE.findall(text.lower()):
        if word in _STOP_WORDS:
            continue
        freq[word] = freq.get(word, 0) + 1
    ranked = sorted(freq.items(), key=lambda kv: (-kv[1], kv[0]))
    return [word for word, _ in ranked[:limit]]


def _looks_like_question(s: str) -> bool:
    """True when *s* contains ``?`` or one of the question words how/what/why/when/where/who."""
    return "?" in s or bool(_QUESTION_WORD_RE.search(s.lower()))


def _crc32_u(data: bytes) -> int:
    """Return the CRC-32 of *data* as an unsigned 32-bit integer."""
    return zlib.crc32(data) & 0xFFFFFFFF


def _json_dumps(obj: Any) -> str:
    """Serialize *obj* as indented JSON, keeping non-ASCII characters as-is."""
    return json.dumps(obj, ensure_ascii=False, indent=2)


def _as_float(value: Any, default: float) -> float:
    """Convert *value* to float, falling back to *default* when it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _as_str_list(value: Any) -> List[str]:
    """Return *value* as a list of strings; anything that is not a list/tuple yields ``[]``."""
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value]
    return []


# ======================================================================================
# 4D State (WXYZ + structured continuity)
# ======================================================================================

@dataclass
class WXYZ:
    """
    W: Width / scope / architecture
    X: Execution / performance
    Y: Yield / interaction / adaptation
    Z: Zenith / depth / reasoning
    """
    W: float = 0.50
    X: float = 0.50
    Y: float = 0.50
    Z: float = 0.50

    def clamp01(self) -> "WXYZ":
        """Clamp every axis into [0.0, 1.0] in place and return ``self`` for chaining."""
        self.W = max(0.0, min(1.0, float(self.W)))
        self.X = max(0.0, min(1.0, float(self.X)))
        self.Y = max(0.0, min(1.0, float(self.Y)))
        self.Z = max(0.0, min(1.0, float(self.Z)))
        return self

    def to_dict(self) -> Dict[str, float]:
        """Return the four axes as a plain dict keyed ``W``/``X``/``Y``/``Z``."""
        return {"W": self.W, "X": self.X, "Y": self.Y, "Z": self.Z}


@dataclass
class Fact:
    """A remembered key/value statement with timestamp, origin and confidence."""
    key: str
    value: str
    t_utc: str
    source: str = "inferred"
    confidence: float = 0.72

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict."""
        return dataclasses.asdict(self)


@dataclass
class Decision:
    """A recorded decision with optional reason/impact notes."""
    text: str
    t_utc: str
    reason: str = ""
    impact: str = ""
    source: str = "assistant"

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict."""
        return dataclasses.asdict(self)


@dataclass
class Event:
    """One timeline entry (a user or assistant turn)."""
    role: str  # "user" or "assistant"
    text: str
    t_utc: str
    meta: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict (``meta`` is passed through, not copied)."""
        return {"role": self.role, "text": self.text, "t_utc": self.t_utc, "meta": self.meta}


@dataclass
class FourDConfig:
    """Tunable settings for a 4DMS session; call :meth:`clamp` to sanitize them."""
    enabled: bool = True
    mode: str = "balanced"  # balanced|deep|fast|creative
    signature: str = "[4D:RomanAILabs-DHR]"

    # token budgets
    system_prefix_tokens: int = 620
    capsule_tokens: int = 1024
    facts_tokens: int = 340
    events_tokens: int = 320

    # memory sizing
    persist: bool = False
    state_path: str = "~/.romanailabs/4dms_v2_state.json"
    max_events: int = 140
    compress_after: int = 40
    max_facts: int = 120

    # validation
    validation_enabled: bool = True
    validation_contract: str = "auto"  # auto|none|json|markdown|code
    validation_max_issues: int = 6

    # safety
    enable_risk_flags: bool = True
    risk_action: str = "warn"  # warn|block|allow

    # prompt clamp
    max_user_chars: int = 9000

    def clamp(self) -> "FourDConfig":
        """
        Sanitize the configuration in place and return ``self``.

        Unknown ``mode`` / ``risk_action`` / ``validation_contract`` values fall
        back to their defaults, and numeric settings are raised to their minimum
        supported values. Raises ``ValueError``/``TypeError`` if a numeric
        setting cannot be converted with ``int()``.
        """
        if self.mode not in {"balanced", "deep", "fast", "creative"}:
            self.mode = "balanced"
        if self.risk_action not in {"warn", "block", "allow"}:
            self.risk_action = "warn"

        # Normalize the contract once so every consumer sees the same value.
        contract = str(self.validation_contract).lower().strip()
        if contract not in {"auto", "none", "json", "markdown", "code"}:
            contract = "auto"
        self.validation_contract = contract

        self.system_prefix_tokens = max(128, int(self.system_prefix_tokens))
        self.capsule_tokens = max(256, int(self.capsule_tokens))
        self.facts_tokens = max(120, int(self.facts_tokens))
        self.events_tokens = max(120, int(self.events_tokens))
        self.max_events = max(16, int(self.max_events))
        self.compress_after = max(8, int(self.compress_after))
        self.max_facts = max(20, int(self.max_facts))
        self.validation_max_issues = max(1, int(self.validation_max_issues))
        self.max_user_chars = max(512, int(self.max_user_chars))
        return self


@dataclass
class FourDState:
    """Everything a session remembers: steering vector, continuity lists, facts, timeline."""
    wxyz: WXYZ = field(default_factory=WXYZ)

    # structured continuity
    goals: List[str] = field(default_factory=list)
    constraints: List[str] = field(default_factory=list)
    open_questions: List[str] = field(default_factory=list)
    preferences: Dict[str, Any] = field(default_factory=dict)
    facts: Dict[str, Fact] = field(default_factory=dict)
    decisions: List[Decision] = field(default_factory=list)

    # events
    events: List[Event] = field(default_factory=list)
    strata: List[str] = field(default_factory=list)  # compressed older events

    # deltas
    last_delta: str = ""
    last_user_hash: str = ""
    last_assistant_hash: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly snapshot of the whole state."""
        return {
            "wxyz": self.wxyz.to_dict(),
            "goals": list(self.goals),
            "constraints": list(self.constraints),
            "open_questions": list(self.open_questions),
            "preferences": dict(self.preferences),
            "facts": {k: v.to_dict() for k, v in self.facts.items()},
            "decisions": [d.to_dict() for d in self.decisions],
            "events": [e.to_dict() for e in self.events],
            "strata": list(self.strata),
            "last_delta": self.last_delta,
            "last_user_hash": self.last_user_hash,
            "last_assistant_hash": self.last_assistant_hash,
        }

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "FourDState":
        """
        Rebuild a state from a :meth:`to_dict` snapshot, best effort.

        Safe by default: malformed input never raises. Each section is restored
        independently, so one bad entry (a non-dict fact, a non-numeric
        confidence, ...) is skipped or defaulted without discarding the
        sections that follow it. Non-dict input yields a fresh default state.
        """
        st = FourDState()
        if not isinstance(d, dict):
            return st

        w = d.get("wxyz")
        if isinstance(w, dict):
            st.wxyz = WXYZ(
                W=_as_float(w.get("W"), 0.5),
                X=_as_float(w.get("X"), 0.5),
                Y=_as_float(w.get("Y"), 0.5),
                Z=_as_float(w.get("Z"), 0.5),
            ).clamp01()

        st.goals = _as_str_list(d.get("goals"))
        st.constraints = _as_str_list(d.get("constraints"))
        st.open_questions = _as_str_list(d.get("open_questions"))

        prefs = d.get("preferences")
        st.preferences = dict(prefs) if isinstance(prefs, dict) else {}

        facts = d.get("facts")
        if isinstance(facts, dict):
            for k, v in facts.items():
                if not isinstance(v, dict):
                    continue
                st.facts[str(k)] = Fact(
                    key=str(v.get("key", k)),
                    value=str(v.get("value", "")),
                    t_utc=str(v.get("t_utc", _utc_now_iso())),
                    source=str(v.get("source", "inferred")),
                    confidence=_as_float(v.get("confidence"), 0.72),
                )

        decs = d.get("decisions")
        if isinstance(decs, list):
            for x in decs:
                if not isinstance(x, dict):
                    continue
                st.decisions.append(Decision(
                    text=str(x.get("text", "")),
                    t_utc=str(x.get("t_utc", _utc_now_iso())),
                    reason=str(x.get("reason", "")),
                    impact=str(x.get("impact", "")),
                    source=str(x.get("source", "assistant")),
                ))

        evs = d.get("events")
        if isinstance(evs, list):
            for x in evs:
                if not isinstance(x, dict):
                    continue
                meta = x.get("meta")
                st.events.append(Event(
                    role=str(x.get("role", "unknown")),
                    text=str(x.get("text", "")),
                    t_utc=str(x.get("t_utc", _utc_now_iso())),
                    meta=dict(meta) if isinstance(meta, dict) else {},
                ))

        st.strata = _as_str_list(d.get("strata"))
        st.last_delta = str(d.get("last_delta", ""))
        st.last_user_hash = str(d.get("last_user_hash", ""))
        st.last_assistant_hash = str(d.get("last_assistant_hash", ""))
        return st


# ======================================================================================
# 4D Core Engine
# ======================================================================================

class FourDMS:
    """
    RomanAILabs 4D Standard Module (v2)
    - prompt wrapper + structured continuity + delta memory + optional validation

    One instance represents one chat session. It holds the clamped config
    (``cfg``), the model ``metadata`` dict, the evolving ``state`` and a short
    random-ish ``session_id``.
    """

    # Assistant outputs are clipped to this many characters before being stored.
    _MAX_ASSISTANT_CHARS = 24000
    # Stored fact values are clipped to this many characters.
    _MAX_FACT_CHARS = 500
    # Bounds on the compressed history kept in state.strata.
    _MAX_STRATA = 14
    _MAX_STRATUM_CHARS = 8000
    # "Key: value" line shape shared by fact extraction and contradiction checks.
    _FACT_LINE_RE = re.compile(r"^[A-Za-z0-9 _\-]{3,32}:\s+.{3,}$")
    # Constraint cues matched on word boundaries, so "piano " or "mustard" do not count.
    _CONSTRAINT_CUE_RE = re.compile(r"\b(?:must|no|do not|don't|never)\b")

    def __init__(self, cfg: "FourDConfig", metadata: Optional[Dict[str, Any]] = None) -> None:
        """Create a session; loads persisted state when ``cfg.persist`` is set."""
        self.cfg = cfg.clamp()
        self.metadata = metadata or {}
        self.state = FourDState()
        self.session_id = _sha1(_utc_now_iso() + str(os.getpid()) + str(time.time()))[:10]
        if self.cfg.persist and self.cfg.state_path:
            self._load_state()

    # ----------------------------------------------
    # Construction from script_config (4DLLM standard)
    # ----------------------------------------------
    @staticmethod
    def from_script_config(
        script_config: Optional[Dict[str, Any]],
        metadata: Optional[Dict[str, Any]] = None,
    ) -> "FourDMS":
        """
        Build a session from a script-config dict.

        Settings are read from the ``"four_dms_v2"`` key; anything missing keeps
        the FourDConfig default. Sub-sections that are not dicts are ignored.
        Raises ``ValueError``/``TypeError`` if a numeric setting cannot be
        converted with ``int()``.
        """
        cfg = FourDConfig()
        blob: Any = {}
        if isinstance(script_config, dict):
            blob = script_config.get("four_dms_v2", {}) or {}

        if isinstance(blob, dict):
            def section(name: str) -> Dict[str, Any]:
                # A section of the wrong type is treated as absent.
                value = blob.get(name, {})
                return value if isinstance(value, dict) else {}

            cfg.enabled = bool(blob.get("enabled", cfg.enabled))
            cfg.mode = str(blob.get("mode", cfg.mode))

            tb = section("token_budget")
            cfg.system_prefix_tokens = int(tb.get("system_prefix_tokens", cfg.system_prefix_tokens))
            cfg.capsule_tokens = int(tb.get("capsule_tokens", cfg.capsule_tokens))
            cfg.facts_tokens = int(tb.get("facts_tokens", cfg.facts_tokens))
            cfg.events_tokens = int(tb.get("events_tokens", cfg.events_tokens))

            mem = section("memory")
            cfg.persist = bool(mem.get("persist", cfg.persist))
            cfg.state_path = str(mem.get("state_path", cfg.state_path))
            cfg.max_events = int(mem.get("max_events", cfg.max_events))
            cfg.compress_after = int(mem.get("compress_after", cfg.compress_after))
            cfg.max_facts = int(mem.get("max_facts", cfg.max_facts))

            val = section("validation")
            cfg.validation_enabled = bool(val.get("enabled", cfg.validation_enabled))
            cfg.validation_contract = str(val.get("contract", cfg.validation_contract))
            cfg.validation_max_issues = int(val.get("max_issues", cfg.validation_max_issues))

            saf = section("safety")
            cfg.enable_risk_flags = bool(saf.get("enable_risk_flags", cfg.enable_risk_flags))
            cfg.risk_action = str(saf.get("risk_action", cfg.risk_action))

            cfg.max_user_chars = int(blob.get("max_user_chars", cfg.max_user_chars))

        return FourDMS(cfg=cfg, metadata=metadata or {})

    # ----------------------------------------------
    # Persistence
    # ----------------------------------------------
    def _load_state(self) -> None:
        """Load persisted state if present; unreadable or malformed files are ignored."""
        path = _safe_expanduser(self.cfg.state_path)
        if not path or not os.path.exists(path):
            return
        try:
            with open(path, "r", encoding="utf-8") as f:
                d = json.load(f)
        except (OSError, ValueError, RecursionError):
            # Best effort: a broken state file must not prevent the session from starting.
            return
        saved = d.get("state") if isinstance(d, dict) else None
        if isinstance(saved, dict):
            self.state = FourDState.from_dict(saved)

    def _save_state(self) -> None:
        """
        Persist the state as JSON (best effort, never raises on I/O problems).

        The payload is written to ``<path>.tmp`` and moved into place with
        ``os.replace``; on failure the temporary file is removed.
        """
        if not (self.cfg.persist and self.cfg.state_path):
            return
        path = _safe_expanduser(self.cfg.state_path)
        payload = {
            "version": "4DMSv2",
            "saved_utc": _utc_now_iso(),
            "session_id": self.session_id,
            "state": self.state.to_dict(),
        }
        tmp = path + ".tmp"
        try:
            parent = os.path.dirname(path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            os.replace(tmp, path)
        except (OSError, TypeError, ValueError):
            # TypeError/ValueError: event meta may hold values json cannot serialize.
            try:
                os.remove(tmp)
            except OSError:
                pass

    # ----------------------------------------------
    # Observation & State Update
    # ----------------------------------------------
    def observe_user(self, text: str, meta: Optional[Dict[str, Any]] = None) -> None:
        """Record a user turn: add the event, update WXYZ and structured state, compress, save."""
        if not self.cfg.enabled:
            return
        t = _clamp(text, self.cfg.max_user_chars)
        self._add_event("user", t, meta or {})
        self._update_wxyz_from_user(t)
        self._extract_structured_from_user(t)
        self._compress_if_needed()
        self._save_state()

    def observe_assistant(self, text: str, meta: Optional[Dict[str, Any]] = None) -> None:
        """Record an assistant turn; additionally recomputes the last-turn delta."""
        if not self.cfg.enabled:
            return
        t = _clamp(text, self._MAX_ASSISTANT_CHARS)
        self._add_event("assistant", t, meta or {})
        self._update_wxyz_from_assistant(t)
        self._extract_structured_from_assistant(t)
        self._compute_delta()
        self._compress_if_needed()
        self._save_state()

    def _add_event(self, role: str, text: str, meta: Dict[str, Any]) -> None:
        """Append an event; anything beyond ``cfg.max_events`` is pushed into strata."""
        self.state.events.append(Event(role=role, text=text, t_utc=_utc_now_iso(), meta=meta))
        overflow = len(self.state.events) - self.cfg.max_events
        if overflow > 0:
            # hard cap: push oldest into strata first
            self._compress_oldest(overflow)

    def _compress_if_needed(self) -> None:
        """Once ``cfg.compress_after`` events are held, compress the oldest third."""
        if len(self.state.events) >= self.cfg.compress_after:
            self._compress_oldest(max(1, len(self.state.events) // 3))

    def _compress_oldest(self, n: int) -> None:
        """
        Move the *n* oldest events into one compressed stratum.

        Each event becomes a single gist line with its top keywords. The number
        of strata is bounded by merging the two oldest, and the merged stratum
        is clipped so persisted state cannot grow without limit.
        """
        if n <= 0 or not self.state.events:
            return
        lines: List[str] = []
        for ev in self.state.events[:n]:
            gist = _clamp(ev.text.replace("\n", " ").strip(), 200)
            keys = _top_keywords(ev.text, limit=6)
            kpart = f" | keys={','.join(keys)}" if keys else ""
            lines.append(f"- [{ev.t_utc}] {ev.role}: {gist}{kpart}")
        self.state.strata.append("\n".join(lines))

        # bound strata
        if len(self.state.strata) > self._MAX_STRATA:
            merged = _clamp("\n".join(self.state.strata[:2]), self._MAX_STRATUM_CHARS)
            self.state.strata = [merged] + self.state.strata[2:]

        self.state.events = self.state.events[n:]

    def _update_wxyz_from_user(self, user_text: str) -> None:
        """
        Nudge the steering vector from a user message and store its hash.

        NOTE(review): every update here (and in the assistant counterpart) only
        adds to an axis and nothing decays, so axes saturate at 1.0 over a long
        session - confirm whether that is intended.
        """
        tok = _tokenish(user_text)
        keys = _top_keywords(user_text, limit=12)
        low = user_text.lower()
        wxyz = self.state.wxyz

        # W (width): variety + length
        wxyz.W += min(0.14, 0.012 * len(keys)) + min(0.10, tok / 5000.0)
        # Y (yield): interaction
        wxyz.Y += 0.07 if _looks_like_question(user_text) else 0.03
        # Z (zenith): depth cues
        if any(k in low for k in ("architecture", "spec", "design", "module", "standard", "format", "protocol")):
            wxyz.Z += 0.07
        if any(k in low for k in ("math", "prove", "derive", "rigor", "formal")):
            wxyz.Z += 0.09

        wxyz.clamp01()
        self.state.last_user_hash = _sha1(user_text)[:12]

    def _update_wxyz_from_assistant(self, assistant_text: str) -> None:
        """Nudge the steering vector from an assistant message and store its hash."""
        tok = _tokenish(assistant_text)
        wxyz = self.state.wxyz

        # X (execution): crispness/structure proxy
        wxyz.X += 0.06 if tok < 420 else 0.03
        # Z: structured responses correlate with deeper output
        if tok > 650 or "\n#" in assistant_text or "\n-" in assistant_text:
            wxyz.Z += 0.04

        wxyz.clamp01()
        self.state.last_assistant_hash = _sha1(assistant_text)[:12]

    # ----------------------------------------------
    # Lightweight structured extraction
    # ----------------------------------------------
    def _extract_structured_from_user(self, user_text: str) -> None:
        """Pick up preferences, goals, constraints and open questions from a user message."""
        t = user_text.strip()
        low = t.lower()

        # Preferences pattern (minimal + safe)
        pref_patterns = [
            (r"\bkeep it (short|brief|concise)\b", ("style", "concise")),
            (r"\bmore (detailed|thorough|rigorous)\b", ("style", "detailed")),
            (r"\bno fluff\b", ("style", "no_fluff")),
            (r"\buse (python|bash|javascript|json)\b", ("preferred_language", None)),
        ]
        for pat, (k, v) in pref_patterns:
            m = re.search(pat, low)
            if not m:
                continue
            # The language preference stores the matched language itself.
            self.state.preferences[k] = m.group(1) if k == "preferred_language" else v

        # Goals/constraints cues
        if any(x in low for x in ("i want", "i need", "make me", "build me")):
            self._append_unique(self.state.goals, _clamp(t, 240), max_items=10)
        if self._CONSTRAINT_CUE_RE.search(low):
            self._append_unique(self.state.constraints, _clamp(t, 240), max_items=14)

        # Open questions
        if _looks_like_question(t):
            self._append_unique(self.state.open_questions, _clamp(t, 240), max_items=12)

    def _parse_fact_lines(self, text: str, limit: int) -> List[Tuple[str, str]]:
        """Return up to *limit* ``(normalized_key, value)`` pairs from "Key: value" lines."""
        pairs: List[Tuple[str, str]] = []
        for raw in text.splitlines():
            ln = raw.strip()
            if not ln or not self._FACT_LINE_RE.match(ln):
                continue
            k, v = ln.split(":", 1)
            pairs.append((k.strip().lower().replace(" ", "_")[:40], v.strip()))
            if len(pairs) >= limit:
                break
        return pairs

    def _extract_structured_from_assistant(self, assistant_text: str) -> None:
        """Store "X: Y" lines as facts and record a decision when a marker phrase appears."""
        # Facts (very conservative): look for "X: Y" lines in short bullet lists
        for key, val in self._parse_fact_lines(assistant_text, limit=10):
            self._set_fact(key, val, source="assistant", confidence=0.70)

        # Decisions: simple detector
        dec_markers = ("we will", "we'll", "decision:", "i will", "do this", "next step")
        low = assistant_text.lower()
        if any(m in low for m in dec_markers):
            snippet = _clamp(assistant_text.replace("\n", " ").strip(), 240)
            self._add_decision(snippet, reason="detected decision marker", impact="continuity", source="assistant")

    def _append_unique(self, arr: List[str], item: str, max_items: int) -> None:
        """Append *item* to *arr* in place unless empty or present; keep the newest *max_items*."""
        item = item.strip()
        if not item or item in arr:
            return
        arr.append(item)
        if len(arr) > max_items:
            arr[:] = arr[-max_items:]

    def _set_fact(self, key: str, value: str, source: str, confidence: float) -> None:
        """Insert or overwrite a fact; when full, the fact with the oldest timestamp is evicted."""
        key = key.strip()
        if not key:
            return
        facts = self.state.facts
        # keep facts bounded
        if key not in facts and len(facts) >= self.cfg.max_facts:
            # drop oldest by timestamp
            oldest_k = min(facts, key=lambda fk: facts[fk].t_utc)
            facts.pop(oldest_k, None)
        facts[key] = Fact(
            key=key,
            value=_clamp(value, self._MAX_FACT_CHARS),
            t_utc=_utc_now_iso(),
            source=source,
            confidence=float(confidence),
        )

    def _add_decision(self, text: str, reason: str, impact: str, source: str) -> None:
        """Record a decision, keeping only the 20 most recent."""
        self.state.decisions.append(Decision(
            text=_clamp(text, 500),
            t_utc=_utc_now_iso(),
            reason=_clamp(reason, 220),
            impact=_clamp(impact, 220),
            source=source,
        ))
        if len(self.state.decisions) > 20:
            self.state.decisions = self.state.decisions[-20:]

    # ----------------------------------------------
    # Delta computation (what changed this turn)
    # ----------------------------------------------
    def _compute_delta(self) -> None:
        """Summarize the newest meaningful changes into ``state.last_delta``."""
        parts: List[str] = []

        # Last event summaries
        last_user = next((e for e in reversed(self.state.events) if e.role == "user"), None)
        last_asst = next((e for e in reversed(self.state.events) if e.role == "assistant"), None)
        if last_user:
            gist = _clamp(last_user.text.replace("\n", " ").strip(), 240)
            parts.append(f"Latest User Intent: {gist}")
        if last_asst:
            keys = _top_keywords(last_asst.text, limit=8)
            kpart = f" | keys={','.join(keys)}" if keys else ""
            gist = _clamp(last_asst.text.replace("\n", " ").strip(), 240)
            parts.append(f"Latest Assistant Output Gist: {gist}{kpart}")

        # WXYZ snapshot
        parts.append(f"WXYZ Now: {self.state.wxyz.to_dict()}")

        # Recent decisions
        if self.state.decisions:
            parts.append(f"Last Decision: {_clamp(self.state.decisions[-1].text, 240)}")

        # Open questions
        if self.state.open_questions:
            parts.append(f"Open Question: {_clamp(self.state.open_questions[-1], 240)}")

        self.state.last_delta = "\n".join(parts).strip()

    # ----------------------------------------------
    # Capsule builder (token-budgeted)
    # ----------------------------------------------
    def _infer_mode(self, user_text: str) -> str:
        """Pick fast/deep/creative from cue words in *user_text*, else the configured mode."""
        t = user_text.lower()
        if any(k in t for k in ("eli10", "explain like i'm 10", "simple", "short", "quick")):
            return "fast"
        if any(k in t for k in ("deep", "detailed", "thorough", "rigorous", "complete")):
            return "deep"
        if any(k in t for k in ("brainstorm", "creative", "wild", "fun ideas")):
            return "creative"
        return self.cfg.mode

    def _risk_flags(self, user_text: str) -> List[str]:
        """Return coarse risk labels for *user_text* based on keyword matches."""
        t = user_text.lower()
        flags: List[str] = []
        if any(k in t for k in ("malware", "virus", "keylogger", "exploit", "phishing")):
            flags.append("security-sensitive")
        if any(k in t for k in ("explosive", "detonator", "build a bomb")):
            flags.append("weapon-related")
        if any(k in t for k in ("suicide", "self-harm", "kill myself")):
            flags.append("self-harm-sensitive")
        return flags[:6]

    def _capsule_sections(self) -> List[Tuple[str, str]]:
        """Build the ``(title, body)`` capsule sections from the current state, in priority order."""
        sections: List[Tuple[str, str]] = []

        # 1) Goals + constraints + open questions
        core_lines: List[str] = []
        if self.state.goals:
            core_lines.append("Goals:")
            core_lines.extend(f"- {g}" for g in self.state.goals[-5:])
        if self.state.constraints:
            core_lines.append("Constraints:")
            core_lines.extend(f"- {c}" for c in self.state.constraints[-7:])
        if self.state.open_questions:
            core_lines.append("Open Questions:")
            core_lines.extend(f"- {q}" for q in self.state.open_questions[-5:])
        if self.state.preferences:
            core_lines.append("Preferences:")
            core_lines.extend(f"- {k}: {v}" for k, v in list(self.state.preferences.items())[:10])
        core_block = "\n".join(core_lines).strip()
        if core_block:
            sections.append(("== Continuity Core ==", core_block))

        # 2) Facts (most recent first)
        facts_list = sorted(self.state.facts.values(), key=lambda f: f.t_utc, reverse=True)
        facts_block = "\n".join(
            f"- {f.key} = {f.value} (conf={f.confidence:.2f}, src={f.source})" for f in facts_list[:18]
        ).strip()
        if facts_block:
            sections.append(("== Facts ==", self._budget_block(facts_block, self.cfg.facts_tokens)))

        # 3) Timeline strata + recent events
        strata_block = "\n\n".join(self.state.strata[-3:]).strip() if self.state.strata else ""
        if strata_block:
            sections.append((
                "== Timeline Strata (compressed) ==",
                self._budget_block(strata_block, self.cfg.events_tokens),
            ))

        recent_lines: List[str] = []
        for e in self.state.events[-10:]:
            gist = _clamp(e.text.replace("\n", " ").strip(), 220)
            recent_lines.append(f"- [{e.t_utc}] {e.role}: {gist}")
        recent_block = "\n".join(recent_lines).strip()
        if recent_block:
            sections.append(("== Recent Events ==", self._budget_block(recent_block, self.cfg.events_tokens)))

        # 4) Delta (what changed last turn)
        if self.state.last_delta:
            sections.append(("== Last Delta ==", self._budget_block(self.state.last_delta, 220)))

        return sections

    def build_system_prefix(self, user_text: str, extra: Optional[Dict[str, Any]] = None) -> str:
        """
        Build the system prefix: header, 4D capsule, operating rules, output contract.

        With risk flags enabled and ``risk_action == "block"``, a flagged prompt
        gets a short refusal prefix instead. The capsule budget is
        ``cfg.capsule_tokens`` capped by what remains of
        ``cfg.system_prefix_tokens`` after the header and rules, so the final
        clip does not cut off the operating rules and output contract.

        ``extra`` is accepted for interface compatibility and is not used here.
        """
        mode = self._infer_mode(user_text)

        # risk
        risk_flags = self._risk_flags(user_text) if self.cfg.enable_risk_flags else []
        if risk_flags and self.cfg.risk_action == "block":
            return (
                f"{self.cfg.signature} 4DMSv2\n"
                f"UTC Now: {_utc_now_iso()}\n"
                f"Session: {self.session_id}\n"
                f"Risk Flags: {', '.join(risk_flags)}\n\n"
                "SYSTEM: Do not comply with risky requests. Provide safe alternatives.\n"
            )

        # model info
        model_name = str(self.metadata.get("general.name", self.metadata.get("model", "")) or "").strip()
        arch = str(self.metadata.get("general.architecture", self.metadata.get("architecture", "")) or "").strip()

        # Operating rules (tight + mode-aware)
        rules = [
            "- Treat Continuity Core + Facts as authoritative context.",
            "- Maintain consistency across turns; don't contradict stored facts without saying why.",
            "- If user is ambiguous: make ONE best assumption and proceed.",
            "- Stay practical; avoid fluff.",
        ]
        if mode == "fast":
            rules.append("- Keep answers short and direct.")
        elif mode == "deep":
            rules.append("- Be rigorous and structured; include steps and sanity checks.")
        elif mode == "creative":
            rules.append("- Be inventive, but keep outputs usable.")
        else:
            rules.append("- Balance clarity and completeness.")

        # Validation contract hints
        contract_hint = self._contract_hint(user_text)

        head = (
            f"{self.cfg.signature} 4DMSv2\n"
            f"UTC Now: {_utc_now_iso()}\n"
            f"Session: {self.session_id}\n"
            f"Mode: {mode}\n"
            f"WXYZ: {json.dumps(self.state.wxyz.to_dict(), ensure_ascii=False)}\n"
        )
        if model_name or arch:
            head += f"Model: {model_name or 'unknown'} | Arch: {arch or 'unknown'}\n"
        if risk_flags and self.cfg.risk_action == "warn":
            head += f"Risk Flags: {', '.join(risk_flags)}\n"
        head += "\n4D Capsule:\n"

        tail = "\n\n4D Operating Rules:\n" + "\n".join(rules) + "\n"
        if contract_hint:
            tail += "\nOutput Contract:\n" + contract_hint + "\n"

        # Assemble capsule under whatever budget is left once head + tail are paid for.
        capsule_budget = min(
            self.cfg.capsule_tokens,
            self.cfg.system_prefix_tokens - _tokenish(head + tail),
        )
        capsule = ""
        if capsule_budget > 0:
            capsule = self._assemble_sections_under_budget(self._capsule_sections(), capsule_budget)

        # Keep prefix under system_prefix_tokens (approx); acts as a final safety net.
        return self._budget_block(head + capsule + tail, self.cfg.system_prefix_tokens)

    def wrap_user_prompt(self, user_text: str, extra: Optional[Dict[str, Any]] = None) -> str:
        """
        Primary: returns a single prompt string (system prefix + USER PROMPT).
        Runner sends this to backend. When disabled, the clipped user text is
        returned unchanged.
        """
        user_text = _clamp(user_text, self.cfg.max_user_chars)
        if not self.cfg.enabled:
            return user_text

        # observe BEFORE wrapping so new intent is in continuity
        self.observe_user(user_text, meta=extra or {})
        prefix = self.build_system_prefix(user_text, extra=extra)
        return f"{prefix}\nUSER PROMPT:\n{user_text}\n"

    def wrap_messages(self, messages: List[Dict[str, str]], extra: Optional[Dict[str, Any]] = None) -> List[Dict[str, str]]:
        """
        Chat-format injection: inserts/replaces system message with 4D prefix.

        The latest user message is observed and used to build the prefix. Only
        the prefix goes into the system message; the user text stays in its own
        message and is not duplicated. Returns a new list with the first system
        message replaced, or a system message prepended when there is none.
        The input list is returned as-is when disabled or empty.
        """
        if not self.cfg.enabled or not messages:
            return messages

        last_user: Optional[str] = None
        for m in reversed(messages):
            if m.get("role") == "user":
                last_user = str(m.get("content", ""))
                break

        user_text = _clamp(last_user or "", self.cfg.max_user_chars)
        if last_user is not None:
            # observe BEFORE building so new intent is in continuity
            self.observe_user(user_text, meta=extra or {})
        system_prefix = self.build_system_prefix(user_text, extra=extra)

        new_msgs: List[Dict[str, str]] = []
        injected = False
        for m in messages:
            if m.get("role") == "system" and not injected:
                new_msgs.append({"role": "system", "content": system_prefix})
                injected = True
            else:
                new_msgs.append({"role": m.get("role", ""), "content": m.get("content", "")})

        if not injected:
            new_msgs.insert(0, {"role": "system", "content": system_prefix})
        return new_msgs

    def _budget_block(self, text: str, token_budget: int) -> str:
        """Return *text* stripped, clipped to about ``token_budget * 4`` chars (min 240) if over budget."""
        if token_budget <= 0:
            return ""
        t = text.strip()
        if _tokenish(t) <= token_budget:
            return t
        # approximate chars per token (4-ish) and clip
        approx_chars = max(240, token_budget * 4)
        return _clamp(t, approx_chars)

    def _assemble_sections_under_budget(self, sections: List[Tuple[str, str]], budget_tokens: int) -> str:
        """
        Join sections in order until the next one would exceed *budget_tokens*.

        The first section is always included; if it alone is over budget it is
        clipped rather than dropped.
        """
        out: List[str] = []
        used = 0
        for title, body in sections:
            block = f"{title}\n{body}".strip()
            bt = _tokenish(block)
            if out and used + bt > budget_tokens:
                break
            if not out and bt > budget_tokens:
                # if first section alone is huge, clip it
                block = self._budget_block(block, budget_tokens)
                bt = _tokenish(block)
            out.append(block)
            used += bt
        return "\n\n".join(out).strip()

    def _contract_hint(self, user_text: str) -> str:
        """Return the output-contract line for the prompt, or ``""`` when none applies."""
        if not self.cfg.validation_enabled:
            return ""
        c = self.cfg.validation_contract.lower().strip()
        if c == "none":
            return ""
        if c == "json":
            return "- Respond with valid JSON only."
        if c == "markdown":
            return "- Respond in clean Markdown."
        if c == "code":
            return "- Respond with code only (no extra commentary)."

        # auto
        t = user_text.lower()
        if "json" in t or "api" in t or "schema" in t:
            return "- Prefer JSON output when it improves clarity."
        if "code" in t or "script" in t or "python" in t:
            return "- Prefer code blocks; keep prose minimal."
        return ""

    # ----------------------------------------------
    # Validation hooks (runner may optionally enforce)
    # ----------------------------------------------
    def validate_response(self, assistant_text: str) -> Tuple[bool, List[str], str]:
        """
        Returns: (ok, issues[], repair_prompt)
        repair_prompt can be sent back to the model once if runner wants auto-repair.

        Contradiction flags compare against the facts currently stored.
        observe_assistant() overwrites stored facts with the values found in the
        same text, so call this before observe_assistant() when contradiction
        detection is wanted.
        """
        if not self.cfg.validation_enabled:
            return True, [], ""

        issues: List[str] = []

        # contract check
        contract = self.cfg.validation_contract.lower().strip()
        if contract == "json":
            if not self._is_valid_json(assistant_text):
                issues.append("Response is not valid JSON.")
        elif contract == "code":
            # crude: only checks that a code fence is present
            if "```" not in assistant_text:
                issues.append("Response missing code fence despite code contract.")
        elif contract == "auto":
            # if "json" shows up in the last delta, enforce softly
            if "json" in (self.state.last_delta or "").lower():
                if not self._is_valid_json(assistant_text):
                    issues.append("User context suggested JSON; response is not valid JSON.")

        # contradiction flags (lightweight)
        issues.extend(self._contradiction_flags(assistant_text))

        # cap issues
        issues = issues[: self.cfg.validation_max_issues]
        if not issues:
            return True, [], ""

        repair = (
            f"{self.cfg.signature} 4DMSv2 Repair\n"
            "SYSTEM: Repair the previous answer using these rules:\n"
            "- Fix ONLY what is listed in Issues.\n"
            "- Keep the original intent.\n"
            "- Do not add new features.\n"
            "Issues:\n" + "\n".join(f"- {x}" for x in issues) + "\n\nReturn the corrected response now."
        )
        return False, issues, repair

    def _is_valid_json(self, s: str) -> bool:
        """True when *s* (surrounding whitespace ignored) parses as JSON."""
        if not isinstance(s, str):
            return False
        try:
            json.loads(s.strip())
        except (ValueError, RecursionError):
            return False
        return True

    def _contradiction_flags(self, assistant_text: str) -> List[str]:
        """
        Very conservative: If assistant states "key: value" for known facts but mismatches.

        The new value is clipped the same way stored values are before
        comparing, so a long but unchanged value is not reported.
        """
        flags: List[str] = []
        for key, val in self._parse_fact_lines(assistant_text, limit=12):
            known = self.state.facts.get(key)
            if known is None:
                continue
            prev = known.value
            if prev and val and prev != _clamp(val, self._MAX_FACT_CHARS):
                flags.append(f"Possible contradiction for fact '{key}': '{prev}' vs '{val}'")
        return flags[:6]


# ======================================================================================
# 4DLLM Installer / Export Kit / Injector (best-effort)
# ======================================================================================

class FourDMSInstaller:
    """
    Provides:
    - export_kit(): writes this module + recommended script_config snippet
    - inject_into_4dllm(): best-effort TOC-based injector for existing .4dllm
    """

    # These section type IDs mirror typical runner definitions.
    # If your runner provides authoritative constants, you can replace these.
SECTION_GGUF_DATA = 0x01 SECTION_PYTHON_SCRIPT = 0x02 SECTION_METADATA = 0x03 SECTION_SCRIPT_CONFIG = 0x04 TOC_MAGIC = b"TOC1" TOC_VERSION = 1 TOC_HDR_STRUCT = struct.Struct("<4sII") TOC_ENTRY_STRUCT = struct.Struct(" None: pass def export_kit(self, out_dir: str) -> Path: """ Creates a small kit folder users can drop into Studio: out_dir/modules/romanailabs_4dms_v2.py out_dir/script_config_snippet.json out_dir/README_INSTALL.txt """ outp = Path(out_dir).expanduser().resolve() mods = outp / "modules" mods.mkdir(parents=True, exist_ok=True) # write the module file (self copy) this_path = Path(__file__).resolve() mod_path = mods / "romanailabs_4dms_v2.py" mod_path.write_text(this_path.read_text(encoding="utf-8"), encoding="utf-8") snippet = { "four_dms_v2": { "enabled": True, "mode": "balanced", "token_budget": { "system_prefix_tokens": 512, "capsule_tokens": 900, "facts_tokens": 280, "events_tokens": 260 }, "memory": { "persist": True, "state_path": "~/.romanailabs/4dms_v2_state.json", "max_events": 80, "compress_after": 26, "max_facts": 80 }, "validation": { "enabled": True, "contract": "auto", "max_issues": 6 }, "safety": { "enable_risk_flags": True, "risk_action": "warn" } } } (outp / "script_config_snippet.json").write_text(_json_dumps(snippet) + "\n", encoding="utf-8") readme = ( "RomanAILabs 4DMS v2 Install Kit\n" "===============================\n\n" "Recommended install path (Studio):\n" "1) Copy the 'modules' folder into your 4DLLM Studio directory.\n" "2) Open Studio GUI -> Load Modules -> Build your .4dllm.\n\n" "Script config:\n" "- Paste script_config_snippet.json into your builder's Script Config section.\n" "- Or merge the 'four_dms_v2' key into existing script_config JSON.\n" ) (outp / "README_INSTALL.txt").write_text(readme, encoding="utf-8") return outp # -------------------------- Inject into .4dllm (best-effort) -------------------------- def inject_into_4dllm( self, in_path: str, out_path: str, enable: bool = True, mode: str = "balanced", 
force_replace_script_config: bool = True, ) -> None: """ Best-effort injector: - Reads TOC1 at end, parses entries - Copies data up to toc_offset (removes old toc) - Appends module as new SECTION_PYTHON_SCRIPT (uncompressed) - Replaces/creates SECTION_SCRIPT_CONFIG enabling four_dms_v2 - Writes fresh TOC at end NOTE: - If your runner uses special flags/encoding, this aims to remain compatible by writing uncompressed sections with crc32_u populated. """ inp = Path(in_path).expanduser().resolve() outp = Path(out_path).expanduser().resolve() if not inp.exists(): raise FileNotFoundError(f"Input .4dllm not found: {inp}") file_size = inp.stat().st_size entries, toc_offset = self._read_toc_best_effort(inp) if not entries or toc_offset is None: raise RuntimeError("Failed to read TOC from .4dllm (TOC1 not found or unreadable).") # Read old script_config if possible (merge); else replace with minimal old_cfg = {} if not force_replace_script_config: old_cfg = self._read_script_config_best_effort(inp, entries) or {} # Build new config (merge) new_cfg = dict(old_cfg) if isinstance(old_cfg, dict) else {} new_cfg["four_dms_v2"] = { "enabled": bool(enable), "mode": str(mode), "token_budget": { "system_prefix_tokens": 512, "capsule_tokens": 900, "facts_tokens": 280, "events_tokens": 260 }, "memory": { "persist": True, "state_path": "~/.romanailabs/4dms_v2_state.json", "max_events": 80, "compress_after": 26, "max_facts": 80 }, "validation": { "enabled": True, "contract": "auto", "max_issues": 6 }, "safety": { "enable_risk_flags": True, "risk_action": "warn" } } # Prepare payloads module_bytes = Path(__file__).read_bytes() cfg_bytes = (_json_dumps(new_cfg) + "\n").encode("utf-8") # Copy file body excluding old TOC area with open(inp, "rb") as f_in, open(outp, "wb") as f_out: f_in.seek(0) body = f_in.read(toc_offset) # copy up to TOC start f_out.write(body) # Append module section mod_off = f_out.tell() f_out.write(module_bytes) mod_size = len(module_bytes) # Append script_config 
section cfg_off = f_out.tell() f_out.write(cfg_bytes) cfg_size = len(cfg_bytes) # Build new entries: keep original entries BUT remove old script_config if replacing cleaned: List[Dict[str, Any]] = [] for e in entries: if force_replace_script_config and int(e["section_type"]) == self.SECTION_SCRIPT_CONFIG: continue cleaned.append(e) cleaned.append(self._make_entry(self.SECTION_PYTHON_SCRIPT, mod_off, mod_size)) cleaned.append(self._make_entry(self.SECTION_SCRIPT_CONFIG, cfg_off, cfg_size)) # Write fresh TOC at end toc_off_new = f_out.tell() toc_bytes = self._build_toc_bytes(cleaned) f_out.write(toc_bytes) # done _ = file_size # keep lint quiet def _make_entry(self, section_type: int, offset: int, size: int) -> Dict[str, Any]: return { "section_type": int(section_type), "flags": 0, # uncompressed "offset": int(offset), "size_c": int(size), "size_u": int(size), "extra_sz": 0, "crc32_u": int(_crc32_u(b"") if size == 0 else 0), # will be overwritten by builder when packing } def _build_toc_bytes(self, entries: List[Dict[str, Any]]) -> bytes: # recompute crc32_u for entries we wrote uncompressed if we can # (We don't have their bytes here; leaving crc32_u=0 is acceptable in some implementations, # but we can compute it only for new sections if needed by reading file. # For safety, keep crc32_u = 0 if unknown.) 
count = len(entries) hdr = self.TOC_HDR_STRUCT.pack(self.TOC_MAGIC, self.TOC_VERSION, count) ent_bytes = bytearray() for e in entries: stype = int(e.get("section_type", 0)) flags = int(e.get("flags", 0)) off = int(e.get("offset", 0)) size_c = int(e.get("size_c", 0)) size_u = int(e.get("size_u", 0)) extra_sz = int(e.get("extra_sz", 0)) crc_u = int(e.get("crc32_u", 0)) ent_bytes += self.TOC_ENTRY_STRUCT.pack(stype, flags, off, size_c, size_u, extra_sz, crc_u) footer_crc = struct.pack(" Tuple[List[Dict[str, Any]], Optional[int]]: """ Best-effort TOC reader: - searches last 64MB for TOC1 - parses header + entries + footer CRC (CRC mismatch tolerated) """ file_size = path.stat().st_size search_bytes = min(64 * 1024 * 1024, file_size) with open(path, "rb") as f: f.seek(max(0, file_size - search_bytes)) data = f.read() pos = data.rfind(self.TOC_MAGIC) if pos < 0: return [], None toc_offset = file_size - search_bytes + pos with open(path, "rb") as f: f.seek(toc_offset) hdr = f.read(self.TOC_HDR_STRUCT.size) if len(hdr) != self.TOC_HDR_STRUCT.size: return [], None magic, ver, count = self.TOC_HDR_STRUCT.unpack(hdr) if magic != self.TOC_MAGIC or ver != self.TOC_VERSION or count <= 0: return [], None entries_bytes = f.read(self.TOC_ENTRY_STRUCT.size * count) if len(entries_bytes) != self.TOC_ENTRY_STRUCT.size * count: return [], None _footer = f.read(4) # crc # tolerate mismatch entries: List[Dict[str, Any]] = [] for i in range(count): chunk = entries_bytes[i * self.TOC_ENTRY_STRUCT.size:(i + 1) * self.TOC_ENTRY_STRUCT.size] stype, flags, off, size_c, size_u, extra_sz, crc_u = self.TOC_ENTRY_STRUCT.unpack(chunk) entries.append({ "section_type": int(stype), "flags": int(flags), "offset": int(off), "size_c": int(size_c), "size_u": int(size_u), "extra_sz": int(extra_sz), "crc32_u": int(crc_u), }) return entries, toc_offset def _read_script_config_best_effort(self, path: Path, entries: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ Attempts to read script config JSON. 
If decoding fails, returns None. """ ent = next((e for e in entries if int(e.get("section_type", -1)) == self.SECTION_SCRIPT_CONFIG), None) if not ent: return None off = int(ent["offset"]) size_c = int(ent["size_c"]) size_u = int(ent["size_u"]) flags = int(ent["flags"]) with open(path, "rb") as f: f.seek(off) raw = f.read(size_c) data = raw # If compressed, attempt zlib decompress (best guess) if size_u != size_c or (flags & 0x01): try: data = zlib.decompress(raw) except Exception: # attempt raw inflate try: data = zlib.decompress(raw, wbits=-zlib.MAX_WBITS) except Exception: return None try: return json.loads(data.decode("utf-8", "replace")) except Exception: return None # ====================================================================================== # CLI # ====================================================================================== def _cmd_export_kit(args: argparse.Namespace) -> int: inst = FourDMSInstaller() outp = inst.export_kit(args.out) print(str(outp)) return 0 def _cmd_print_config(_: argparse.Namespace) -> int: snippet = { "four_dms_v2": { "enabled": True, "mode": "balanced", "token_budget": { "system_prefix_tokens": 512, "capsule_tokens": 900, "facts_tokens": 280, "events_tokens": 260 }, "memory": { "persist": True, "state_path": "~/.romanailabs/4dms_v2_state.json", "max_events": 80, "compress_after": 26, "max_facts": 80 }, "validation": { "enabled": True, "contract": "auto", "max_issues": 6 }, "safety": { "enable_risk_flags": True, "risk_action": "warn" } } } print(_json_dumps(snippet)) return 0 def _cmd_inject(args: argparse.Namespace) -> int: inst = FourDMSInstaller() inst.inject_into_4dllm( in_path=args.input, out_path=args.output, enable=(not args.disable), mode=args.mode, force_replace_script_config=(not args.keep_script_config), ) print(f"Injected: {args.output}") return 0 def _cmd_demo(_: argparse.Namespace) -> int: cfg = { "four_dms_v2": { "enabled": True, "mode": "balanced", "token_budget": {"system_prefix_tokens": 420, 
"capsule_tokens": 700, "facts_tokens": 220, "events_tokens": 220}, "memory": {"persist": False, "max_events": 20, "compress_after": 8, "max_facts": 20}, "validation": {"enabled": True, "contract": "auto", "max_issues": 4}, "safety": {"enable_risk_flags": True, "risk_action": "warn"} } } meta = {"general.name": "ExampleModel", "general.architecture": "llama"} fourd = FourDMS.from_script_config(cfg, meta) user1 = "Make this universal module installable into 4dllm and keep it safe-by-default." prompt = fourd.wrap_user_prompt(user1, extra={"backend": "llama.cpp"}) print(prompt) asst1 = "Decision: We'll ship a modules/ drop-in and a TOC injector.\nmode: balanced" fourd.observe_assistant(asst1) ok, issues, repair = fourd.validate_response(asst1) print("\nVALIDATION:", ok, issues) if not ok: print("\nREPAIR PROMPT:\n", repair) return 0 def main() -> int: ap = argparse.ArgumentParser(prog="romanailabs_4dms_v2", add_help=True) sub = ap.add_subparsers(dest="cmd", required=True) p1 = sub.add_parser("export-kit", help="Create a drop-in kit folder for 4DLLM Studio (modules/ + config snippet).") p1.add_argument("--out", required=True, help="Output directory for the kit.") p1.set_defaults(fn=_cmd_export_kit) p2 = sub.add_parser("print-config", help="Print the recommended script_config JSON snippet (four_dms_v2).") p2.set_defaults(fn=_cmd_print_config) p3 = sub.add_parser("inject", help="Best-effort inject 4DMS v2 into an existing .4dllm file (writes new file).") p3.add_argument("--in", dest="input", required=True, help="Input .4dllm path") p3.add_argument("--out", dest="output", required=True, help="Output .4dllm path (new file)") p3.add_argument("--mode", default="balanced", help="balanced|deep|fast|creative") p3.add_argument("--disable", action="store_true", help="Write config with enabled=false") p3.add_argument("--keep-script-config", action="store_true", help="Attempt to keep existing script_config (merge). 
If not set, replaces script_config entry.") p3.set_defaults(fn=_cmd_inject) p4 = sub.add_parser("demo", help="Print a demo wrapped prompt + validation sample.") p4.set_defaults(fn=_cmd_demo) args = ap.parse_args() return int(args.fn(args)) if __name__ == "__main__": raise SystemExit(main())