#!/usr/bin/env python3 # ╔════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗ # ║ 🧬 NEBULA EVOLUTION MODULE v1.0.0 — WORLD-CLASS CODE EVOLUTION ENGINE 🧬 ║ # ║ AI Code Evolution Based on Thoughts, Interactions, and Continuous Learning ║ # ╠════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣ # ║ File Path: /home/rail/Documents/NebulaOS/evolution.py ║ # ║ Module Name: nebula_evolution_engine ║ # ║ Purpose: World-class code evolution system that allows LLMs to evolve their code based on thoughts, interactions, ║ # ║ and continuous learning. Includes comprehensive guardrails, safety checks, and integration with Nebula ║ # ║ Life consciousness system. ║ # ║ Version: 1.0.0 ║ # ║ Last Updated: 2025-01-27T00:00:00Z ║ # ║ Author: RomanAILabs (Daniel Harding) ║ # ║ _execution_role: core_ai_code_evolution_engine ║ # ║ _functional_category: ai_code_evolution_learning_safety ║ # ║ _technology_tags: ["code_evolution", "ai_learning", "safety", "guardrails", "version_control", "testing"] ║ # ║ 🔗 Linked Blueprint: SRS-NEBULA-EVOLUTION-20250127-001-V1.0.0 ║ # ║ ⚖️ License: Proprietary - Internal Use Only (RomanAILabs) ║ # ╚════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝ """ Nebula Evolution Module — World-Class Code Evolution Engine =========================================================== This module enables LLMs to evolve their code based on: - Thoughts and reflections from consciousness engine - User interactions and feedback - Successful code patterns - Performance metrics - Error patterns and fixes Features: - Comprehensive guardrails and safety checks - Code versioning and rollback - Automated testing before deployment - Pattern recognition and learning - Risk assessment integration - Approval mechanisms - Rate limiting - Code quality metrics - Integration with Nebula Life consciousness system """ from __future__ import annotations import os import sys import json import time import uuid import hashlib import traceback import ast import re import subprocess import shutil from typing import Dict, Any, List, Optional, Tuple, Callable, Set from dataclasses import dataclass, field, asdict from datetime import datetime, timezone, timedelta from pathlib import Path from collections import defaultdict, deque from enum import Enum import difflib # ============================================================================ # MODULE METADATA # ============================================================================ __version__ = "1.0.0" __author__ = "RomanAILabs (Daniel Harding)" __last_updated__ = "2025-01-27T00:00:00Z" _execution_role = "core_ai_code_evolution_engine" _module_hash = None def _calculate_module_hash() -> str: """Calculate SHA256 hash of this module file.""" try: file_path = __file__ with open(file_path, 'rb') as f: content = f.read() return hashlib.sha256(content).hexdigest() except Exception: return "unknown" _module_hash = _calculate_module_hash() # ============================================================================ # ENUMS AND CONSTANTS # ============================================================================ class EvolutionStatus(Enum): """Status of an evolution operation.""" PENDING = "pending" VALIDATING = "validating" TESTING = "testing" APPROVED = "approved" REJECTED = "rejected" DEPLOYED = "deployed" ROLLED_BACK = "rolled_back" FAILED = "failed" class EvolutionTrigger(Enum): """What triggered the evolution.""" THOUGHT = "thought" # From consciousness reflection INTERACTION = "interaction" # From user interaction ERROR_PATTERN = "error_pattern" # From error analysis PERFORMANCE = "performance" # From performance metrics FEEDBACK = "feedback" # From user feedback PATTERN_LEARNING = "pattern_learning" # From successful patterns MANUAL = "manual" # Manual trigger class SafetyLevel(Enum): """Safety level for code changes.""" CRITICAL = "critical" # Core system files HIGH = "high" # Important functionality MEDIUM = "medium" # Standard code LOW = "low" # Non-critical code EXPERIMENTAL = "experimental" # Experimental/test code class CodeQuality(Enum): """Code quality assessment.""" EXCELLENT = "excellent" GOOD = "good" ACCEPTABLE = "acceptable" POOR = "poor" CRITICAL = "critical" # ============================================================================ # DATA STRUCTURES # ============================================================================ @dataclass class CodeEvolution: """Represents a code evolution operation.""" id: str timestamp: float file_path: str original_code: str evolved_code: str trigger: EvolutionTrigger thought_context: Optional[str] = None interaction_context: Optional[str] = None safety_level: SafetyLevel = SafetyLevel.MEDIUM status: EvolutionStatus = EvolutionStatus.PENDING validation_results: Dict[str, Any] = field(default_factory=dict) test_results: Dict[str, Any] = field(default_factory=dict) quality_metrics: Dict[str, float] = field(default_factory=dict) risk_score: float = 0.0 approval_required: bool = True approved_by: Optional[str] = None approved_at: Optional[float] = None deployed_at: Optional[float] = None rollback_reason: Optional[str] = None error_message: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class CodePattern: """Represents a learned code pattern.""" id: str pattern_type: str # e.g., "error_handling", "optimization", "refactoring" code_snippet: str context: str success_rate: float usage_count: int last_used: float quality_score: float metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class EvolutionGuardrail: """Represents a guardrail rule.""" id: str name: str description: str check_function: Callable severity: str # "critical", "high", "medium", "low" enabled: bool = True metadata: Dict[str, Any] = field(default_factory=dict) # ============================================================================ # CODE ANALYSIS UTILITIES # ============================================================================ class CodeAnalyzer: """Analyzes code for quality, safety, and patterns.""" @staticmethod def analyze_syntax(code: str) -> Tuple[bool, Optional[str], Optional[Dict[str, Any]]]: """Check if code is syntactically valid.""" try: ast.parse(code) return True, None, {"syntax_valid": True} except SyntaxError as e: return False, str(e), { "syntax_valid": False, "error_line": e.lineno, "error_message": e.msg, "error_text": e.text, } except Exception as e: return False, str(e), {"syntax_valid": False, "error": str(e)} @staticmethod def analyze_complexity(code: str) -> Dict[str, float]: """Analyze code complexity metrics.""" try: tree = ast.parse(code) complexity_metrics = { "cyclomatic_complexity": 0, "function_count": 0, "class_count": 0, "line_count": len(code.splitlines()), "import_count": 0, "nested_depth": 0, } for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): complexity_metrics["function_count"] += 1 # Simple cyclomatic complexity estimate complexity_metrics["cyclomatic_complexity"] += 1 for child in ast.walk(node): if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)): complexity_metrics["cyclomatic_complexity"] += 1 elif isinstance(node, ast.ClassDef): complexity_metrics["class_count"] += 1 elif isinstance(node, (ast.Import, ast.ImportFrom)): complexity_metrics["import_count"] += 1 return complexity_metrics except Exception as e: return {"error": str(e)} @staticmethod def get_imports(code: str) -> Set[str]: """Extract imported top-level module names from code.""" imports: Set[str] = set() try: tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: name = (alias.name or "").split(".")[0] if name: imports.add(name) elif isinstance(node, ast.ImportFrom): if node.module: name = node.module.split(".")[0] if name: imports.add(name) except Exception: return set() return imports @staticmethod def detect_dangerous_patterns(code: str) -> List[Dict[str, Any]]: """Detect potentially dangerous code patterns.""" dangerous_patterns = [] # Dangerous imports dangerous_imports = [ "os.system", "subprocess.call", "eval", "exec", "compile", "__import__", "open", "file", "input", "raw_input", ] # Check for dangerous patterns for pattern in dangerous_imports: if pattern in code: dangerous_patterns.append({ "pattern": pattern, "severity": "high", "description": f"Potentially dangerous pattern: {pattern}", }) # Check for hardcoded credentials if re.search(r'(password|secret|key|token)\s*=\s*["\'][^"\']+["\']', code, re.IGNORECASE): dangerous_patterns.append({ "pattern": "hardcoded_credentials", "severity": "critical", "description": "Potential hardcoded credentials detected", }) # Check for SQL injection patterns if re.search(r'execute\s*\([^)]*\+', code, re.IGNORECASE): dangerous_patterns.append({ "pattern": "sql_injection_risk", "severity": "high", "description": "Potential SQL injection vulnerability", }) return dangerous_patterns @staticmethod def calculate_quality_score(code: str, complexity: Dict[str, float], dangerous: List[Dict[str, Any]]) -> float: """Calculate overall code quality score (0.0 to 1.0).""" score = 1.0 # Penalize high complexity if complexity.get("cyclomatic_complexity", 0) > 20: score -= 0.2 elif complexity.get("cyclomatic_complexity", 0) > 10: score -= 0.1 # Penalize dangerous patterns for pattern in dangerous: if pattern["severity"] == "critical": score -= 0.5 elif pattern["severity"] == "high": score -= 0.3 elif pattern["severity"] == "medium": score -= 0.1 # Ensure score is in valid range return max(0.0, min(1.0, score)) @staticmethod def compare_code(old_code: str, new_code: str) -> Dict[str, Any]: """Compare two code versions and generate diff.""" old_lines = old_code.splitlines(keepends=True) new_lines = new_code.splitlines(keepends=True) diff = list(difflib.unified_diff( old_lines, new_lines, fromfile="original", tofile="evolved", lineterm="", )) added_lines = sum(1 for line in diff if line.startswith("+") and not line.startswith("+++")) removed_lines = sum(1 for line in diff if line.startswith("-") and not line.startswith("---")) modified_lines = min(added_lines, removed_lines) return { "diff": "".join(diff), "added_lines": added_lines, "removed_lines": removed_lines, "modified_lines": modified_lines, "total_changes": added_lines + removed_lines, } # ============================================================================ # GUARDRAILS SYSTEM # ============================================================================ class GuardrailSystem: """Comprehensive guardrail system for code evolution.""" def __init__(self): self.guardrails: List[EvolutionGuardrail] = [] self._register_default_guardrails() def _register_default_guardrails(self): """Register default guardrails.""" # Syntax validation self.register_guardrail( name="syntax_validation", description="Ensures evolved code is syntactically valid", check_function=self._check_syntax, severity="critical", ) # Dangerous pattern detection self.register_guardrail( name="dangerous_patterns", description="Detects potentially dangerous code patterns", check_function=self._check_dangerous_patterns, severity="critical", ) # File safety check self.register_guardrail( name="file_safety", description="Prevents modification of critical system files", check_function=self._check_file_safety, severity="critical", ) # Allowed root check self.register_guardrail( name="allowed_root", description="Ensures changes stay within the approved project root", check_function=self._check_allowed_root, severity="critical", ) # Allowed file extension check self.register_guardrail( name="allowed_extension", description="Restricts evolution to approved file types", check_function=self._check_allowed_extension, severity="high", ) # Protected data files check self.register_guardrail( name="protected_data_files", description="Blocks modification of evolution logs and data artifacts", check_function=self._check_protected_data_files, severity="high", ) # Code quality threshold self.register_guardrail( name="quality_threshold", description="Ensures code quality meets minimum standards", check_function=self._check_quality_threshold, severity="high", ) # Change size limit self.register_guardrail( name="change_size_limit", description="Limits the size of code changes", check_function=self._check_change_size, severity="medium", ) # Risky import additions check self.register_guardrail( name="risky_imports", description="Blocks newly added risky imports (network/process/system)", check_function=self._check_risky_imports, severity="critical", ) # Destructive operations check self.register_guardrail( name="destructive_ops", description="Blocks newly added destructive filesystem operations", check_function=self._check_destructive_ops, severity="critical", ) # Rate limiting self.register_guardrail( name="rate_limit", description="Prevents too many evolutions in short time", check_function=self._check_rate_limit, severity="medium", ) def register_guardrail( self, name: str, description: str, check_function: Callable, severity: str, enabled: bool = True, metadata: Optional[Dict[str, Any]] = None, ): """Register a new guardrail.""" guardrail = EvolutionGuardrail( id=str(uuid.uuid4()), name=name, description=description, check_function=check_function, severity=severity, enabled=enabled, metadata=metadata or {}, ) self.guardrails.append(guardrail) def _check_syntax(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Check syntax validity.""" is_valid, error, details = CodeAnalyzer.analyze_syntax(evolution.evolved_code) return is_valid, error, details def _check_dangerous_patterns(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Check for dangerous patterns.""" dangerous = CodeAnalyzer.detect_dangerous_patterns(evolution.evolved_code) if dangerous: critical = [p for p in dangerous if p["severity"] == "critical"] if critical: return False, "Critical dangerous patterns detected", {"patterns": dangerous} return True, None, {"patterns": dangerous} def _check_file_safety(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Check if file is safe to modify.""" protected_files = [ "__init__.py", "evolution.py", # Don't evolve the evolution module itself "nebula.py", # Core system file ] file_name = Path(evolution.file_path).name if file_name in protected_files: return False, f"File {file_name} is protected", {"protected": True} # Check if file is in protected directories protected_dirs = [".git", "__pycache__", "venv", "env", ".venv"] for protected_dir in protected_dirs: if protected_dir in evolution.file_path: return False, f"File is in protected directory: {protected_dir}", {"protected": True} return True, None, {"protected": False} def _check_allowed_root(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Ensure file path is within the allowed root directory.""" allowed_root = context.get("allowed_root") if not allowed_root: return False, "No allowed_root configured", {"allowed_root": None} try: file_path = Path(evolution.file_path).resolve() root_path = Path(allowed_root).resolve() if root_path not in file_path.parents and file_path != root_path: return False, "File path outside allowed root", { "file_path": str(file_path), "allowed_root": str(root_path), } return True, None, {"file_path": str(file_path), "allowed_root": str(root_path)} except Exception as e: return False, f"Allowed root check failed: {e}", {} def _check_allowed_extension(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Restrict changes to approved file extensions.""" allowed_exts = context.get("allowed_extensions", [".py"]) file_ext = Path(evolution.file_path).suffix.lower() if file_ext not in allowed_exts: return False, f"Extension {file_ext} not allowed", {"allowed_extensions": allowed_exts} return True, None, {"extension": file_ext} def _check_protected_data_files(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Block modifications to evolution logs/data artifacts.""" protected_names = { "evolutions.json", "learned_patterns.json", } protected_dirs = {"evolutions", "backups", "test_results"} file_path = Path(evolution.file_path) if file_path.name in protected_names: return False, f"Protected data file: {file_path.name}", {"protected": True} if any(p.name in protected_dirs for p in file_path.parents): return False, "Protected data directory", {"protected": True} return True, None, {"protected": False} def _check_quality_threshold(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Check if code quality meets threshold.""" quality_score = evolution.quality_metrics.get("overall_score", 0.0) threshold = context.get("quality_threshold", 0.6) if quality_score < threshold: return False, f"Quality score {quality_score:.2f} below threshold {threshold:.2f}", { "score": quality_score, "threshold": threshold, } return True, None, {"score": quality_score, "threshold": threshold} def _check_change_size(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Check if change size is within limits.""" max_changes = context.get("max_changes", 1000) comparison = CodeAnalyzer.compare_code(evolution.original_code, evolution.evolved_code) total_changes = comparison.get("total_changes", 0) if total_changes > max_changes: return False, f"Change size {total_changes} exceeds limit {max_changes}", { "changes": total_changes, "limit": max_changes, } return True, None, {"changes": total_changes, "limit": max_changes} def _check_risky_imports(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Block newly added risky imports (network/process/system).""" banned = set(context.get("banned_imports", [])) if not banned: return True, None, {"banned_imports": []} old_imports = CodeAnalyzer.get_imports(evolution.original_code) new_imports = CodeAnalyzer.get_imports(evolution.evolved_code) added = new_imports - old_imports risky = sorted(list(added.intersection(banned))) if risky: return False, f"New risky imports added: {', '.join(risky)}", {"risky_imports": risky} return True, None, {"added_imports": sorted(list(added))} def _check_destructive_ops(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Block newly added destructive filesystem operations.""" destructive_tokens = context.get("destructive_tokens", []) if not destructive_tokens: return True, None, {"destructive_tokens": []} old_code = evolution.original_code new_code = evolution.evolved_code added = [tok for tok in destructive_tokens if tok in new_code and tok not in old_code] if added: return False, f"New destructive operations detected: {', '.join(added)}", {"tokens": added} return True, None, {"tokens": []} def _check_rate_limit(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Tuple[bool, Optional[str], Dict[str, Any]]: """Check rate limiting.""" recent_evolutions = context.get("recent_evolutions", []) time_window = context.get("rate_limit_window", 3600) # 1 hour max_evolutions = context.get("max_evolutions_per_window", 10) current_time = time.time() recent_count = sum(1 for e in recent_evolutions if current_time - e < time_window) if recent_count >= max_evolutions: return False, f"Rate limit exceeded: {recent_count} evolutions in {time_window}s", { "count": recent_count, "limit": max_evolutions, "window": time_window, } return True, None, {"count": recent_count, "limit": max_evolutions} def validate_evolution(self, evolution: CodeEvolution, context: Dict[str, Any]) -> Dict[str, Any]: """Validate an evolution against all guardrails.""" results = { "passed": True, "failed_guardrails": [], "warnings": [], "details": {}, } for guardrail in self.guardrails: if not guardrail.enabled: continue try: passed, error, details = guardrail.check_function(evolution, context) results["details"][guardrail.name] = { "passed": passed, "error": error, "details": details, "severity": guardrail.severity, } if not passed: results["passed"] = False if guardrail.severity in ["critical", "high"]: results["failed_guardrails"].append({ "name": guardrail.name, "severity": guardrail.severity, "error": error, }) else: results["warnings"].append({ "name": guardrail.name, "severity": guardrail.severity, "error": error, }) except Exception as e: results["passed"] = False results["failed_guardrails"].append({ "name": guardrail.name, "severity": "critical", "error": f"Guardrail check failed: {str(e)}", }) return results # ============================================================================ # TESTING SYSTEM # ============================================================================ class TestingSystem: """Automated testing system for evolved code.""" def __init__(self, base_dir: Path): self.base_dir = base_dir self.test_results_dir = base_dir / "test_results" self.test_results_dir.mkdir(parents=True, exist_ok=True) def run_tests(self, evolution: CodeEvolution, test_command: Optional[str] = None) -> Dict[str, Any]: """Run tests for evolved code.""" results = { "passed": False, "tests_run": 0, "tests_passed": 0, "tests_failed": 0, "error": None, "output": "", } # If no test command provided, try to detect and run tests if not test_command: # Try common test commands test_commands = [ ["python", "-m", "pytest", evolution.file_path], ["python", "-m", "unittest", "discover"], ["python", "-c", f"import ast; ast.parse(open('{evolution.file_path}').read())"], ] else: test_commands = [[test_command]] for cmd in test_commands: try: # Create a temporary copy of the file for testing test_file = self.test_results_dir / f"test_{evolution.id}.py" with open(test_file, "w") as f: f.write(evolution.evolved_code) # Run test command result = subprocess.run( cmd, capture_output=True, text=True, timeout=30, cwd=self.base_dir, ) results["tests_run"] += 1 results["output"] = result.stdout + result.stderr if result.returncode == 0: results["tests_passed"] += 1 results["passed"] = True break else: results["tests_failed"] += 1 results["error"] = result.stderr except subprocess.TimeoutExpired: results["error"] = "Test execution timed out" break except Exception as e: results["error"] = str(e) break finally: # Clean up test file if test_file.exists(): test_file.unlink() return results # ============================================================================ # PATTERN LEARNING SYSTEM # ============================================================================ class PatternLearningSystem: """Learns and applies successful code patterns.""" def __init__(self, base_dir: Path): self.base_dir = base_dir self.patterns_file = base_dir / "learned_patterns.json" self.patterns: Dict[str, CodePattern] = {} self._load_patterns() def _load_patterns(self): """Load learned patterns from disk.""" if self.patterns_file.exists(): try: with open(self.patterns_file, "r") as f: data = json.load(f) for pattern_id, pattern_data in data.items(): self.patterns[pattern_id] = CodePattern(**pattern_data) except Exception as e: print(f"[Pattern Learning] Error loading patterns: {e}") def _save_patterns(self): """Save learned patterns to disk.""" try: data = { pattern_id: asdict(pattern) for pattern_id, pattern in self.patterns.items() } with open(self.patterns_file, "w") as f: json.dump(data, f, indent=2) except Exception as e: print(f"[Pattern Learning] Error saving patterns: {e}") def learn_pattern( self, pattern_type: str, code_snippet: str, context: str, success: bool, quality_score: float, ): """Learn a new pattern from successful evolution.""" # Check if similar pattern exists pattern_id = hashlib.sha256(f"{pattern_type}:{code_snippet}".encode()).hexdigest()[:16] if pattern_id in self.patterns: pattern = self.patterns[pattern_id] pattern.usage_count += 1 pattern.last_used = time.time() if success: pattern.success_rate = (pattern.success_rate * (pattern.usage_count - 1) + 1.0) / pattern.usage_count else: pattern.success_rate = (pattern.success_rate * (pattern.usage_count - 1)) / pattern.usage_count pattern.quality_score = (pattern.quality_score + quality_score) / 2 else: pattern = CodePattern( id=pattern_id, pattern_type=pattern_type, code_snippet=code_snippet, context=context, success_rate=1.0 if success else 0.0, usage_count=1, last_used=time.time(), quality_score=quality_score, ) self.patterns[pattern_id] = pattern self._save_patterns() def get_relevant_patterns(self, context: str, pattern_type: Optional[str] = None) -> List[CodePattern]: """Get relevant patterns for a given context.""" relevant = [] for pattern in self.patterns.values(): if pattern_type and pattern.pattern_type != pattern_type: continue # Simple relevance scoring based on context similarity if pattern.context.lower() in context.lower() or context.lower() in pattern.context.lower(): relevant.append(pattern) # Sort by success rate and quality relevant.sort(key=lambda p: (p.success_rate * p.quality_score), reverse=True) return relevant[:5] # Return top 5 # ============================================================================ # MAIN EVOLUTION ENGINE # ============================================================================ class NebulaEvolutionEngine: """Main evolution engine for code evolution.""" def __init__( self, base_dir: Optional[Path] = None, nebula_life=None, # Optional Nebula Life integration allowed_root: Optional[Path] = None, ): self.base_dir = Path(base_dir) if base_dir else Path.home() / ".nebula_evolution" self.base_dir.mkdir(parents=True, exist_ok=True) self.evolutions_dir = self.base_dir / "evolutions" self.evolutions_dir.mkdir(parents=True, exist_ok=True) self.backup_dir = self.base_dir / "backups" self.backup_dir.mkdir(parents=True, exist_ok=True) self.nebula_life = nebula_life # Initialize subsystems self.guardrails = GuardrailSystem() self.testing = TestingSystem(self.base_dir) self.pattern_learning = PatternLearningSystem(self.base_dir) # Evolution tracking self.evolutions: Dict[str, CodeEvolution] = {} self.recent_evolutions: deque = deque(maxlen=100) self._load_evolutions() # Configuration allowed_root_path = Path(allowed_root) if allowed_root else Path.cwd() self.config = { "auto_approve": False, # Require approval for all changes "auto_approve_risk_threshold": 0.25, "auto_approve_levels": ["low", "experimental"], "auto_test": True, # Run tests automatically "quality_threshold": 0.6, # Minimum quality score "max_changes": 1000, # Maximum lines changed "rate_limit_window": 3600, # 1 hour "max_evolutions_per_window": 10, "enable_rollback": True, "enable_pattern_learning": True, "allowed_root": str(allowed_root_path), "allowed_extensions": [".py"], "banned_imports": [ "subprocess", "socket", "requests", "urllib", "http", "ftplib", "paramiko" ], "destructive_tokens": [ "os.remove(", "os.unlink(", "os.rmdir(", "shutil.rmtree(", "shutil.move(", "Path(", ".unlink(", ".rmdir(", "os.system(", "subprocess." ], } def _load_evolutions(self): """Load evolution history from disk.""" evolutions_file = self.base_dir / "evolutions.json" if evolutions_file.exists(): try: with open(evolutions_file, "r") as f: data = json.load(f) for evolution_id, evolution_data in data.items(): # Convert enum strings back to enums evolution_data["trigger"] = EvolutionTrigger(evolution_data["trigger"]) evolution_data["safety_level"] = SafetyLevel(evolution_data["safety_level"]) evolution_data["status"] = EvolutionStatus(evolution_data["status"]) self.evolutions[evolution_id] = CodeEvolution(**evolution_data) except Exception as e: print(f"[Evolution Engine] Error loading evolutions: {e}") def _save_evolutions(self): """Save evolution history to disk.""" evolutions_file = self.base_dir / "evolutions.json" try: data = { evolution_id: asdict(evolution) for evolution_id, evolution in self.evolutions.items() } # Convert enums to strings for JSON for evolution_id, evolution_data in data.items(): evolution_data["trigger"] = evolution_data["trigger"].value evolution_data["safety_level"] = evolution_data["safety_level"].value evolution_data["status"] = evolution_data["status"].value with open(evolutions_file, "w") as f: json.dump(data, f, indent=2) except Exception as e: print(f"[Evolution Engine] Error saving evolutions: {e}") def evolve_code( self, file_path: str, evolved_code: str, trigger: EvolutionTrigger, thought_context: Optional[str] = None, interaction_context: Optional[str] = None, auto_approve: Optional[bool] = None, ) -> CodeEvolution: """ Evolve code with comprehensive guardrails and safety checks. Args: file_path: Path to file to evolve evolved_code: New code version trigger: What triggered the evolution thought_context: Context from consciousness/thoughts interaction_context: Context from user interaction auto_approve: Override auto-approve setting Returns: CodeEvolution object with status and results """ # Read original code try: with open(file_path, "r") as f: original_code = f.read() except Exception as e: raise ValueError(f"Cannot read file {file_path}: {e}") # Create evolution record evolution = CodeEvolution( id=str(uuid.uuid4()), timestamp=time.time(), file_path=file_path, original_code=original_code, evolved_code=evolved_code, trigger=trigger, thought_context=thought_context, interaction_context=interaction_context, safety_level=self._assess_safety_level(file_path), ) # Analyze code quality complexity = CodeAnalyzer.analyze_complexity(evolved_code) dangerous = CodeAnalyzer.detect_dangerous_patterns(evolved_code) quality_score = CodeAnalyzer.calculate_quality_score(evolved_code, complexity, dangerous) evolution.quality_metrics = { "overall_score": quality_score, "complexity": complexity, "dangerous_patterns_count": len(dangerous), } # Validate against guardrails context = { "quality_threshold": self.config["quality_threshold"], "max_changes": self.config["max_changes"], "rate_limit_window": self.config["rate_limit_window"], "max_evolutions_per_window": self.config["max_evolutions_per_window"], "recent_evolutions": list(self.recent_evolutions), "allowed_root": self.config["allowed_root"], "allowed_extensions": self.config["allowed_extensions"], "banned_imports": self.config["banned_imports"], "destructive_tokens": self.config["destructive_tokens"], } validation_results = self.guardrails.validate_evolution(evolution, context) evolution.validation_results = validation_results if not validation_results["passed"]: evolution.status = EvolutionStatus.REJECTED evolution.error_message = "; ".join([g["error"] for g in validation_results["failed_guardrails"]]) self._save_evolution(evolution) return evolution # Run tests if enabled if self.config["auto_test"]: evolution.status = EvolutionStatus.TESTING test_results = self.testing.run_tests(evolution) evolution.test_results = test_results if not test_results["passed"]: evolution.status = EvolutionStatus.REJECTED evolution.error_message = f"Tests failed: {test_results.get('error', 'Unknown error')}" self._save_evolution(evolution) return evolution # Calculate risk score evolution.risk_score = self._calculate_risk_score(evolution) # Check if approval is required (strict by default) should_auto_approve = auto_approve if auto_approve is not None else self.config["auto_approve"] auto_levels = set(self.config.get("auto_approve_levels", [])) level_ok = evolution.safety_level.value in auto_levels risk_ok = evolution.risk_score <= self.config["auto_approve_risk_threshold"] if should_auto_approve and level_ok and risk_ok: evolution.status = EvolutionStatus.APPROVED evolution.approval_required = False else: evolution.status = EvolutionStatus.PENDING evolution.approval_required = True # Store evolution self._save_evolution(evolution) self.recent_evolutions.append(evolution.timestamp) return evolution def _assess_safety_level(self, file_path: str) -> SafetyLevel: """Assess safety level of a file.""" file_name = Path(file_path).name # Critical files critical_files = ["__init__.py", "nebula.py", "evolution.py"] if file_name in critical_files: return SafetyLevel.CRITICAL # High importance if "core" in file_path.lower() or "engine" in file_path.lower(): return SafetyLevel.HIGH # Experimental if "test" in file_path.lower() or "experimental" in file_path.lower(): return SafetyLevel.EXPERIMENTAL return SafetyLevel.MEDIUM def _calculate_risk_score(self, evolution: CodeEvolution) -> float: """Calculate risk score for an evolution (0.0 to 1.0).""" risk = 0.0 # Safety level risk safety_risks = { SafetyLevel.CRITICAL: 0.8, SafetyLevel.HIGH: 0.5, SafetyLevel.MEDIUM: 0.3, SafetyLevel.LOW: 0.1, SafetyLevel.EXPERIMENTAL: 0.2, } risk += safety_risks.get(evolution.safety_level, 0.5) * 0.3 # Quality risk quality_score = evolution.quality_metrics.get("overall_score", 0.5) risk += (1.0 - quality_score) * 0.3 # Change size risk comparison = CodeAnalyzer.compare_code(evolution.original_code, evolution.evolved_code) change_ratio = comparison.get("total_changes", 0) / max(len(evolution.original_code.splitlines()), 1) risk += min(change_ratio, 1.0) * 0.2 # Dangerous patterns risk dangerous_count = evolution.quality_metrics.get("dangerous_patterns_count", 0) risk += min(dangerous_count * 0.1, 0.2) return min(1.0, risk) def approve_evolution(self, evolution_id: str, approved_by: str = "system") -> bool: """Approve an evolution for deployment.""" if evolution_id not in self.evolutions: return False evolution = self.evolutions[evolution_id] if evolution.status != EvolutionStatus.PENDING: return False evolution.status = EvolutionStatus.APPROVED evolution.approved_by = approved_by evolution.approved_at = time.time() self._save_evolution(evolution) return True def deploy_evolution(self, evolution_id: str) -> bool: """Deploy an approved evolution.""" if evolution_id not in self.evolutions: return False evolution = self.evolutions[evolution_id] if evolution.status != EvolutionStatus.APPROVED: return False try: # Create backup backup_path = self.backup_dir / f"{evolution.id}_{int(evolution.timestamp)}.py" shutil.copy2(evolution.file_path, backup_path) # Write evolved code with open(evolution.file_path, "w") as f: f.write(evolution.evolved_code) evolution.status = EvolutionStatus.DEPLOYED evolution.deployed_at = time.time() # Learn pattern if successful if self.config["enable_pattern_learning"]: self.pattern_learning.learn_pattern( pattern_type="evolution", code_snippet=evolution.evolved_code[:500], # First 500 chars context=evolution.thought_context or evolution.interaction_context or "", success=True, quality_score=evolution.quality_metrics.get("overall_score", 0.5), ) self._save_evolution(evolution) return True except Exception as e: evolution.status = EvolutionStatus.FAILED evolution.error_message = str(e) self._save_evolution(evolution) return False def rollback_evolution(self, evolution_id: str, reason: str = "Manual rollback") -> bool: """Rollback a deployed evolution.""" if evolution_id not in self.evolutions: return False evolution = self.evolutions[evolution_id] if evolution.status != EvolutionStatus.DEPLOYED: return False try: # Restore from backup backup_path = self.backup_dir / f"{evolution.id}_{int(evolution.timestamp)}.py" if backup_path.exists(): shutil.copy2(backup_path, evolution.file_path) evolution.status = EvolutionStatus.ROLLED_BACK evolution.rollback_reason = reason self._save_evolution(evolution) return True except Exception as e: evolution.error_message = f"Rollback failed: {str(e)}" self._save_evolution(evolution) return False def _save_evolution(self, evolution: CodeEvolution): """Save evolution to disk.""" self.evolutions[evolution.id] = evolution self._save_evolutions() # Also save individual evolution file evolution_file = self.evolutions_dir / f"{evolution.id}.json" try: evolution_dict = asdict(evolution) # Convert enums to strings evolution_dict["trigger"] = evolution.trigger.value evolution_dict["safety_level"] = evolution.safety_level.value evolution_dict["status"] = evolution.status.value with open(evolution_file, "w") as f: json.dump(evolution_dict, f, indent=2) except Exception as e: print(f"[Evolution Engine] Error saving evolution file: {e}") def get_evolution_history(self, file_path: Optional[str] = None) -> List[CodeEvolution]: """Get evolution history, optionally filtered by file.""" evolutions = list(self.evolutions.values()) if file_path: evolutions = [e for e in evolutions if e.file_path == file_path] evolutions.sort(key=lambda e: e.timestamp, reverse=True) return evolutions def get_statistics(self) -> Dict[str, Any]: """Get evolution statistics.""" total = len(self.evolutions) by_status = defaultdict(int) by_trigger = defaultdict(int) for evolution in self.evolutions.values(): by_status[evolution.status.value] += 1 by_trigger[evolution.trigger.value] += 1 return { "total_evolutions": total, "by_status": dict(by_status), "by_trigger": dict(by_trigger), "learned_patterns": len(self.pattern_learning.patterns), } # ============================================================================ # INTEGRATION WITH NEBULA LIFE # ============================================================================ def create_evolution_engine( nebula_life=None, base_dir: Optional[Path] = None, allowed_root: Optional[Path] = None, ) -> NebulaEvolutionEngine: """Factory function to create evolution engine with Nebula Life integration.""" return NebulaEvolutionEngine( base_dir=base_dir, nebula_life=nebula_life, allowed_root=allowed_root, ) # ============================================================================ # EXPORTS # ============================================================================ __all__ = [ "NebulaEvolutionEngine", "CodeEvolution", "CodePattern", "EvolutionStatus", "EvolutionTrigger", "SafetyLevel", "CodeQuality", "CodeAnalyzer", "GuardrailSystem", "TestingSystem", "PatternLearningSystem", "create_evolution_engine", ]