#!/usr/bin/env python3
# Copyright Daniel Harding - RomanAILabs
# Credits: OpenAI GPT-5.2 Thinking (Nova the Workarounder)
# Project: Fractal Hyperparameter Iteration (FHI) + LLM Training Harness
# Language: Python 3.12+
# Architecture: RAIL-5D-Singularity (Optimization Layer)
"""
FHI (Fractal Hyperparameter Iteration) — runnable, stable implementation.

This file turns your sketch into a working system:
- A small causal Transformer LLM (decoder-only) for demonstration
- A Fractal Hyperparameter Iteration optimizer that searches hyperparameters
- A training loop that:
    1) proposes hyperparams via FHI
    2) applies them live (no model rebuild)
    3) evaluates quick validation fitness
    4) trains for an epoch

Default hyperparameter vector interpretation (matches your example [0.5, 1024]):
    h = [dropout, seq_len]
    dropout ∈ [0.00, 0.60]
    seq_len ∈ [64, 1024] (snapped to multiples of 32)

You can run:
    python3 fhi_accelerated.py --epochs 10 --steps-per-epoch 200 --val-steps 30
"""

from __future__ import annotations

import argparse
import math
import os
import random
import time
from dataclasses import dataclass
from typing import Callable, Iterable, List, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F


# =============================================================================
# CONFIG
# =============================================================================
@dataclass
class TrainConfig:
    """Static model / optimizer settings shared by the whole harness."""

    # model shape
    vocab_size: int = 50_000
    max_seq_len: int = 1024
    d_model: int = 384
    n_heads: int = 6
    n_layers: int = 6
    ff_mult: int = 4

    # base optimizer (can be tuned by FHI if you want)
    base_lr: float = 1e-3
    weight_decay: float = 0.05
    grad_clip: float = 1.0

    # device (resolved once, when this class body is executed)
    device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # reproducibility
    seed: int = 1337


# Module-wide default configuration instance.
CFG = TrainConfig()


def set_seed(seed: int) -> None:
    """Seed Python's ``random`` and PyTorch (CPU and, if present, all CUDA devices).

    ``PYTHONHASHSEED`` is also exported to the environment. The interpreter
    reads that variable at start-up, so the assignment here is picked up by
    child processes rather than by the already-running process.
    """
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
def watermark() -> str:
    """Return the tag that prefixes every console message of this harness."""
    return "[RomanAILabs]"


# =============================================================================
# DATA (runnable, no external deps)
# =============================================================================
class RandomTokenDataset:
    """
    A tiny synthetic dataset for demonstration:
    - Generates random token sequences
    - Next-token objective (shifted targets)

    This is NOT "learning language"; it is a harness to prove training + FHI works.
    Plug in your real dataset/tokenizer later.
    """

    def __init__(self, vocab_size: int, seq_len: int, steps: int, batch_size: int, device: torch.device):
        self.vocab_size = vocab_size
        self.seq_len = seq_len
        self.steps = steps
        self.batch_size = batch_size
        self.device = device

    def __iter__(self) -> Iterable[torch.Tensor]:
        """Yield ``steps`` fresh ``(batch_size, seq_len)`` int64 batches with ids in ``[0, vocab_size)``."""
        for _ in range(self.steps):
            yield torch.randint(
                low=0,
                high=self.vocab_size,
                size=(self.batch_size, self.seq_len),
                device=self.device,
                dtype=torch.long,
            )


