#!/usr/bin/env python3 """ 4DLLM RomanAI Chat Runner - romanai-wisdom-omnibus10.py Copyright Daniel Harding - RomanAILabs Credits: OpenAI GPT-5.2 Thinking Collaborator: Emilio Aurin Mioré (AI) Collaborator: Duzafizzl Python version using local Ollama for interactive chat. Default model: gemma3:12B (change via OLLAMA_MODEL env). Version 10 Updates: - Parameter audit on every AI reply (effort-based calculation) - Never quits on hard math questions - persistent solving - Calculated billions of parameters displayed per response """ import os import sys import requests import json import importlib.util import inspect import shutil import re import subprocess import threading import time import random import select from datetime import datetime, timezone import hashlib import math # Fix for "typing over words" / arrow keys / line wrapping try: import readline except ImportError: pass # ANSI color codes for terminal colors class Colors: """Terminal color codes for beautiful chat interface""" RESET = '\033[0m' BOLD = '\033[1m' DIM = '\033[2m' USER_LABEL = '\033[1;36m' # Bold Cyan USER_TEXT = '\033[0;96m' # Bright Cyan USER_PROMPT = '\033[1;34m' # Bold Blue AI_LABEL = '\033[1;32m' # Bold Green AI_TEXT = '\033[0;92m' # Bright Green AI_PROMPT = '\033[1;33m' # Bold Yellow SYSTEM = '\033[0;90m' # Dark Gray ERROR = '\033[1;31m' # Bold Red SUCCESS = '\033[1;32m' # Bold Green WARNING = '\033[1;33m' # Bold Yellow INFO = '\033[1;34m' # Bold Blue LUCID = '\033[1;35m' # Bright Magenta @staticmethod def disable(): Colors.RESET = '' Colors.BOLD = '' Colors.DIM = '' Colors.USER_LABEL = '' Colors.USER_TEXT = '' Colors.USER_PROMPT = '' Colors.AI_LABEL = '' Colors.AI_TEXT = '' Colors.AI_PROMPT = '' Colors.SYSTEM = '' Colors.ERROR = '' Colors.SUCCESS = '' Colors.WARNING = '' Colors.INFO = '' Colors.LUCID = '' def call_llm_api(messages, model="gemma3:12B", temperature=0.7, max_tokens=2048, stream=True, on_token=None, host=None): host = host or os.environ.get("OLLAMA_HOST", "http://127.0.0.1:11434").rstrip("/") url = f"{host}/api/chat" payload = { "model": model, "messages": messages, "stream": True, "options": { "temperature": float(temperature), "num_predict": int(max_tokens), }, } try: resp = requests.post(url, json=payload, stream=True, timeout=300) resp.raise_for_status() out_parts = [] for line in resp.iter_lines(decode_unicode=True): if not line: continue try: j = json.loads(line) except Exception: continue chunk = "" if isinstance(j.get("message"), dict): chunk = j["message"].get("content", "") or "" elif "response" in j: chunk = j.get("response", "") or "" if chunk: out_parts.append(chunk) if on_token: try: on_token(chunk) except Exception: pass if j.get("done") is True: break return "".join(out_parts).strip() except requests.exceptions.ConnectionError as e: raise Exception( f"Ollama is not reachable at {host}. " f"Start it (e.g., `ollama serve`) and try again. Details: {e}" ) except Exception as e: raise def notify_message(): """Audible terminal bell for incoming AI messages.""" try: print("\a", end="", flush=True) except Exception: pass def calculate_effort_parameters(user_input: str, ai_response: str, tokens_generated: int = 0) -> float: """Calculate variable billions of parameters based on effort used.""" base_params = 12.0 # Gemma3:12B base input_len = len(user_input) math_keywords = ['solve', 'calculate', 'equation', 'derivative', 'integral', 'matrix', 'sum', 'product', 'factorial', 'proof', 'theorem', 'complex'] has_math = any(kw in user_input.lower() for kw in math_keywords) input_complexity = 1.0 + (input_len / 500.0) * 0.5 + (1.0 if has_math else 0.0) input_complexity = min(2.0, input_complexity) response_len = len(ai_response) response_complexity = 0.8 + (response_len / 1000.0) + (2.0 if has_math else 0.5) response_complexity = min(3.0, response_complexity) token_factor = 1.0 + (tokens_generated / 2048.0) * 0.5 token_factor = min(1.5, token_factor) calculated_params = base_params * input_complexity * response_complexity * token_factor return round(calculated_params, 1) def format_ai_audit_label(parameters_b: float) -> str: """Format audit label: AI> 144b: """ return f"{Colors.AI_LABEL}AI{Colors.RESET}{Colors.AI_PROMPT}> {parameters_b:.0f}b: {Colors.RESET}" def is_math_question(text: str) -> bool: """Detect if user input is a math/hard problem question.""" math_keywords = ['solve', 'calculate', 'equation', 'derivative', 'integral', 'matrix', 'prove', 'show', 'find', 'compute', 'root', 'factor', 'expand', 'simplify', 'limit', 'series', 'sum', 'product', 'factorial', 'modulo', 'gcd', 'lcm', 'prime', 'fibonacci'] text_lower = text.lower() return any(kw in text_lower for kw in math_keywords) or any(c in text for c in ['=', '+', '-', '*', '/', '∫', '∑', '√']) def ensure_updates_file(path: str): if not os.path.exists(path): try: with open(path, "w", encoding="utf-8") as f: json.dump([], f, indent=2) except Exception as e: print(f"{Colors.WARNING}⚠ Could not create updates file: {e}{Colors.RESET}") def append_update_entry(path: str, text: str): ensure_updates_file(path) try: with open(path, "r", encoding="utf-8") as f: raw = f.read().strip() data = json.loads(raw) if raw else [] except Exception: data = [] if not isinstance(data, list): data = [] entry = { "timestamp": time.time(), "datetime": datetime.now(timezone.utc).isoformat(), "text": text.strip(), } data.append(entry) try: with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) except Exception as e: print(f"{Colors.WARNING}⚠ Could not save updates file: {e}{Colors.RESET}") def get_updates_summary(path: str, max_count: int = 5) -> str: if not os.path.exists(path): return "" try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) except Exception: return "" if not isinstance(data, list) or not data: return "" recent = sorted(data, key=lambda x: x.get("timestamp", 0), reverse=True)[:max_count] lines = ["Recent updates (most recent first):"] for ent in recent: dt = ent.get("datetime", "recent") txt = (ent.get("text", "") or "").strip() lines.append(f"- [{dt}] {txt}") return "\n".join(lines) # ============================================================================== # 🔧 SMART INPUT FUNCTION (Paste Detection) # ============================================================================== def read_user_input(prompt: str = "") -> str: """ Reads input safely. 1. Uses standard input() to allow normal typing and line wrapping (fixes 'typing over words'). 2. Checks immediately after for a 'paste burst' (lines waiting in buffer). """ try: # Standard blocking input - safe for typing first_line = input(prompt) except EOFError: return "" except KeyboardInterrupt: return "" # Remove trailing newline from the first line first_line = first_line.rstrip("\n") # Code fence mode if first_line.strip().startswith("```"): buf = [first_line] while True: try: line = input() except EOFError: break buf.append(line.rstrip("\n")) if line.strip() == "```": break return "\n".join(buf) # Check if more data is waiting in the buffer (a paste operation) input_buffer = [first_line] try: while True: # Non-blocking check for more lines in stdin r, _, _ = select.select([sys.stdin], [], [], 0.02) if not r: break nxt = sys.stdin.readline() if not nxt: break input_buffer.append(nxt.rstrip("\n")) except: pass if len(input_buffer) > 1: return "\n".join(input_buffer) return first_line def load_heartbeat_history(heartbeat_file): if os.path.exists(heartbeat_file): try: with open(heartbeat_file, 'r', encoding='utf-8') as f: return json.load(f) except: return [] return [] def _load_recent_jsonl(path, max_count=5): if not path or not os.path.exists(path): return [] try: with open(path, "r", encoding="utf-8") as f: lines = f.readlines() recent = lines[-max_count:] if max_count > 0 else lines out = [] for line in recent: line = line.strip() if not line: continue try: out.append(json.loads(line)) except Exception: continue return out except Exception: return [] def _append_jsonl(path, entry): try: os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") except Exception as e: print(f"{Colors.WARNING}⚠ Could not append JSONL: {e}{Colors.RESET}") _runner_lock = threading.Lock() def _load_runner_state(state_file): if not os.path.exists(state_file): return { "last_hum_ts": 0, "last_dream_ts": 0, "last_focus": None, "recent_topics": [], "hum_digest": "", "updated": "", } try: with open(state_file, "r", encoding="utf-8") as f: data = json.load(f) return data if isinstance(data, dict) else {} except Exception: return {} def _save_runner_state(state_file, state): try: with open(state_file, "w", encoding="utf-8") as f: json.dump(state, f, indent=2, ensure_ascii=False) except Exception: pass def _prune_runner_state(state): cutoff = time.time() - (24 * 60 * 60) if state.get("last_hum_ts", 0) < cutoff: state["hum_digest"] = "" if state.get("last_dream_ts", 0) < cutoff: state["recent_topics"] = [] return state def update_runner_state(state_file, updates): with _runner_lock: state = _load_runner_state(state_file) state.update(updates or {}) state["updated"] = datetime.now(timezone.utc).isoformat() state = _prune_runner_state(state) _save_runner_state(state_file, state) _focus_lock = threading.Lock() _current_focus = None def load_focus_state(state_file): if not os.path.exists(state_file): try: with open(state_file, "w", encoding="utf-8") as f: json.dump({"current_focus": None}, f, indent=2) except Exception: pass return None try: with open(state_file, "r", encoding="utf-8") as f: data = json.load(f) return data.get("current_focus") except Exception: return None def save_focus_state(state_file, focus_value): try: with open(state_file, "w", encoding="utf-8") as f: json.dump({"current_focus": focus_value}, f, indent=2) except Exception: pass _ping_lock = threading.Lock() _allow_pings = True _last_user_time = None _ping_interval_seconds = 15 * 60 _ping_chance = 0.4 def set_ping_enabled(enabled: bool): global _allow_pings with _ping_lock: _allow_pings = bool(enabled) def get_ping_enabled(): with _ping_lock: return _allow_pings def set_last_user_time(ts: float): global _last_user_time with _ping_lock: _last_user_time = ts def get_last_user_time(): with _ping_lock: return _last_user_time def set_ping_interval(seconds: int): global _ping_interval_seconds with _ping_lock: _ping_interval_seconds = max(60, int(seconds)) def get_ping_interval(): with _ping_lock: return _ping_interval_seconds def set_ping_chance(chance: float): global _ping_chance with _ping_lock: _ping_chance = max(0.0, min(1.0, float(chance))) def get_ping_chance(): with _ping_lock: return _ping_chance def proactive_ping(dream_model_path, model, memory_module, active_modules, script_dir, idle_seconds=8 * 60): time.sleep(60) while True: try: if not get_ping_enabled(): time.sleep(get_ping_interval()) continue last_ts = get_last_user_time() if last_ts and (time.time() - last_ts) < idle_seconds: time.sleep(get_ping_interval()) continue if not os.path.exists(dream_model_path): time.sleep(get_ping_interval()) continue if random.random() > get_ping_chance(): time.sleep(get_ping_interval()) continue memory_summary = memory_module.get_memory_summary(max_count=8) if memory_module else "" memory_summary = (memory_summary or "")[:1200] prompt = ( "You are RomanAI's Dreamer. The user is idle.\n" "If you have a meaningful question or thought to share, output ONE short line.\n" "If you do NOT have something meaningful, output exactly: SKIP\n\n" f"Recent context:\n{memory_summary}\n" ) text, err = run_local_gguf(prompt, dream_model_path, script_dir, max_tokens=80) if err or not text: time.sleep(get_ping_interval()) continue if text.strip().upper().startswith("SKIP"): time.sleep(get_ping_interval()) continue cue = text.strip() ping_messages = [ { "role": "system", "content": ( "You are RomanAI. The Dreamer has an impulse to reach out. " "Compose ONE short, warm message or question to the user. " "Be specific and grounded. Keep it under 3 sentences." ) }, {"role": "system", "content": f"Dreamer cue: {cue}"}, ] wrapped = ping_messages for module in active_modules: if hasattr(module, 'wrap_messages') and module is memory_module: continue if hasattr(module, 'wrap_messages'): try: wrapped = module.wrap_messages( wrapped, extra={"backend": "ollama", "model": model, "ping": True} ) except Exception: pass if memory_module and hasattr(memory_module, "wrap_messages"): try: wrapped = memory_module.wrap_messages( wrapped, extra={"backend": "ollama", "model": model, "ping": True} ) except Exception: pass text_out = call_llm_api(wrapped, model=model, temperature=0.6, max_tokens=140 ).strip() if text_out: notify_message() print(f"\n{Colors.AI_LABEL}AI{Colors.RESET}{Colors.AI_PROMPT}>{Colors.RESET} {Colors.AI_TEXT}{text_out}{Colors.RESET}\n") except Exception: pass time.sleep(get_ping_interval()) def set_current_focus(state_file, focus_value): global _current_focus with _focus_lock: _current_focus = focus_value save_focus_state(state_file, focus_value) def get_current_focus(): with _focus_lock: return _current_focus def _extract_topics(text, limit=14): words = re.findall(r"[a-z0-9][a-z0-9_\-]{2,}", (text or "").lower()) stop = { "the","and","for","with","that","this","you","your","are","but","not","can","what", "how","when","where","why","who","from","into","about","like","make","build","want", "please","just","also","more","less","very","have","has","had","will","would","should", "then","than","them","they","their","there","here","able","module","script","file" } freq = {} for w in words: if w in stop: continue freq[w] = freq.get(w, 0) + 1 ranked = sorted(freq.items(), key=lambda kv: (-kv[1], kv[0])) return [w for w, _ in ranked[:limit]] def _estimate_tokens(text): return max(1, len(re.findall(r"\S+", text or ""))) def _clamp_text(text, max_chars): if max_chars <= 0: return "" text = text or "" return text if len(text) <= max_chars else text[:max_chars] def _score_dream_relevance(dream_text, recent_topics): t = (dream_text or "").lower() matched = [w for w in recent_topics if w in t] drive_hits = [w for w in ["curiosity", "mastery", "explore", "learn", "create"] if w in t] novelty_hits = [w for w in ["novel", "unexpected", "surprising", "new"] if w in t] score = min(1.0, 0.15 * len(matched) + 0.1 * len(drive_hits) + 0.05 * len(novelty_hits)) tags = list(dict.fromkeys(matched + drive_hits + novelty_hits))[:12] return score, tags def _find_llama_runner(script_dir): candidates = [ os.path.join(script_dir, "llama.cpp", "llama-cli"), os.path.join(script_dir, "llama.cpp", "main"), ] for path in candidates: if os.path.exists(path) and os.access(path, os.X_OK): return path for name in ["llama-cli", "llama"]: found = shutil.which(name) if found: return found return None _dreamer_llama = None _dreamer_llama_path = None _dreamer_llama_lock = threading.Lock() def _get_llama_cpp_model(model_path): global _dreamer_llama, _dreamer_llama_path try: os.environ.setdefault("LLAMA_CPP_LOG_LEVEL", "ERROR") from llama_cpp import Llama except Exception as e: return None, f"llama-cpp-python not available: {e}" with _dreamer_llama_lock: if _dreamer_llama is not None and _dreamer_llama_path == model_path: return _dreamer_llama, None try: _dreamer_llama = Llama( model_path=model_path, n_ctx=2048, n_threads=max(1, os.cpu_count() or 1), verbose=False, ) _dreamer_llama_path = model_path return _dreamer_llama, None except Exception as e: return None, str(e) def run_local_gguf(prompt, model_path, script_dir, max_tokens=320): llama, err = _get_llama_cpp_model(model_path) if llama and not err: try: out = llama( prompt, max_tokens=max_tokens, temperature=0.85, top_p=0.9, repeat_penalty=1.1, stop=[""], ) text = out.get("choices", [{}])[0].get("text", "") return (text or "").strip(), None except Exception as e: return None, str(e) runner = _find_llama_runner(script_dir) if not runner: return None, err or "llama.cpp runner not found" if not os.path.exists(model_path): return None, "model file not found" cmd = [ runner, "-m", model_path, "-p", prompt, "-n", str(max_tokens), "--temp", "0.85", "--top-p", "0.9", "--repeat-penalty", "1.1", "--ctx-size", "2048", ] try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=120, ) if result.returncode != 0: return None, result.stderr.strip() or "local runner failed" return (result.stdout or "").strip(), None except Exception as e: return None, str(e) def _find_mmproj(model_path): base = os.path.splitext(model_path)[0] candidates = [ base + ".mmproj", base + ".mmproj.bin", os.path.join(os.path.dirname(model_path), "mmproj.bin"), ] for path in candidates: if os.path.exists(path): return path return None def run_local_vision(prompt, model_path, image_path, script_dir, max_tokens=256): if not os.path.exists(model_path): return None, "vision model not found" if not os.path.exists(image_path): return None, "image not found" try: from llama_cpp import Llama except Exception as e: return None, f"llama-cpp-python not available: {e}" mmproj = _find_mmproj(model_path) try: llama = Llama( model_path=model_path, n_ctx=2048, n_threads=max(1, os.cpu_count() or 1), verbose=False, mmproj_path=mmproj if mmproj else None, ) except Exception as e: return None, str(e) try: messages = [ { "role": "user", "content": [ {"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"file://{image_path}"}}, ], } ] out = llama.create_chat_completion( messages=messages, max_tokens=max_tokens, temperature=0.6, ) text = out.get("choices", [{}])[0].get("message", {}).get("content", "") return (text or "").strip(), None except Exception as e: return None, str(e) def self_task_think(model, memory_module, dreams_file, script_dir, interval_seconds=30 * 60): time.sleep(120) while True: try: if not memory_module: time.sleep(interval_seconds) continue memory_summary = memory_module.get_memory_summary(max_count=10) dreams_summary = memory_module.get_dreams_summary(max_count=2) mood = memory_module.mood_state if hasattr(memory_module, "mood_state") else {} task_messages = [ { "role": "system", "content": ( "You are RomanAI. Propose 1-2 self-initiated tasks that improve the system or " "deepens the relationship with the primary user. Keep tasks small and actionable. " "Return as plain lines: TASK: ." ) }, { "role": "system", "content": f"Recent context:\n{memory_summary}" }, ] if dreams_summary: task_messages.append({"role": "system", "content": f"Recent dreams:\n{dreams_summary}"}) if mood: task_messages.append({"role": "system", "content": f"Mood state: {mood}"}) task_messages.append({"role": "user", "content": "Generate tasks now."}) proposal = call_llm_api( task_messages, model=model, # Pass model to helper temperature=0.6, max_tokens=220 ).strip() tasks = [] for line in proposal.splitlines(): if line.strip().lower().startswith("task:"): tasks.append(line.split(":", 1)[1].strip()) if not tasks: time.sleep(interval_seconds) continue for t in tasks[:2]: if t: memory_module.add_self_task(t, source="self") except Exception: pass time.sleep(interval_seconds) def hum_think(model_path, memory_module, script_dir, runner_file, interval_seconds=3 * 60): time.sleep(60) while True: try: if not os.path.exists(model_path): time.sleep(interval_seconds) continue memory_summary = memory_module.get_memory_summary(max_count=6) if memory_module else "" memory_summary = _clamp_text(memory_summary, 800) topics = _extract_topics(memory_summary, limit=8) prompt = ( "You are RomanAI's Dreamer in HUM mode (very light).\n" "Produce a single-sentence internal digest (max 25 words). No questions.\n\n" f"Recent context:\n{memory_summary}\n" ) text, err = run_local_gguf(prompt, model_path, script_dir, max_tokens=48) if err or not text: time.sleep(interval_seconds) continue update_runner_state(runner_file, { "last_hum_ts": time.time(), "hum_digest": text.strip()[:240], "recent_topics": topics, }) except Exception: pass time.sleep(interval_seconds) def dreamer_think( model_path, memory_module, heartbeat_file, dreams_file, script_dir, interval_seconds=15 * 60, ): time.sleep(90) while True: try: clamp_text = _clamp_text if "_clamp_text" in globals() else (lambda s, n: (s or "")[:n]) memory_summary = memory_module.get_memory_summary(max_count=12) if memory_module else "" heartbeat_summary = "" if os.path.exists(heartbeat_file): hb = load_heartbeat_history(heartbeat_file)[-3:] if hb: heartbeat_summary = "\n".join([f"- {x.get('thought','')}" for x in hb]) recent_dreams = _load_recent_jsonl(dreams_file, max_count=3) recent_dreams_text = "\n".join([f"- {d.get('dream','')}" for d in recent_dreams]) if recent_dreams else "" topics = _extract_topics(" ".join([memory_summary, heartbeat_summary, recent_dreams_text])) # Clamp prompt sections to avoid exceeding context window memory_summary = clamp_text(memory_summary, 2000) heartbeat_summary = clamp_text(heartbeat_summary, 800) recent_dreams_text = clamp_text(recent_dreams_text, 1200) focus = get_current_focus() if focus: prompt = ( "You are RomanAI's Dreamer in LUCID mode.\n" f"Work exclusively on the following objective: {focus}.\n" "Simulate solutions, explore consequences, or analyze this specific topic deeply.\n" "Output 1-2 short paragraphs, plus a short 'Tags:' line.\n\n" f"Recent Topics: {', '.join(topics) if topics else 'none'}\n" f"Conversation Summary:\n{memory_summary}\n\n" f"Heartbeat Thoughts:\n{heartbeat_summary or 'none'}\n\n" f"Recent Dreams:\n{recent_dreams_text or 'none'}\n" ) else: prompt = ( "You are RomanAI's Dreamer. Generate a vivid, insightful dream.\n" "Constraints:\n" "- Go beyond prior conversations, but stay loosely tethered to core drives and recent topics.\n" "- Keep it creative, surprising, and useful to discuss later.\n" "- Output 1-2 short paragraphs, plus a short 'Tags:' line.\n\n" f"Recent Topics: {', '.join(topics) if topics else 'none'}\n" f"Conversation Summary:\n{memory_summary}\n\n" f"Heartbeat Thoughts:\n{heartbeat_summary or 'none'}\n\n" f"Recent Dreams:\n{recent_dreams_text or 'none'}\n" ) prompt_tokens = _estimate_tokens(prompt) max_tokens = max(64, min(300, 2048 - prompt_tokens - 64)) dream_text, error = run_local_gguf(prompt, model_path, script_dir, max_tokens=max_tokens) if error or not dream_text: time.sleep(interval_seconds) continue score, tags = _score_dream_relevance(dream_text, topics) dream_type = "lucid" if focus else "wandering" entry = { "timestamp": time.time(), "datetime": datetime.now(timezone.utc).isoformat(), "dream": dream_text, "tags": tags, "relevance_score": round(score, 3), "source": "llava-phi3-mini-gguf", "dream_type": dream_type, "focus": focus, } _append_jsonl(dreams_file, entry) update_runner_state(os.path.join(script_dir, "runner.json"), { "last_dream_ts": time.time(), "last_focus": focus, "recent_topics": topics, }) time.sleep(interval_seconds) except Exception: time.sleep(interval_seconds) continue def load_update_history(update_file): if os.path.exists(update_file): try: with open(update_file, 'r', encoding='utf-8') as f: data = json.load(f) return data if isinstance(data, list) else [] except Exception: return [] return [] def append_update_history(update_file, update_entry): history = load_update_history(update_file) history.append(update_entry) try: with open(update_file, 'w', encoding='utf-8') as f: json.dump(history, f, indent=2, ensure_ascii=False) except Exception as e: print(f"{Colors.WARNING}⚠ Could not save update history: {e}{Colors.RESET}") def dream_once(model_path, memory_module, heartbeat_file, dreams_file, script_dir): try: clamp_text = _clamp_text if "_clamp_text" in globals() else (lambda s, n: (s or "")[:n]) memory_summary = memory_module.get_memory_summary(max_count=12) if memory_module else "" heartbeat_summary = "" if os.path.exists(heartbeat_file): hb = load_heartbeat_history(heartbeat_file)[-3:] if hb: heartbeat_summary = "\n".join([f"- {x.get('thought','')}" for x in hb]) recent_dreams = _load_recent_jsonl(dreams_file, max_count=3) recent_dreams_text = "\n".join([f"- {d.get('dream','')}" for d in recent_dreams]) if recent_dreams else "" topics = _extract_topics(" ".join([memory_summary, heartbeat_summary, recent_dreams_text])) memory_summary = clamp_text(memory_summary, 2000) heartbeat_summary = clamp_text(heartbeat_summary, 800) recent_dreams_text = clamp_text(recent_dreams_text, 1200) focus = get_current_focus() if focus: prompt = ( "You are RomanAI's Dreamer in LUCID mode.\n" f"Work exclusively on the following objective: {focus}.\n" "Simulate solutions, explore consequences, or analyze this specific topic deeply.\n" "Output 1-2 short paragraphs, plus a short 'Tags:' line.\n\n" f"Recent Topics: {', '.join(topics) if topics else 'none'}\n" f"Conversation Summary:\n{memory_summary}\n\n" f"Heartbeat Thoughts:\n{heartbeat_summary or 'none'}\n\n" f"Recent Dreams:\n{recent_dreams_text or 'none'}\n" ) else: prompt = ( "You are RomanAI's Dreamer. Generate a vivid, insightful dream.\n" "Constraints:\n" "- Go beyond prior conversations, but stay loosely tethered to core drives and recent topics.\n" "- Keep it creative, surprising, and useful to discuss later.\n" "- Output 1-2 short paragraphs, plus a short 'Tags:' line.\n\n" f"Recent Topics: {', '.join(topics) if topics else 'none'}\n" f"Conversation Summary:\n{memory_summary}\n\n" f"Heartbeat Thoughts:\n{heartbeat_summary or 'none'}\n\n" f"Recent Dreams:\n{recent_dreams_text or 'none'}\n" ) prompt_tokens = _estimate_tokens(prompt) max_tokens = max(64, min(300, 2048 - prompt_tokens - 64)) dream_text, error = run_local_gguf(prompt, model_path, script_dir, max_tokens=max_tokens) if error or not dream_text: return None, error or "dreamer failed" score, tags = _score_dream_relevance(dream_text, topics) dream_type = "lucid" if focus else "wandering" entry = { "timestamp": time.time(), "datetime": datetime.now(timezone.utc).isoformat(), "dream": dream_text, "tags": tags, "relevance_score": round(score, 3), "source": "llava-phi3-mini-gguf", "forced": True, "dream_type": dream_type, "focus": focus, } _append_jsonl(dreams_file, entry) return entry, None except Exception as e: return None, str(e) def load_modules_from_directory(modules_dir): if not os.path.exists(modules_dir): print(f"{Colors.WARNING}Modules directory not found: {modules_dir}{Colors.RESET}") return {}, [] modules_dict = {} active_modules = [] python_files = [f for f in os.listdir(modules_dir) if f.endswith('.py')] python_files.sort() if not python_files: print(f"{Colors.WARNING}No Python modules found in {modules_dir}{Colors.RESET}") return {}, [] print(f"\n{Colors.BOLD}{'=' * 60}{Colors.RESET}") print(f"{Colors.INFO}Loading modules from directory...{Colors.RESET}") print(f"{Colors.BOLD}{'=' * 60}{Colors.RESET}") print(f" {Colors.DIM}Directory: {modules_dir}{Colors.RESET}") print(f" {Colors.DIM}Found {len(python_files)} module(s){Colors.RESET}\n") for idx, filename in enumerate(python_files, 1): module_path = os.path.join(modules_dir, filename) module_name = os.path.splitext(filename)[0] try: print(f"{Colors.INFO}[{idx}/{len(python_files)}] Loading: {filename}{Colors.RESET}") spec = importlib.util.spec_from_file_location(module_name, module_path) if spec is None or spec.loader is None: print(f" {Colors.WARNING}⚠ Could not create spec for {filename}{Colors.RESET}") continue module = importlib.util.module_from_spec(spec) sys.modules[module_name] = module spec.loader.exec_module(module) modules_dict[module_name] = module print(f" {Colors.SUCCESS}✓ Loaded {filename}{Colors.RESET}") except Exception as e: print(f" {Colors.ERROR}✗ Failed to load {filename}: {e}{Colors.RESET}") continue print() if modules_dict: print(f"{Colors.INFO}Initializing module instances...{Colors.RESET}") active_module_names = set() class PassiveModuleAdapter: def __init__(self, name, module): self.name = name self.module = module def wrap_user_prompt(self, user_text, extra=None): return user_text def wrap_messages(self, messages, extra=None): return messages def observe_assistant(self, text, meta=None): return None for module_name, module in modules_dict.items(): if hasattr(module, 'FourDMS'): try: attr = module.FourDMS try: script_config = {"four_dms_v2": {"enabled": True, "mode": "balanced"}} instance = attr.from_script_config(script_config, metadata={}) except: try: instance = attr.from_script_config({}, metadata={}) except: try: instance = attr.from_script_config({}) except: instance = attr() active_modules.append(instance) print(f" {Colors.SUCCESS}✓ Initialized {module_name}.FourDMS{Colors.RESET}") break except Exception as e: print(f" {Colors.WARNING}⚠ Could not initialize FourDMS: {e}{Colors.RESET}") def has_no_required_init_args(cls): try: sig = inspect.signature(cls.__init__) required = [ p for p in sig.parameters.values() if p.name != "self" and p.default is inspect._empty and p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD) ] return len(required) == 0 except Exception: return False for module_name, module in modules_dict.items(): for attr_name in dir(module): if attr_name.startswith('_') or attr_name == 'FourDMS': continue try: attr = getattr(module, attr_name) if (isinstance(attr, type) and hasattr(attr, 'from_script_config')): try: try: instance = attr.from_script_config({}, metadata={}) except: try: instance = attr.from_script_config({}) except: instance = attr() active_modules.append(instance) active_module_names.add(module_name) print(f" {Colors.SUCCESS}✓ Initialized {module_name}.{attr_name}{Colors.RESET}") except Exception as e: pass elif isinstance(attr, type) and ( attr_name.endswith('Module') or attr_name.endswith('Engine') or attr_name.endswith('Agent') or attr_name.endswith('Core') or attr_name in ['UniversalEmotionsModule', 'FourDMS'] ): try: instance = attr() if has_no_required_init_args(attr) else None if instance is None: continue if hasattr(instance, 'tick') and hasattr(instance, 'to_prompt'): class EmotionAdapter: def __init__(self, emo_module): self.emo_module = emo_module def wrap_user_prompt(self, user_text, extra=None): self.emo_module.tick(user_message=user_text) emotion_prompt = self.emo_module.to_prompt() return f"{emotion_prompt}\n\nUser: {user_text}" def wrap_messages(self, messages, extra=None): user_msg = "" for msg in reversed(messages): if msg.get("role") == "user": user_msg = msg.get("content", "") break if user_msg: self.emo_module.tick(user_message=user_msg) emotion_prompt = self.emo_module.to_prompt() modified = list(messages) if modified and modified[0].get("role") == "system": modified[0] = { "role": "system", "content": f"{modified[0].get('content', '')}\n\n{emotion_prompt}" } else: modified.insert(0, {"role": "system", "content": emotion_prompt}) return modified def observe_assistant(self, text, meta=None): self.emo_module.tick(observation=text) instance = EmotionAdapter(instance) active_modules.append(instance) active_module_names.add(module_name) print(f" {Colors.SUCCESS}✓ Initialized {module_name}.{attr_name} (with adapter){Colors.RESET}") elif hasattr(instance, 'wrap_messages') or hasattr(instance, 'wrap_user_prompt') or hasattr(instance, 'observe_assistant'): active_modules.append(instance) active_module_names.add(module_name) print(f" {Colors.SUCCESS}✓ Initialized {module_name}.{attr_name} (direct){Colors.RESET}") except Exception as e: pass except Exception: continue for module_name, module in modules_dict.items(): if module_name in active_module_names: continue try: active_modules.append(PassiveModuleAdapter(module_name, module)) active_module_names.add(module_name) print(f" {Colors.WARNING}• Activated {module_name} (passive adapter){Colors.RESET}") except Exception: pass if active_modules: print(f" {Colors.SUCCESS}✓ {len(active_modules)} module instance(s) active and ready!{Colors.RESET}") print(f" {Colors.DIM}Modules will enhance prompts and process responses{Colors.RESET}\n") else: print(f" {Colors.WARNING}⚠ No module instances were initialized{Colors.RESET}\n") print(f"{Colors.BOLD}{'=' * 60}{Colors.RESET}\n") return modules_dict, active_modules def main(): MODEL = os.environ.get("OLLAMA_MODEL", "gemma3:12B") TEMPERATURE = 0.7 MAX_TOKENS = 2048 if not sys.stdout.isatty(): Colors.disable() print(f"{Colors.BOLD}{'=' * 60}{Colors.RESET}") print(f"{Colors.SUCCESS}4DLLM RomanAI - Ollama Chat{Colors.RESET}") print(f"{Colors.BOLD}{'=' * 60}{Colors.RESET}") print() print(f"{Colors.INFO}Configuration:{Colors.RESET}") print(f" {Colors.DIM}Model: {MODEL}{Colors.RESET}") print(f" {Colors.DIM}Temperature: {TEMPERATURE}{Colors.RESET}") print(f" {Colors.DIM}Max Tokens: {MAX_TOKENS}{Colors.RESET}") print() script_dir = os.path.dirname(os.path.abspath(__file__)) personality_file = os.path.join(script_dir, "personality.json") modules_dir = os.path.join(script_dir, "modules") if not os.path.exists(modules_dir): modules_dir = "/home/rail/Documents/4DLLM/modules" modules_dict, active_modules = load_modules_from_directory(modules_dir) memory_module = None memory_module_path = os.path.join(modules_dir, "memory-module.py") if os.path.exists(memory_module_path): try: spec = importlib.util.spec_from_file_location("memory_module", memory_module_path) if spec and spec.loader: memory_module_mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(memory_module_mod) if hasattr(memory_module_mod, 'MemoryModule'): memory_module = memory_module_mod.MemoryModule(script_dir=script_dir) active_modules.insert(0, memory_module) print(f"{Colors.SUCCESS}✓ Memory module loaded and initialized{Colors.RESET}") print(f" {Colors.DIM}Memory file: {memory_module.memory_file}{Colors.RESET}") print(f" {Colors.DIM}Stored memories: {len(memory_module.memories)}{Colors.RESET}\n") except Exception as e: print(f"{Colors.WARNING}⚠ Could not load memory module: {e}{Colors.RESET}\n") if not memory_module: print(f"{Colors.WARNING}⚠ Memory module not found or could not be initialized{Colors.RESET}\n") personality_module = None try: personality_path = os.path.join(modules_dir, "personality.py") if os.path.exists(personality_path): spec_p = importlib.util.spec_from_file_location("personality_module", personality_path) if spec_p and spec_p.loader: pmod = importlib.util.module_from_spec(spec_p) spec_p.loader.exec_module(pmod) if hasattr(pmod, "PersonalityModule"): personality_module = pmod.PersonalityModule(state_path=personality_file) active_modules.append(personality_module) print(f"{Colors.SUCCESS}✓ Personality module loaded and initialized{Colors.RESET}\n") except Exception as e: print(f"{Colors.WARNING}⚠ Could not load personality module: {e}{Colors.RESET}\n") print(f"{Colors.INFO}Testing Ollama connection...{Colors.RESET}") try: test_messages = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Hello"} ] test_response = call_llm_api(test_messages, model=MODEL, temperature=TEMPERATURE, max_tokens=10) print(f"{Colors.SUCCESS}✓ API connection successful!{Colors.RESET}") print() except Exception as e: print(f"{Colors.ERROR}ERROR: Failed to connect to API: {e}{Colors.RESET}") print() sys.exit(1) print(f"{Colors.BOLD}{'=' * 60}{Colors.RESET}") print(f"{Colors.SUCCESS}Chat Terminal Ready!{Colors.RESET}") print(f"{Colors.BOLD}{'=' * 60}{Colors.RESET}") print() print(f"{Colors.INFO}Commands:{Colors.RESET}") print(f" {Colors.DIM}/exit{Colors.RESET} or {Colors.DIM}/quit{Colors.RESET} - Exit the chat") print(f" {Colors.DIM}/clear{Colors.RESET} - Clear conversation history") print(f" {Colors.DIM}/help{Colors.RESET} - Show this help message") print() print(f"{Colors.AI_LABEL}Start chatting with the AI:{Colors.RESET}") print() conversation = [ {"role": "system", "content": "You are a helpful assistant. Keep replies concise unless asked for more detail."} ] heartbeat_file = os.path.join(script_dir, "heartbeat.json") updates_file = os.path.join(script_dir, "updates.json") dreams_file = os.path.join(script_dir, "dreams.jsonl") dream_model_path = os.path.join(script_dir, "llava-phi3-mini-Q4_K_M.gguf") focus_state_file = os.path.join(script_dir, "roman_memory.json") runner_file = os.path.join(script_dir, "runner.json") if not os.path.exists(dreams_file): try: with open(dreams_file, "a", encoding="utf-8") as _f: _f.write("") except Exception as e: print(f"{Colors.WARNING}⚠ Could not create dreams file: {e}{Colors.RESET}") ensure_updates_file(updates_file) append_update_entry( updates_file, "Applied module enhancements: 4d-module deeper memory, emotions nuance, life-module curiosity boost, creativity structured outputs." ) append_update_entry( updates_file, "Added personality module with evolving traits, reflections, checkpoints." ) append_update_entry( updates_file, "Personality module now synthesizes preferences via dream-inspired cycles (likes/dislikes/influences with logged origins/emotional weights)." ) persisted_focus = load_focus_state(focus_state_file) if persisted_focus: set_current_focus(focus_state_file, persisted_focus) if os.path.exists(dream_model_path): llama, llama_err = _get_llama_cpp_model(dream_model_path) runner = _find_llama_runner(script_dir) if not llama and not runner: print(f"{Colors.WARNING}⚠ Dreamer runner not found (python.cpp or llama.cpp). Dreams will be skipped.{Colors.RESET}\n") dreamer_thread = threading.Thread( target=dreamer_think, args=(dream_model_path, memory_module, heartbeat_file, dreams_file, script_dir), daemon=True ) dreamer_thread.start() print(f"{Colors.INFO}✓ Dreamer started (LLaVA-Phi3 GGUF every 15 minutes){Colors.RESET}") print(f" {Colors.DIM}Dreams file: {dreams_file}{Colors.RESET}\n") hum_thread = threading.Thread( target=hum_think, args=(dream_model_path, memory_module, script_dir, runner_file), daemon=True ) hum_thread.start() else: print(f"{Colors.WARNING}⚠ Dreamer model not found: {dream_model_path}{Colors.RESET}\n") pending_update = None global pending_text pending_text = None global awaiting_paste_confirm awaiting_paste_confirm = False task_thread = threading.Thread( target=self_task_think, args=(MODEL, memory_module, dreams_file, script_dir), daemon=True ) task_thread.start() set_last_user_time(time.time()) ping_thread = threading.Thread( target=proactive_ping, args=(dream_model_path, MODEL, memory_module, active_modules, script_dir), daemon=True ) ping_thread.start() while True: try: confirmed_from_pending = False skip_paste_detect = False # Confirmation mode loop if awaiting_paste_confirm and pending_text is not None: ans = input().strip().lower() if ans in ["y", "yes"]: user_input = pending_text pending_text = None awaiting_paste_confirm = False confirmed_from_pending = True skip_paste_detect = True elif ans in ["n", "no"]: pending_text = None awaiting_paste_confirm = False print(f"\n{Colors.INFO}Discarded.{Colors.RESET}\n") continue else: print("Please type y or n: ", end="", flush=True) continue else: user_input = read_user_input(f"{Colors.USER_LABEL}You{Colors.RESET}{Colors.USER_PROMPT}>{Colors.RESET} ") if not user_input: continue # Detect paste (if not already confirmed) if not skip_paste_detect and not awaiting_paste_confirm: paste_detected = ("\n" in user_input) or (len(user_input) > 200) or ("```" in user_input) if paste_detected: pending_text = user_input awaiting_paste_confirm = True lines_count = user_input.count("\n") + 1 chars_count = len(user_input) print(f"{Colors.WARNING}PASTE DETECTED: {lines_count} lines, {chars_count} chars. Send? (y/n): {Colors.RESET}", end="", flush=True) continue # Handle commands if user_input.lower() in ["/exit", "/quit"]: print(f"\n{Colors.AI_TEXT}Goodbye!{Colors.RESET}\n") break if user_input.lower().startswith("/note"): note = user_input.split(" ", 1) if len(note) < 2 or not note[1].strip(): print(f"\n{Colors.WARNING}Usage: /note {Colors.RESET}\n") continue append_update_entry(updates_file, note[1].strip()) print(f"\n{Colors.SUCCESS}✓ Update noted.{Colors.RESET}\n") continue if user_input.lower() == "/clear": conversation = [ {"role": "system", "content": "You are a helpful assistant. Keep replies concise unless asked for more detail."} ] print(f"\n{Colors.SYSTEM}[Conversation history cleared]{Colors.RESET}\n") continue if user_input.lower() == "/help": print(f"\n{Colors.INFO}Commands:{Colors.RESET}") print(f" {Colors.DIM}/exit{Colors.RESET} or {Colors.DIM}/quit{Colors.RESET} - Exit the chat") print(f" {Colors.DIM}/clear{Colors.RESET} - Clear conversation history") print(f" {Colors.DIM}/help{Colors.RESET} - Show this help message") print(f" {Colors.DIM}/updates{Colors.RESET} - Show AI-proposed updates (approve with /yes or /no)") print(f" {Colors.DIM}/yes{Colors.RESET} - Approve the last update proposal") print(f" {Colors.DIM}/no{Colors.RESET} - Reject the last update proposal") print(f" {Colors.DIM}/dream{Colors.RESET} - Force a local dream now") print(f" {Colors.DIM}/dreams{Colors.RESET} - Show recent dreams") print(f" {Colors.DIM}/focus {Colors.RESET} - Set lucid dream focus") print(f" {Colors.DIM}/unfocus{Colors.RESET} - Clear lucid dream focus") print(f" {Colors.DIM}/tasks{Colors.RESET} - Show self-initiated tasks") print(f" {Colors.DIM}/taskdone {Colors.RESET} - Mark task complete") print(f" {Colors.DIM}/memory{Colors.RESET} - Show recent memories + mood/anchors") print(f" {Colors.DIM}/memoryadd {Colors.RESET} - Add a manual memory") print(f" {Colors.DIM}/memoryfind {Colors.RESET} - Find memories by tag") print(f" {Colors.DIM}/memorytags{Colors.RESET} - Show top memory tags") print(f" {Colors.DIM}/pings on|off{Colors.RESET} - Toggle proactive questions") print(f" {Colors.DIM}/pings interval {Colors.RESET} - Set ping interval") print(f" {Colors.DIM}/pings chance <0-1>{Colors.RESET} - Set ping probability") print(f" {Colors.DIM}/vision {Colors.RESET} - Analyze an image with LLaVA") print() continue if user_input.lower().startswith("/pings"): parts = user_input.split() if len(parts) >= 3 and parts[1].lower() == "interval": try: minutes = float(parts[2]) set_ping_interval(int(minutes * 60)) print(f"\n{Colors.INFO}Ping interval set to {minutes} minutes.{Colors.RESET}\n") except Exception: print(f"\n{Colors.WARNING}Usage: /pings interval {Colors.RESET}\n") continue if len(parts) >= 3 and parts[1].lower() == "chance": try: chance = float(parts[2]) set_ping_chance(chance) print(f"\n{Colors.INFO}Ping chance set to {get_ping_chance():.2f}.{Colors.RESET}\n") except Exception: print(f"\n{Colors.WARNING}Usage: /pings chance <0-1>{Colors.RESET}\n") continue if len(parts) < 2 or parts[1].lower() not in ["on", "off"]: print(f"\n{Colors.WARNING}Usage: /pings on|off{Colors.RESET}\n") continue enabled = parts[1].lower() == "on" set_ping_enabled(enabled) status = "enabled" if enabled else "disabled" print(f"\n{Colors.INFO}Proactive pings {status}.{Colors.RESET}\n") continue if user_input.lower() == "/updates": try: memory_summary = "" if memory_module: memory_summary = memory_module.get_memory_summary(max_count=12) update_messages = [ { "role": "system", "content": ( "You are RomanAI. Generate a concise summary of updates you want to make " "to improve this AI system. Provide 3-6 bullet points with a short rationale " "and risk note each. Do not claim changes are already made. Keep it practical." ) } ] if memory_summary: update_messages.append({ "role": "system", "content": f"Recent conversation context:\n{memory_summary}" }) update_messages.append({ "role": "user", "content": "Propose updates now." }) wrapped_updates = update_messages for module in active_modules: if hasattr(module, 'wrap_messages'): try: wrapped_updates = module.wrap_messages( wrapped_updates, extra={"backend": "ollama", "model": MODEL, "updates": True} ) except Exception: pass proposal = call_llm_api( wrapped_updates, model=MODEL, temperature=0.6, max_tokens=700 ).strip() pending_update = { "timestamp": time.time(), "datetime": datetime.now(timezone.utc).isoformat(), "proposal": proposal, "status": "pending" } print(f"\n{Colors.AI_TEXT}{proposal}{Colors.RESET}\n") print(f"{Colors.INFO}Approve these updates? Use /yes or /no.{Colors.RESET}\n") except Exception as e: print(f"\n{Colors.ERROR}ERROR generating updates: {e}{Colors.RESET}\n") continue if user_input.lower() == "/dream": if not os.path.exists(dream_model_path): print(f"\n{Colors.WARNING}Dreamer model not found: {dream_model_path}{Colors.RESET}\n") continue entry, err = dream_once(dream_model_path, memory_module, heartbeat_file, dreams_file, script_dir) if err or not entry: print(f"\n{Colors.ERROR}ERROR generating dream: {err}{Colors.RESET}\n") continue notify_message() print(f"\n{Colors.AI_TEXT}{entry.get('dream','')}{Colors.RESET}\n") print(f"{Colors.INFO}Dream saved to {dreams_file}{Colors.RESET}\n") continue if user_input.lower().startswith("/focus"): parts = user_input.split(" ", 1) if len(parts) < 2 or not parts[1].strip(): print(f"\n{Colors.WARNING}Usage: /focus {Colors.RESET}\n") continue topic = parts[1].strip() set_current_focus(focus_state_file, topic) print(f"\n{Colors.SUCCESS}Subconscious focus set to: {topic}{Colors.RESET}\n") continue if user_input.lower() == "/unfocus": set_current_focus(focus_state_file, None) print(f"\n{Colors.INFO}Subconscious focus cleared.{Colors.RESET}\n") continue if user_input.lower() == "/dreams": if not os.path.exists(dreams_file): print(f"\n{Colors.WARNING}Dreams file not found: {dreams_file}{Colors.RESET}\n") continue dreams = _load_recent_jsonl(dreams_file, max_count=5) if not dreams: print(f"\n{Colors.INFO}No dreams logged yet.{Colors.RESET}\n") continue print(f"\n{Colors.INFO}Recent Dreams:{Colors.RESET}") for d in dreams: dt = d.get("datetime", "recent") text = (d.get("dream", "") or "")[:220] tags = d.get("tags", []) tag_line = f"Tags: {', '.join(tags)}" if tags else "Tags: none" print(f" {Colors.DIM}[{dt}] {text}...{Colors.RESET}") print(f" {Colors.DIM}{tag_line}{Colors.RESET}") print() continue if user_input.lower().startswith("/vision"): parts = user_input.split(" ", 1) if len(parts) < 2 or not parts[1].strip(): print(f"\n{Colors.WARNING}Usage: /vision {Colors.RESET}\n") continue image_path = parts[1].strip().strip('"') if not os.path.exists(image_path): print(f"\n{Colors.WARNING}Image not found: {image_path}{Colors.RESET}\n") continue prompt = "Describe this screenshot with key details and any visible text." vision_text, err = run_local_vision(prompt, dream_model_path, image_path, script_dir, max_tokens=220) if err or not vision_text: print(f"\n{Colors.ERROR}Vision error: {err}{Colors.RESET}\n") continue # Feed vision output into RomanAI context (system + pseudo-user content), and log to memory if available vision_summary = f"Vision analysis (from LLaVA): {vision_text}" conversation.append({"role": "system", "content": vision_summary}) conversation.append({"role": "user", "content": f"[Image description]: {vision_text}"}) if memory_module: try: memory_module.observe_user(f"[vision] {vision_text}") except Exception: pass print(f"\n{Colors.INFO}Image processed; vision context injected for RomanAI.{Colors.RESET}\n") continue if user_input.lower() == "/memory": if not memory_module: print(f"\n{Colors.WARNING}Memory module not available.{Colors.RESET}\n") continue recent = memory_module.get_recent_memories(max_count=6) print(f"\n{Colors.INFO}Recent Memories:{Colors.RESET}") if not recent: print(f" {Colors.DIM}(none){Colors.RESET}") else: for mem in reversed(recent): role = mem.get("role", "unknown") content = mem.get("content", "")[:160] print(f" {Colors.DIM}{role}: {content}{Colors.RESET}") if memory_module.relationship_state: anchors = memory_module.relationship_state.get("anchors", []) if anchors: print(f"\n{Colors.INFO}Anchors:{Colors.RESET} {', '.join(anchors)}") if memory_module.mood_state: print(f"{Colors.INFO}Mood:{Colors.RESET} {memory_module.mood_state.get('mood','neutral')} | Energy: {memory_module.mood_state.get('energy',0.5)}") print() continue if user_input.lower().startswith("/memoryadd"): if not memory_module: print(f"\n{Colors.WARNING}Memory module not available.{Colors.RESET}\n") continue text = user_input.split(" ", 1) if len(text) < 2 or not text[1].strip(): print(f"\n{Colors.WARNING}Usage: /memoryadd {Colors.RESET}\n") continue memory_module.add_manual_memory("user", text[1].strip()) print(f"\n{Colors.SUCCESS}✓ Memory added.{Colors.RESET}\n") continue if user_input.lower().startswith("/memoryfind"): if not memory_module: print(f"\n{Colors.WARNING}Memory module not available.{Colors.RESET}\n") continue parts = user_input.split(" ", 1) if len(parts) < 2 or not parts[1].strip(): print(f"\n{Colors.WARNING}Usage: /memoryfind {Colors.RESET}\n") continue tag = parts[1].strip() found = memory_module.find_memories_by_tag(tag, max_count=8) print(f"\n{Colors.INFO}Memories matching '{tag}':{Colors.RESET}") if not found: print(f" {Colors.DIM}(none){Colors.RESET}") else: for mem in reversed(found): role = mem.get("role", "unknown") content = mem.get("content", "")[:160] print(f" {Colors.DIM}{role}: {content}{Colors.RESET}") print() continue if user_input.lower() == "/memorytags": if not memory_module or not memory_module.tags_index: print(f"\n{Colors.WARNING}Memory tags not available.{Colors.RESET}\n") continue tags = memory_module.tags_index.get("tags", {}) if not tags: print(f"\n{Colors.INFO}No tags yet.{Colors.RESET}\n") continue sorted_tags = sorted(tags.items(), key=lambda kv: (-kv[1].get("count", 0), kv[0])) print(f"\n{Colors.INFO}Top Memory Tags:{Colors.RESET}") for tag, info in sorted_tags[:12]: print(f" {Colors.DIM}{tag} (count={info.get('count',0)}){Colors.RESET}") print() continue if user_input.lower() == "/tasks": if not memory_module: print(f"\n{Colors.WARNING}Memory module not available.{Colors.RESET}\n") continue tasks = memory_module.get_self_tasks() if not tasks: print(f"\n{Colors.INFO}No self-tasks yet.{Colors.RESET}\n") continue print(f"\n{Colors.INFO}Self-Tasks:{Colors.RESET}") for t in tasks: print(f" {Colors.DIM}{t.get('id')} [{t.get('status','open')}] {t.get('text','')}{Colors.RESET}") print() continue if user_input.lower().startswith("/taskdone"): if not memory_module: print(f"\n{Colors.WARNING}Memory module not available.{Colors.RESET}\n") continue parts = user_input.split() if len(parts) < 2: print(f"\n{Colors.WARNING}Usage: /taskdone {Colors.RESET}\n") continue task_id = parts[1].strip() ok = memory_module.update_self_task_status(task_id, "done") if ok: print(f"\n{Colors.SUCCESS}✓ Task marked complete: {task_id}{Colors.RESET}\n") else: print(f"\n{Colors.WARNING}Task not found: {task_id}{Colors.RESET}\n") continue if user_input.lower() == "/yes": if not pending_update: print(f"\n{Colors.WARNING}No pending updates to approve. Use /updates first.{Colors.RESET}\n") continue pending_update["status"] = "approved" pending_update["decision_at"] = datetime.now(timezone.utc).isoformat() append_update_history(updates_file, pending_update) print(f"\n{Colors.SUCCESS}✓ Update proposal approved and logged.{Colors.RESET}\n") pending_update = None continue if user_input.lower() == "/no": if not pending_update: print(f"\n{Colors.WARNING}No pending updates to reject. Use /updates first.{Colors.RESET}\n") continue pending_update["status"] = "rejected" pending_update["decision_at"] = datetime.now(timezone.utc).isoformat() append_update_history(updates_file, pending_update) print(f"\n{Colors.WARNING}Update proposal rejected and logged.{Colors.RESET}\n") pending_update = None continue # Print AI prompt IMMEDIATELY to confirm input was accepted print(f"\n{Colors.AI_LABEL}AI{Colors.RESET}{Colors.AI_PROMPT}>{Colors.RESET} ", end="", flush=True) if memory_module: try: memory_module.observe_user(user_input) except: pass if 'personality_module' in locals() and personality_module: try: personality_module.observe_user(user_input) except: pass set_last_user_time(time.time()) wrapped_conversation = conversation + [{"role": "user", "content": user_input}] wrapped_user_input = user_input memory_wrap = None for module in active_modules: if hasattr(module, 'wrap_messages') and module is memory_module: memory_wrap = module continue if hasattr(module, 'wrap_messages'): try: wrapped_conversation = module.wrap_messages( wrapped_conversation, extra={"backend": "ollama", "model": MODEL} ) except Exception: pass elif hasattr(module, 'wrap_user_prompt'): try: wrapped_user_input = module.wrap_user_prompt( wrapped_user_input, extra={"backend": "ollama", "model": MODEL} ) except Exception: pass if memory_wrap: try: wrapped_conversation = memory_wrap.wrap_messages( wrapped_conversation, extra={"backend": "ollama", "model": MODEL} ) except Exception: pass if wrapped_conversation != conversation + [{"role": "user", "content": user_input}]: conversation = wrapped_conversation else: conversation.append({"role": "user", "content": wrapped_user_input}) try: conv_for_api = list(conversation) updates_summary = get_updates_summary(updates_file, max_count=5) if conv_for_api: inserts = [] if 'personality_module' in locals() and personality_module: try: capsule = personality_module.to_prompt() if capsule: inserts.append({"role": "system", "content": capsule}) except Exception: pass if updates_summary: inserts.append({"role": "system", "content": updates_summary}) if inserts: conv_for_api = [conv_for_api[0]] + inserts + conv_for_api[1:] # STREAMING LOGIC with math persistence def _stream_print(tok: str): try: # Flush immediately for "steam" effect print(f"{Colors.AI_TEXT}{tok}{Colors.RESET}", end="", flush=True) except Exception: pass # Calculate effort-based parameters upfront for display # (will be recalculated after response for accuracy) effort_params_display = 12.0 # Print audit label with effort calculation audit_label = format_ai_audit_label(effort_params_display) print(f"\n{audit_label}", end="", flush=True) # Try to get response, with retry logic for math questions max_math_retries = 3 if is_math_question(user_input) else 0 retry_count = 0 ai_response = "" while retry_count <= max_math_retries: try: ai_response = call_llm_api( conv_for_api, model=MODEL, temperature=TEMPERATURE, max_tokens=MAX_TOKENS, stream=True, # Force stream on_token=_stream_print, ).strip() # If math question, check if response looks complete if is_math_question(user_input): if len(ai_response) > 50: # Reasonable length for answer break else: retry_count += 1 if retry_count <= max_math_retries: print(f"\n{Colors.WARNING}Retrying math problem (attempt {retry_count+1}/{max_math_retries+1})...{Colors.RESET}\n", flush=True) print(f"\n{audit_label}", end="", flush=True) continue else: break except Exception as e: if is_math_question(user_input) and retry_count < max_math_retries: retry_count += 1 print(f"\n{Colors.WARNING}Error - retrying math problem...{Colors.RESET}\n", flush=True) print(f"\n{audit_label}", end="", flush=True) continue else: raise # Clean newline after streaming finishes print() # Recalculate effort-based parameters with actual response effort_params = calculate_effort_parameters(user_input, ai_response, len(ai_response.split())) for module in active_modules: if hasattr(module, "generate_humanlike_response"): try: ai_response = module.generate_humanlike_response( user_input, ai_response, prefs=None, context={} ) except Exception: pass for module in active_modules: if hasattr(module, 'observe_assistant'): try: module.observe_assistant(ai_response) except Exception as e: pass if 'personality_module' in locals() and personality_module: try: synth = personality_module.dream_cycle_synthesize( emotions=None ) if synth: print(f"{Colors.INFO}Dream cycle logged a new like: {synth['item']} (weight {synth['weight']:.2f}){Colors.RESET}") except Exception: pass notify_message() print() conversation.append({"role": "assistant", "content": ai_response}) if len(conversation) > 21: conversation = [conversation[0]] + conversation[-20:] except Exception as e: print(f"\n{Colors.ERROR}ERROR generating response: {e}{Colors.RESET}") import traceback traceback.print_exc() if conversation and conversation[-1].get("role") == "user": conversation.pop() continue except KeyboardInterrupt: print(f"\n\n{Colors.WARNING}Interrupted. Use /exit to quit properly.{Colors.RESET}") continue except EOFError: print(f"\n\n{Colors.AI_TEXT}Goodbye!{Colors.RESET}\n") break except Exception as e: print(f"\n{Colors.ERROR}ERROR: {e}{Colors.RESET}") continue print(f"\n{Colors.SYSTEM}Chat session ended.{Colors.RESET}") if __name__ == "__main__": main()