#!/usr/bin/env python3 # Copyright Daniel Harding - RomanAILabs # Credits: OpenAI GPT-5.2 Thinking (Nova the Workarounder) # Project: White Hole Steering — TRUE Overhead Test (EXACT tokens, real steering) # Language: Python 3.12+ # Architecture: RAIL-5D-Singularity (Steering Layer) """ TRUE OVERHEAD TEST — EXACT TOKENS, GUARANTEED STEERING Problem we hit: - In your current setup, llama_cpp.Llama.sample() is NOT calling logits_processor (hook_calls=0, avg_s=0.0000). So "exact tokens" tests were not testing steering. Solution: - Use llama.cpp for eval (the expensive part) - Read last-step logits from the model (scores) - Apply White Hole steering in Python - Sample next token in Python (top-k/top-p/temp/repeat penalty) - Feed the token back via llm.eval([token]) - Repeat for EXACT N tokens What you get: - hook_calls ~ N - avg_s non-zero (if steering is active) - timing breakdown: eval_time (llama.cpp work) steer_time (your formula + shaping) sample_time (Python sampling) total_time Install: pip install -U llama-cpp-python numpy Run: python3 ~/Documents/FHI-Accelerated/true-overhead-test.py \ --model "/home/rail/Desktop/quantum/dolphin-2.9-llama3-8b-Q4_K_M.gguf" \ --prompt "Write a short cyberpunk paragraph about a lighthouse AI." 
\ --n-predict 192 --n-ctx 4096 --threads 3 --gpu-layers 0 \ --trials 5 --debug-hook """ from __future__ import annotations import argparse import os import sys import time from dataclasses import dataclass from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union import numpy as np try: from llama_cpp import Llama except Exception: print("\n[CRITICAL] Missing dependency: llama-cpp-python") print("Install with: pip install -U llama-cpp-python numpy\n") raise # ============================================================================= # Pretty / helpers # ============================================================================= def W() -> str: return "[RomanAILabs]" def hr(title: str) -> None: print("\n" + "=" * 92) print(title) print("=" * 92) def safe_div(a: float, b: float) -> float: return a / b if b != 0.0 else float("inf") def pct(frac: float) -> str: return f"{frac * 100.0:+.2f}%" def clamp(x: float, lo: float, hi: float) -> float: return lo if x < lo else hi if x > hi else x # ============================================================================= # White Hole simplified formula (stable) # ============================================================================= ArrayLike = Union[float, int, np.ndarray] def H(z: np.ndarray) -> np.ndarray: return np.tanh(z) def phi(z: np.ndarray) -> np.ndarray: return np.log1p(np.abs(z)) * np.sign(z) def G(u: np.ndarray) -> np.ndarray: return 1.0 / (1.0 + np.exp(-u)) def f_i(i: np.ndarray, alpha: float) -> np.ndarray: return 1.0 / np.power(i + 1.0, alpha) def K_i(i: np.ndarray, beta: float) -> np.ndarray: return 1.0 / np.power(i + 1.0, beta) def default_T(x: ArrayLike, i: ArrayLike) -> np.ndarray: x_arr = np.asarray(x, dtype=np.float64) if x_arr.ndim == 0: i_arr = np.asarray(i, dtype=np.float64) return x_arr * (1.0 + 0.1 * i_arr) i_arr = np.asarray(i, dtype=np.float64) d = x_arr.shape[-1] idx = np.arange(d, dtype=np.float64) basis = np.sin((i_arr[..., None] + 1.0) * (idx[None, :] + 1.0) * 0.01) 
return np.sum(x_arr[None, :] * basis, axis=-1) @dataclass(frozen=True) class FormulaConfig: N: int = 512 a1: float = 1.0 alpha: float = 1.35 beta: float = 0.85 expo_clip: float = 60.0 inner_clip: float = 60.0 def F_whitehole_simplified( x: ArrayLike, y: ArrayLike, T: Callable[[ArrayLike, ArrayLike], np.ndarray] = default_T, cfg: FormulaConfig = FormulaConfig(), ) -> float: i = np.arange(1, cfg.N + 1, dtype=np.float64) fi = f_i(i, alpha=cfg.alpha) Ki = K_i(i, beta=cfg.beta) Tx = T(x, i) Ty0 = T(y, 0.0) expo_arg = -i * Tx * Ty0 expo_arg = np.clip(expo_arg, -cfg.expo_clip, cfg.expo_clip) expo = np.exp(expo_arg) inner = fi * Ki * Tx * Ty0 * expo inner = np.clip(inner, -cfg.inner_clip, cfg.inner_clip) s = np.sum(cfg.a1 * fi * G(inner)) out = H(phi(np.asarray(s))) return float(out) # ============================================================================= # Steering / logit shaping # ============================================================================= @dataclass(frozen=True) class SteeringConfig: enabled: bool = True strength: float = 2.0 temp_boost: float = 0.15 window: int = 32 y_scale: float = 1.0 y_bias: float = -0.2 x_const: float = 0.7 vocab_norm: float = 50_000.0 def softmax_np(logits: np.ndarray) -> np.ndarray: x = logits.astype(np.float64, copy=False) x = x - np.max(x) e = np.exp(x) return e / (np.sum(e) + 1e-12) def compute_y_from_tokens( tokens: Union[Sequence[int], np.ndarray, None], window: int, y_scale: float, y_bias: float, vocab_norm: float, ) -> float: if tokens is None: return float(y_bias) try: n = len(tokens) except Exception: return float(y_bias) if n == 0: return float(y_bias) w = tokens[-max(1, window):] arr = np.asarray(w, dtype=np.float64) if arr.size == 0: return float(y_bias) avg = float(arr.mean()) y = (avg / float(vocab_norm)) * float(y_scale) + float(y_bias) return float(y) def apply_whitehole_steering_to_logits( logits: np.ndarray, token_history: Sequence[int], fcfg: FormulaConfig, scfg: SteeringConfig, ) -> Tuple[np.ndarray, 
float]: if not scfg.enabled: return logits.astype(np.float32, copy=False), 0.0 y = compute_y_from_tokens( tokens=token_history, window=scfg.window, y_scale=scfg.y_scale, y_bias=scfg.y_bias, vocab_norm=scfg.vocab_norm, ) s = F_whitehole_simplified(x=scfg.x_const, y=y, cfg=fcfg) s = clamp(s, 0.0, 1.0) probs = softmax_np(logits) out = logits.astype(np.float64, copy=True) # "push away from the most obvious next tokens" out = out - (scfg.strength * s) * probs # small temp effect denom = (1.0 + (s * scfg.temp_boost)) out = out / max(1e-6, denom) return out.astype(np.float32, copy=False), float(s) # ============================================================================= # Python sampling (top-k, top-p, temp, repeat penalty) # ============================================================================= def apply_repeat_penalty(logits: np.ndarray, history: Sequence[int], penalty: float, window: int = 128) -> np.ndarray: if penalty <= 1.0: return logits out = logits.astype(np.float32, copy=True) recent = history[-max(1, window):] for t in set(recent): if 0 <= t < out.shape[0]: if out[t] > 0: out[t] /= penalty else: out[t] *= penalty return out def sample_top_k_top_p( logits: np.ndarray, rng: np.random.Generator, temperature: float, top_k: int, top_p: float, ) -> int: x = logits.astype(np.float64, copy=False) # temperature temp = max(1e-6, float(temperature)) x = x / temp # top-k if top_k and top_k > 0 and top_k < x.shape[0]: idx = np.argpartition(-x, top_k)[:top_k] x2 = x[idx] probs = softmax_np(x2) return int(idx[rng.choice(len(idx), p=probs)]) # top-p (nucleus) probs_full = softmax_np(x) if top_p and 0.0 < top_p < 1.0: order = np.argsort(-probs_full) cumsum = np.cumsum(probs_full[order]) cut = np.searchsorted(cumsum, top_p, side="left") + 1 keep = order[:cut] probs = probs_full[keep] probs = probs / (probs.sum() + 1e-12) return int(keep[rng.choice(len(keep), p=probs)]) # plain sampling return int(rng.choice(len(probs_full), p=probs_full)) # 
# =============================================================================
# Logits access (version-tolerant)
# =============================================================================

def get_last_logits(llm: Llama) -> np.ndarray:
    """Return a copy of the logits produced by the most recent ``llm.eval()``.

    Looks for a NumPy array at ``llm.scores`` and then ``llm._scores``. For a
    2-D array the row ``llm.n_tokens - 1`` is used when ``n_tokens`` is an
    integer within the array's row count; otherwise the last row is used. A
    non-2-D array is returned whole.

    NOTE(review): indexing by ``n_tokens`` assumes ``scores`` is a preallocated
    buffer with one row per context position and that ``n_tokens`` counts the
    tokens evaluated so far — confirm against the installed llama-cpp-python.
    Under that layout the last row of the raw buffer is not the current step.

    Raises:
        RuntimeError: If neither attribute holds a usable NumPy array.
    """
    n_tokens = getattr(llm, "n_tokens", None)

    for attr in ("scores", "_scores"):
        sc = getattr(llm, attr, None)
        if not isinstance(sc, np.ndarray):
            continue
        if sc.ndim != 2:
            return sc.copy()
        if sc.shape[0] == 0:
            # Nothing stored under this attribute; try the next one.
            continue
        if isinstance(n_tokens, (int, np.integer)) and 0 < n_tokens <= sc.shape[0]:
            return sc[int(n_tokens) - 1].copy()
        return sc[-1].copy()

    raise RuntimeError(
        "Cannot access logits (scores) from your llama-cpp-python object.\n"
        "Try upgrading: pip install -U llama-cpp-python\n"
        "Or run the older create_completion steering bench instead."
    )


# =============================================================================
# Exact generation loop with guaranteed steering
# =============================================================================

@dataclass
class RunTimings:
    """Wall-clock seconds accumulated over one generation run."""

    eval_time: float = 0.0  # time inside llm.eval (prompt + every generated token)
    steer_time: float = 0.0  # time inside apply_whitehole_steering_to_logits
    sample_time: float = 0.0  # repeat penalty + sampling; excludes steer_time
    total_time: float = 0.0  # from just before prompt eval to the end of the loop


def exact_generate_python_sampler(
    llm: Llama,
    prompt: str,
    n_predict: int,
    rng: np.random.Generator,
    temperature: float,
    top_k: int,
    top_p: float,
    repeat_penalty: float,
    fcfg: FormulaConfig,
    scfg: SteeringConfig,
    steering_enabled: bool,
    debug: bool = False,
) -> Tuple[List[int], float, int, RunTimings, List[float]]:
    """Generate exactly ``n_predict`` tokens with a Python-side sampler.

    The model is reset, the prompt is evaluated once, and then each step reads
    the logits, applies the repeat penalty, optionally applies White Hole
    steering, samples a token and feeds it back through ``llm.eval``. There is
    no end-of-sequence check, so the token count is always ``n_predict``.

    Timing buckets are disjoint: ``sample_time`` covers the repeat penalty and
    the draw, ``steer_time`` covers only the steering call.

    Returns:
        tokens, avg_s, hook_calls, timings, s_values
    """
    llm.reset()

    prompt_tokens = list(llm.tokenize(prompt.encode("utf-8", errors="ignore")))
    if not prompt_tokens:
        # An empty prompt would leave nothing to evaluate; use a single space.
        prompt_tokens = list(llm.tokenize(b" "))

    timings = RunTimings()
    t0 = time.perf_counter()

    t_eval0 = time.perf_counter()
    llm.eval(prompt_tokens)
    timings.eval_time += time.perf_counter() - t_eval0

    # One running history list instead of rebuilding prompt + generated per step.
    history: List[int] = list(prompt_tokens)
    generated: List[int] = []
    s_values: List[float] = []
    hook_calls = 0

    for _ in range(int(n_predict)):
        # read logits after eval
        logits = get_last_logits(llm)

        # repeat penalty (Python-side)
        t_pen0 = time.perf_counter()
        logits_use = apply_repeat_penalty(logits, history, float(repeat_penalty), window=128)
        timings.sample_time += time.perf_counter() - t_pen0

        # steering (Python-side), timed on its own so it is not counted twice
        if steering_enabled:
            t_steer0 = time.perf_counter()
            logits_use, s = apply_whitehole_steering_to_logits(
                logits_use,
                token_history=history,
                fcfg=fcfg,
                scfg=scfg,
            )
            timings.steer_time += time.perf_counter() - t_steer0
            hook_calls += 1
            s_values.append(float(s))

        # sample next token
        t_samp0 = time.perf_counter()
        tok = int(
            sample_top_k_top_p(
                logits=logits_use,
                rng=rng,
                temperature=float(temperature),
                top_k=int(top_k),
                top_p=float(top_p),
            )
        )
        timings.sample_time += time.perf_counter() - t_samp0

        generated.append(tok)
        history.append(tok)

        # eval chosen token
        t_eval1 = time.perf_counter()
        llm.eval([tok])
        timings.eval_time += time.perf_counter() - t_eval1

    timings.total_time = time.perf_counter() - t0

    avg_s = float(np.mean(np.array(s_values, dtype=np.float64))) if s_values else 0.0

    if debug:
        print(f"{W()} debug: hook_calls={hook_calls}, avg_s={avg_s:.4f}, s_values={len(s_values)}")

    return generated, avg_s, hook_calls, timings, s_values


# =============================================================================
# Main
# =============================================================================

def main() -> None:
    """Parse CLI options, load the model and benchmark baseline vs steered runs.

    Each trial runs the Python sampler twice with identically seeded generators,
    once without and once with steering, then prints per-trial and averaged
    timings. Exits with status 2 for a missing model or invalid ``--trials``,
    and 4 if the two runs return different token counts.
    """
    ap = argparse.ArgumentParser(description="RomanAILabs: TRUE overhead test (EXACT tokens, guaranteed steering)")
    ap.add_argument("--model", type=str, required=True)
    ap.add_argument("--prompt", type=str, default="Write a short cyberpunk paragraph about a lighthouse AI.")
    ap.add_argument("--n-predict", type=int, default=192)
    ap.add_argument("--n-ctx", type=int, default=4096)
    ap.add_argument("--threads", type=int, default=max(1, (os.cpu_count() or 4) - 1))
    ap.add_argument("--threads-batch", type=int, default=0)
    ap.add_argument("--gpu-layers", type=int, default=0)
    ap.add_argument("--seed", type=int, default=1337)
    ap.add_argument("--trials", type=int, default=5)

    # sampling
    ap.add_argument("--temp", type=float, default=0.75)
    ap.add_argument("--top-k", type=int, default=40)
    ap.add_argument("--top-p", type=float, default=0.95)
    ap.add_argument("--repeat-penalty", type=float, default=1.10)

    # formula
    ap.add_argument("--F-N", type=int, default=512)
    ap.add_argument("--F-alpha", type=float, default=1.35)
    ap.add_argument("--F-beta", type=float, default=0.85)
    ap.add_argument("--F-a1", type=float, default=1.0)

    # steering
    ap.add_argument("--strength", type=float, default=2.0)
    ap.add_argument("--temp-boost", type=float, default=0.15)
    ap.add_argument("--window", type=int, default=32)
    ap.add_argument("--y-scale", type=float, default=1.0)
    ap.add_argument("--y-bias", type=float, default=-0.2)
    ap.add_argument("--x-const", type=float, default=0.7)
    ap.add_argument("--vocab-norm", type=float, default=50_000.0)

    ap.add_argument("--debug-hook", action="store_true")
    ap.add_argument("--verbose", action="store_true")
    # NOTE(review): some llama-cpp-python builds appear to store logits in
    # `scores` only when the model is created with logits_all=True — confirm
    # for your version. Off by default so existing invocations are unchanged.
    ap.add_argument(
        "--logits-all",
        action="store_true",
        help="create the model with logits_all=True (more memory and prompt-eval work)",
    )

    args = ap.parse_args()

    if int(args.trials) < 1:
        # Zero trials would make every average in the summary NaN.
        ap.error("--trials must be >= 1")

    model_path = os.path.abspath(os.path.expanduser(args.model))
    if not os.path.exists(model_path):
        print(f"{W()} [CRITICAL] Model not found: {model_path}")
        sys.exit(2)

    fcfg = FormulaConfig(N=int(args.F_N), a1=float(args.F_a1), alpha=float(args.F_alpha), beta=float(args.F_beta))
    scfg = SteeringConfig(
        enabled=True,
        strength=float(args.strength),
        temp_boost=float(args.temp_boost),
        window=int(args.window),
        y_scale=float(args.y_scale),
        y_bias=float(args.y_bias),
        x_const=float(args.x_const),
        vocab_norm=float(args.vocab_norm),
    )

    hr("RomanAILabs — TRUE Overhead Test (EXACT tokens, guaranteed steering)")
    print(f"{W()} Model: {model_path}")
    print(f"{W()} n_predict={args.n_predict} n_ctx={args.n_ctx}")
    print(f"{W()} threads={args.threads} threads_batch={args.threads_batch} gpu_layers={args.gpu_layers}")
    print(f"{W()} sampling: temp={args.temp} top_k={args.top_k} top_p={args.top_p} repeat_penalty={args.repeat_penalty}")
    print(f"{W()} formula: N={fcfg.N} alpha={fcfg.alpha} beta={fcfg.beta} a1={fcfg.a1}")
    print(f"{W()} steering: strength={scfg.strength} temp_boost={scfg.temp_boost} window={scfg.window} y_bias={scfg.y_bias} x_const={scfg.x_const}")
    print(f"{W()} trials={args.trials} (seed base={args.seed})")
    if args.logits_all:
        print(f"{W()} logits_all=True")

    hr("1) Load model")
    t_load0 = time.perf_counter()
    llm = Llama(
        model_path=model_path,
        n_ctx=int(args.n_ctx),
        n_threads=int(args.threads),
        n_threads_batch=int(args.threads_batch) if int(args.threads_batch) > 0 else None,
        n_gpu_layers=int(args.gpu_layers),
        logits_all=bool(args.logits_all),
        verbose=bool(args.verbose),
    )
    print(f"{W()} Load time: {time.perf_counter() - t_load0:.2f} sec")

    hr("2) Warmup")
    _ = llm.create_completion(prompt="Hello", max_tokens=8, temperature=0.8, top_k=40, top_p=0.95, seed=int(args.seed))
    print(f"{W()} Warmup complete.")

    hr("3) Trials (baseline vs steered, EXACT tokens)")

    base_total: List[float] = []
    steer_total: List[float] = []
    base_eval: List[float] = []
    steer_eval: List[float] = []
    base_sample: List[float] = []
    steer_sample: List[float] = []
    steer_steer: List[float] = []
    base_hook: List[int] = []
    steer_hook: List[int] = []
    avg_s_list: List[float] = []

    for t in range(int(args.trials)):
        # Same seed for both runs so the two samplers draw the same random stream.
        trial_seed = int(args.seed) + t
        rng_base = np.random.default_rng(trial_seed)
        rng_steer = np.random.default_rng(trial_seed)

        # Baseline (no steering) — still Python sampler so it's apples-to-apples
        toks_b, avg_s_b, hook_b, tim_b, _ = exact_generate_python_sampler(
            llm=llm,
            prompt=args.prompt,
            n_predict=int(args.n_predict),
            rng=rng_base,
            temperature=float(args.temp),
            top_k=int(args.top_k),
            top_p=float(args.top_p),
            repeat_penalty=float(args.repeat_penalty),
            fcfg=fcfg,
            scfg=scfg,
            steering_enabled=False,
            debug=bool(args.debug_hook) and t == 0,
        )

        # Steered
        toks_s, avg_s_s, hook_s, tim_s, svals = exact_generate_python_sampler(
            llm=llm,
            prompt=args.prompt,
            n_predict=int(args.n_predict),
            rng=rng_steer,
            temperature=float(args.temp),
            top_k=int(args.top_k),
            top_p=float(args.top_p),
            repeat_penalty=float(args.repeat_penalty),
            fcfg=fcfg,
            scfg=scfg,
            steering_enabled=True,
            debug=bool(args.debug_hook) and t == 0,
        )

        if len(toks_b) != len(toks_s):
            print(f"{W()} [CRITICAL] Token count mismatch: base={len(toks_b)} steer={len(toks_s)}")
            sys.exit(4)

        # generation-only time here = total_time - prompt eval? (we measure eval_time separately)
        # We'll just use total_time for overhead and also show eval/steer/sample breakdown.
        ov_total = safe_div(tim_s.total_time - tim_b.total_time, tim_b.total_time)

        base_tps = safe_div(len(toks_b), tim_b.total_time)
        steer_tps = safe_div(len(toks_s), tim_s.total_time)

        base_total.append(tim_b.total_time)
        steer_total.append(tim_s.total_time)
        base_eval.append(tim_b.eval_time)
        steer_eval.append(tim_s.eval_time)
        base_sample.append(tim_b.sample_time)
        steer_sample.append(tim_s.sample_time)
        steer_steer.append(tim_s.steer_time)
        base_hook.append(hook_b)
        steer_hook.append(hook_s)
        avg_s_list.append(avg_s_s)

        print(
            f"{W()} Trial {t+1:02d}/{int(args.trials)} | "
            f"base_total={tim_b.total_time:.3f}s ({base_tps:.2f} tok/s) | "
            f"steer_total={tim_s.total_time:.3f}s ({steer_tps:.2f} tok/s) | "
            f"TRUE overhead(total)={pct(ov_total)} | avg_s={avg_s_s:.4f} | hook_calls={hook_s}"
        )

        if t == 0 and bool(args.debug_hook):
            hr("Debug (trial 1 steering)")
            print(f"{W()} hook_calls={hook_s} (should be {int(args.n_predict)})")
            if svals:
                print(f"{W()} first_s={svals[:10]}")
                print(f"{W()} s_min={min(svals):.4f} s_max={max(svals):.4f}")
            print(f"{W()} timing breakdown steered:")
            print(f" eval_time: {tim_s.eval_time:.3f}s (llama.cpp compute)")
            print(f" steer_time: {tim_s.steer_time:.3f}s (your formula)")
            print(f" sample_time: {tim_s.sample_time:.3f}s (python sampling)")

    hr("4) Summary (averages)")
    btot = float(np.mean(np.array(base_total)))
    stot = float(np.mean(np.array(steer_total)))
    ov = safe_div(stot - btot, btot)

    beval = float(np.mean(np.array(base_eval)))
    seval = float(np.mean(np.array(steer_eval)))
    bsample = float(np.mean(np.array(base_sample)))
    ssample = float(np.mean(np.array(steer_sample)))
    ssteer = float(np.mean(np.array(steer_steer)))

    avg_s = float(np.mean(np.array(avg_s_list))) if avg_s_list else 0.0
    hook_avg = float(np.mean(np.array(steer_hook))) if steer_hook else 0.0

    print(f"{W()} Tokens per run: {int(args.n_predict)} (EXACT)")
    print(f"{W()} Baseline avg total: {btot:.3f}s | {safe_div(int(args.n_predict), btot):.2f} tok/s")
    print(f"{W()} Steered avg total: {stot:.3f}s | {safe_div(int(args.n_predict), stot):.2f} tok/s")
    print(f"{W()} TRUE overhead (total): {pct(ov)}")
    print(f"{W()} avg_s across trials: {avg_s:.4f}")
    print(f"{W()} avg hook_calls/trial: {hook_avg:.1f}")

    print(f"{W()} Breakdown (avg):")
    print(f" Baseline eval_time: {beval:.3f}s")
    print(f" Baseline sample_time: {bsample:.3f}s")
    print(f" Steered eval_time: {seval:.3f}s")
    print(f" Steered steer_time: {ssteer:.3f}s <-- this is your math cost")
    print(f" Steered sample_time: {ssample:.3f}s")

    if hook_avg < 1.0:
        hr("[CRITICAL] Hook is not running (unexpected in python sampler)")
        print(f"{W()} If this happens, something is very wrong — paste the Debug block.")

    hr("Done")
    print(f"{W()} Note: This measures TRUE overhead with Python-side sampling.")
    print(f"{W()} For lowest overhead in production, port steering into llama.cpp as a custom sampler.")


if __name__ == "__main__":
    main()