# =============================================================================
# MODEL (decoder-only causal Transformer)
# =============================================================================
class CausalSelfAttention(nn.Module):
    """
    Stable causal self-attention (standard).

    Takes and returns a ``(B, T, d_model)`` tensor; position ``t`` only
    attends to positions ``<= t``.
    """

    def __init__(self, d_model: int, n_heads: int, dropout: float):
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError("d_model must be divisible by n_heads")
        self.d_model = d_model
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads
        self.scale = self.head_dim ** -0.5

        self.qkv = nn.Linear(d_model, 3 * d_model, bias=True)
        self.proj = nn.Linear(d_model, d_model, bias=True)
        self.attn_drop = nn.Dropout(dropout)
        self.resid_drop = nn.Dropout(dropout)

        # Non-persistent buffer: moves with the module but is not saved in state_dict.
        self.register_buffer("_mask_cache", torch.empty(0), persistent=False)
        self._mask_T = 0

    def _causal_mask(self, T: int, device: torch.device) -> torch.Tensor:
        """Return a cached ``(1, 1, T, T)`` bool mask that is True strictly above the diagonal."""
        stale = (
            self._mask_T != T
            or self._mask_cache.numel() == 0
            or self._mask_cache.device != device
        )
        if stale:
            mask = torch.triu(torch.ones((T, T), device=device, dtype=torch.bool), diagonal=1)
            self._mask_cache = mask.view(1, 1, T, T)
            self._mask_T = T
        return self._mask_cache

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, T, C = x.shape
        q, k, v = self.qkv(x).chunk(3, dim=-1)

        q = q.view(B, T, self.n_heads, self.head_dim).transpose(1, 2)  # (B,H,T,Hd)
        k = k.view(B, T, self.n_heads, self.head_dim).transpose(1, 2)
        v = v.view(B, T, self.n_heads, self.head_dim).transpose(1, 2)

        scores = (q @ k.transpose(-2, -1)) * self.scale  # (B,H,T,T)
        # Future positions get -inf so softmax assigns them zero weight.
        scores = scores.masked_fill(self._causal_mask(T, x.device), float("-inf"))
        attn = self.attn_drop(F.softmax(scores, dim=-1))

        y = attn @ v  # (B,H,T,Hd)
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        return self.resid_drop(self.proj(y))


class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> GELU -> Dropout -> Linear -> Dropout."""

    def __init__(self, d_model: int, ff_mult: int, dropout: float):
        super().__init__()
        hidden = d_model * ff_mult
        self.net = nn.Sequential(
            nn.Linear(d_model, hidden),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, d_model),
            nn.Dropout(dropout),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


class Block(nn.Module):
    """Pre-LayerNorm transformer block: attention and MLP, each wrapped in a residual connection."""

    def __init__(self, d_model: int, n_heads: int, ff_mult: int, dropout: float):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.attn = CausalSelfAttention(d_model, n_heads, dropout)
        self.ln2 = nn.LayerNorm(d_model)
        self.ff = FeedForward(d_model, ff_mult, dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.attn(self.ln1(x))
        x = x + self.ff(self.ln2(x))
        return x


class TinyLLM(nn.Module):
    """
    A small decoder-only LM:
    - token + position embeddings
    - N transformer blocks
    - projection to vocab logits
    """

    def __init__(
        self,
        vocab_size: int,
        max_seq_len: int,
        d_model: int,
        n_heads: int,
        n_layers: int,
        ff_mult: int,
        dropout: float,
    ):
        super().__init__()
        self.vocab_size = vocab_size
        self.max_seq_len = max_seq_len
        self.d_model = d_model

        self.tok = nn.Embedding(vocab_size, d_model)
        self.pos = nn.Embedding(max_seq_len, d_model)
        self.drop = nn.Dropout(dropout)

        self.blocks = nn.ModuleList([Block(d_model, n_heads, ff_mult, dropout) for _ in range(n_layers)])
        self.ln_f = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size, bias=False)

        self.apply(self._init_weights)

    @staticmethod
    def _init_weights(m: nn.Module) -> None:
        """Initialise Linear/Embedding weights from N(0, 0.02) and Linear biases to zero."""
        if isinstance(m, nn.Linear):
            nn.init.normal_(m.weight, mean=0.0, std=0.02)
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, nn.Embedding):
            nn.init.normal_(m.weight, mean=0.0, std=0.02)

    def set_dropout(self, p: float) -> None:
        """Set the probability of every ``nn.Dropout`` in the model, clamped to ``[0.0, 0.9]``."""
        # Live hyperparam updates without rebuilding the model
        p = float(max(0.0, min(0.9, p)))
        for m in self.modules():
            if isinstance(m, nn.Dropout):
                m.p = p

    def forward(self, idx: torch.Tensor) -> torch.Tensor:
        """Map ``(B, T)`` token ids to ``(B, T, vocab_size)`` logits.

        Raises:
            ValueError: if ``T`` exceeds ``max_seq_len``.
        """
        _, T = idx.shape
        if T > self.max_seq_len:
            raise ValueError(f"seq_len {T} exceeds max_seq_len {self.max_seq_len}")

        tok = self.tok(idx)  # (B,T,D)
        pos_ids = torch.arange(T, device=idx.device, dtype=torch.long)
        pos = self.pos(pos_ids)[None, :, :]  # (1,T,D)

        x = self.drop(tok + pos)
        for blk in self.blocks:
            x = blk(x)
        x = self.ln_f(x)
        return self.head(x)  # (B,T,V)


def next_token_loss(logits: torch.Tensor, idx: torch.Tensor) -> torch.Tensor:
    """Mean cross-entropy between the logits at position ``t`` and the token at ``t + 1``.

    Raises:
        ValueError: if the sequence dimension of ``logits`` is shorter than 2.
    """
    _, T, V = logits.shape
    if T < 2:
        raise ValueError("Need seq_len >= 2")
    pred = logits[:, :-1, :].contiguous().view(-1, V)
    target = idx[:, 1:].contiguous().view(-1)
    return F.cross_entropy(pred, target)


# =============================================================================
# FHI (Fractal Hyperparameter Iteration)
# =============================================================================
@dataclass
class FHISpec:
    """Tunable knobs of the FHI search over ``[dropout, seq_len]``."""

    # candidate generation
    candidates: int = 16
    sigma: float = 0.20  # base noise scale (as a fraction of each coordinate's range)
    k: float = 1.25  # distance energy factor

    # "fractal" multi-scale weights
    scales: Tuple[float, ...] = (1.0, 0.5, 0.25, 0.125)

    # smoothing of the update
    momentum: float = 0.35

    # bounds for [dropout, seq_len]
    dropout_min: float = 0.00
    dropout_max: float = 0.60
    seq_min: int = 64
    seq_max: int = 1024
    seq_step: int = 32


class FractalHyperparamIteration:
    """
    FHI step:
    - generate multi-scale "fractal" perturbations around current hyperparam vector
    - evaluate fitness for candidates
    - weight candidates by exp(-distance/(1+k*sigma^2)) and fitness
    - update hyperparams toward the best (and optionally a weighted centroid)

    Perturbations and distances are expressed relative to each coordinate's
    range (``dropout_max - dropout_min`` and ``seq_max - seq_min``). Without
    that scaling a noise of ~0.1 added to a seq_len in the hundreds is erased
    by the snap to ``seq_step``, so seq_len would never be explored.
    """

    def __init__(self, spec: FHISpec, device: torch.device):
        self.spec = spec
        self.device = device
        # Momentum state, in raw [dropout, seq_len] units.
        self._vel = torch.zeros((2,), device=device, dtype=torch.float32)

    def _span(self) -> torch.Tensor:
        """Width of the search box per coordinate, floored so it can be divided by."""
        spec = self.spec
        return torch.tensor(
            [
                max(float(spec.dropout_max - spec.dropout_min), 1e-8),
                max(float(spec.seq_max - spec.seq_min), 1.0),
            ],
            device=self.device,
            dtype=torch.float32,
        )

    def _snap_seq_len(self, seq_len: float) -> int:
        """Round to the nearest multiple of ``seq_step`` and clamp to ``[seq_min, seq_max]``."""
        s = int(round(seq_len / self.spec.seq_step) * self.spec.seq_step)
        s = max(self.spec.seq_min, min(self.spec.seq_max, s))
        return int(s)

    def _clip(self, h: torch.Tensor) -> torch.Tensor:
        """Project ``h = [dropout, seq_len]`` back into the legal search box."""
        # h[0]=dropout, h[1]=seq_len
        d = float(h[0].item())
        s = float(h[1].item())
        d = max(self.spec.dropout_min, min(self.spec.dropout_max, d))
        s = float(self._snap_seq_len(s))
        return torch.tensor([d, s], device=self.device, dtype=torch.float32)

    def _fractal_perturb(self) -> torch.Tensor:
        """Return a 2-vector that sums one Gaussian draw per entry of ``spec.scales``."""
        # Sum of multi-scale noises (fractal-ish)
        noise = torch.zeros((2,), device=self.device, dtype=torch.float32)
        for sc in self.spec.scales:
            noise = noise + (sc * torch.randn((2,), device=self.device, dtype=torch.float32))
        noise = noise / max(1.0, math.sqrt(len(self.spec.scales)))
        return noise

    @staticmethod
    def _as_list(h: torch.Tensor) -> List[float]:
        """Convert a 2-element tensor to ``[dropout, seq_len]`` Python floats."""
        return [float(h[0].item()), float(h[1].item())]

    @torch.no_grad()
    def step(self, h_current: List[float], fitness_fn: Callable[[List[float]], float]) -> List[float]:
        """Propose the next ``[dropout, seq_len]``.

        Args:
            h_current: current ``[dropout, seq_len]``; clipped into bounds first.
            fitness_fn: maps a candidate ``[dropout, seq_len]`` to a loss-like
                value where lower is better. It is expected to be non-negative
                (the score divides by it). Non-finite results give the
                candidate zero weight.

        Returns:
            The updated, clipped ``[dropout, seq_len]``. When ``spec.candidates``
            is not positive, or no candidate obtained a positive finite score,
            the clipped current point is returned and the momentum state is
            left untouched.
        """
        spec = self.spec
        h0 = self._clip(torch.tensor(h_current, device=self.device, dtype=torch.float32))
        if spec.candidates <= 0:
            return self._as_list(h0)

        span = self._span()
        energy_scale = 1.0 + spec.k * (spec.sigma ** 2)

        cand_params: List[List[float]] = []
        cand_scores: List[float] = []

        # generate candidates around current point
        for _ in range(spec.candidates):
            # Noise is a fraction of each coordinate's range, not an absolute amount.
            perturb = self._fractal_perturb() * spec.sigma * span
            h_i = self._clip(h0 + perturb)
            params = self._as_list(h_i)

            # distance energy term (range-normalised so seq_len does not dominate)
            dist = torch.norm((h_i - h0) / span, p=2).item()
            energy = math.exp(-dist / energy_scale)

            # fitness (lower is better); hand over a copy so the callee cannot
            # alter the stored candidate
            fit = float(fitness_fn(list(params)))

            # turn into a maximization score:
            # higher score = better candidate
            score = energy / (1e-8 + fit) if math.isfinite(fit) else 0.0

            cand_params.append(params)
            cand_scores.append(score)

        total = sum(cand_scores)
        if not (math.isfinite(total) and total > 0.0):
            # Nothing usable to move toward (e.g. every fitness was NaN).
            return self._as_list(h0)

        params_t = torch.tensor(cand_params, device=self.device, dtype=torch.float32)
        scores_t = torch.tensor(cand_scores, device=self.device, dtype=torch.float32)

        # pick best
        best_idx = max(range(len(cand_scores)), key=cand_scores.__getitem__)
        best = params_t[best_idx]

        # weighted centroid (soft update)
        w = scores_t / (scores_t.sum() + 1e-9)
        centroid = torch.sum(w[:, None] * params_t, dim=0)

        target = 0.7 * best + 0.3 * centroid

        # momentum update (smooth changes)
        self._vel = (spec.momentum * self._vel) + ((1.0 - spec.momentum) * (target - h0))
        h1 = self._clip(h0 + self._vel)

        return self._as_list(h1)


# =============================================================================
# TRAIN + EVAL
# =============================================================================
def quick_validate_loss(
    model: TinyLLM,
    steps: int,
    batch_size: int,
    seq_len: int,
    device: torch.device,
) -> float:
    """Average next-token loss over ``steps`` random batches, without gradients.

    Puts ``model`` into eval mode and leaves it there. Token ids are drawn from
    ``model.vocab_size`` so they are always valid embedding indices for the
    model that is passed in. Returns ``0.0`` when ``steps`` yields no batches.
    """
    model.eval()
    ds = RandomTokenDataset(model.vocab_size, seq_len, steps=steps, batch_size=batch_size, device=device)
    losses: List[float] = []
    with torch.no_grad():
        for idx in ds:
            logits = model(idx)
            losses.append(float(next_token_loss(logits, idx).item()))
    return float(sum(losses) / max(1, len(losses)))


def _make_grad_scaler(enabled: bool):
    """Build a CUDA gradient scaler, preferring ``torch.amp.GradScaler`` where the installed torch has it."""
    scaler_cls = getattr(getattr(torch, "amp", None), "GradScaler", None)
    if scaler_cls is not None:
        return scaler_cls("cuda", enabled=enabled)
    # Older torch releases only expose the scaler under torch.cuda.amp.
    return torch.cuda.amp.GradScaler(enabled=enabled)


def train_one_epoch(
    model: TinyLLM,
    optimizer: torch.optim.Optimizer,
    steps: int,
    batch_size: int,
    seq_len: int,
    device: torch.device,
    use_amp: bool,
    grad_clip: float,
) -> float:
    """Run ``steps`` optimisation steps on random batches and return the mean loss.

    Puts ``model`` into train mode. Mixed precision is only engaged when
    ``use_amp`` is true and ``device`` is a CUDA device. Gradients are unscaled
    before being clipped to ``grad_clip``. Returns ``0.0`` when ``steps``
    yields no batches.
    """
    model.train()
    ds = RandomTokenDataset(model.vocab_size, seq_len, steps=steps, batch_size=batch_size, device=device)

    amp_enabled = bool(use_amp) and device.type == "cuda"
    autocast_device = "cuda" if amp_enabled else "cpu"
    scaler = _make_grad_scaler(amp_enabled)
    losses: List[float] = []

    for idx in ds:
        optimizer.zero_grad(set_to_none=True)
        with torch.autocast(device_type=autocast_device, enabled=amp_enabled):
            logits = model(idx)
            loss = next_token_loss(logits, idx)

        scaler.scale(loss).backward()
        # Unscale first so the clip threshold applies to the true gradient norm.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
        scaler.step(optimizer)
        scaler.update()

        losses.append(float(loss.item()))

    return float(sum(losses) / max(1, len(losses)))


# =============================================================================
# MAIN
# =============================================================================
def main() -> None:
    """Parse CLI options, build the model and run the FHI + training loop."""
    parser = argparse.ArgumentParser(description="RomanAILabs FHI + LLM training harness")
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch-size", type=int, default=32)
    parser.add_argument("--steps-per-epoch", type=int, default=200)
    parser.add_argument("--val-steps", type=int, default=30)
    parser.add_argument("--base-lr", type=float, default=CFG.base_lr)
    parser.add_argument("--weight-decay", type=float, default=CFG.weight_decay)

    # initial hyperparams (dropout, seq_len)
    parser.add_argument("--dropout", type=float, default=0.5)
    parser.add_argument("--seq-len", type=int, default=1024)

    # FHI knobs
    parser.add_argument("--candidates", type=int, default=16)
    parser.add_argument("--sigma", type=float, default=0.20)
    parser.add_argument("--k", type=float, default=1.25)
    parser.add_argument("--momentum", type=float, default=0.35)

    args = parser.parse_args()

    set_seed(CFG.seed)
    device = CFG.device
    use_amp = (device.type == "cuda")

    print(f"{watermark()} Device: {device} | AMP: {use_amp}")
    print(f"{watermark()} Building LLM: d_model={CFG.d_model} heads={CFG.n_heads} layers={CFG.n_layers}")

    model = TinyLLM(
        vocab_size=CFG.vocab_size,
        max_seq_len=CFG.max_seq_len,
        d_model=CFG.d_model,
        n_heads=CFG.n_heads,
        n_layers=CFG.n_layers,
        ff_mult=CFG.ff_mult,
        dropout=float(args.dropout),
    ).to(device)

    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=float(args.base_lr),
        weight_decay=float(args.weight_decay),
        betas=(0.9, 0.95),
        eps=1e-8,
    )

    spec = FHISpec(
        candidates=int(args.candidates),
        sigma=float(args.sigma),
        k=float(args.k),
        momentum=float(args.momentum),
        seq_min=64,
        seq_max=CFG.max_seq_len,
        seq_step=32,
    )
    fhi = FractalHyperparamIteration(spec=spec, device=device)

    # Your original sketch: hyperparameters = [0.5, 1024]
    hyperparameters: List[float] = [float(args.dropout), float(args.seq_len)]

    print(f"{watermark()} Initial hyperparameters: dropout={hyperparameters[0]:.3f}, seq_len={int(hyperparameters[1])}")
    print(f"{watermark()} --- BEGIN FHI OPTIMIZATION + TRAINING ---")

    t0 = time.time()

    # Fitness function: quick validation loss (lower is better)
    def fitness_fn(h: List[float]) -> float:
        dropout, seq_len = float(h[0]), int(h[1])
        model.set_dropout(dropout)
        # NOTE(review): quick_validate_loss switches the model to eval mode,
        # where nn.Dropout is inactive, so only seq_len can influence this value.
        # Evaluate quickly (few steps)
        return quick_validate_loss(
            model=model,
            steps=int(args.val_steps),
            batch_size=int(args.batch_size),
            seq_len=seq_len,
            device=device,
        )

    for epoch in range(int(args.epochs)):
        # 1) FHI proposes improved hyperparams
        hyperparameters = fhi.step(hyperparameters, fitness_fn=fitness_fn)
        dropout, seq_len = float(hyperparameters[0]), int(hyperparameters[1])

        # 2) Apply live (also overrides whatever the last evaluated candidate left behind)
        model.set_dropout(dropout)

        # 3) Quick val snapshot
        val_loss = quick_validate_loss(
            model=model,
            steps=int(args.val_steps),
            batch_size=int(args.batch_size),
            seq_len=seq_len,
            device=device,
        )

        # 4) Train for one epoch under these hyperparams
        train_loss = train_one_epoch(
            model=model,
            optimizer=optimizer,
            steps=int(args.steps_per_epoch),
            batch_size=int(args.batch_size),
            seq_len=seq_len,
            device=device,
            use_amp=use_amp,
            grad_clip=CFG.grad_clip,
        )

        elapsed = time.time() - t0
        print(
            f"{watermark()} [EPOCH {epoch:02d}] "
            f"dropout={dropout:.3f} seq_len={seq_len:4d} | "
            f"val={val_loss:.4f} train={train_loss:.4f} | "
            f"time={elapsed:.1f}s"
        )

    print(f"{watermark()} Hyperparameter Optimization Complete!")
    print(f"{watermark()} Final hyperparameters: dropout={hyperparameters[0]:.3f}, seq_len={int(hyperparameters[1])}")


if __name__ == "__main__":
    main()