#!/usr/bin/env python3
# Copyright Daniel Harding - RomanAILabs
# Classroom Ethical Pen Testing Trainer (CEPT) v1.1 - Enhanced Version
# Copyright [Daniel Harding-RomanAILabs] 2025 - Educational tool for local network demos.
# Developed for RomanAILabs - Penetration Test Tool
# DISCLAIMER: For consented, educational use only. Unauthorized access is illegal.
# Run with sudo for scans.
# Usage: sudo python3 cept.py --target 192.168.1.100 --deploy --username user --password pass

import argparse
import logging
import posixpath
import shlex
import socket
import sys
import time

import paramiko
import requests
from paramiko.sftp_client import SFTPClient
from scapy.all import IP, TCP, ICMP, sr1, RandShort

# Setup logging
logging.basicConfig(filename='cept.log', level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger()

# Tesseract server content (vulnerable demo app).
# The backslash after the opening quotes suppresses the leading newline so the
# shebang is the first line of the uploaded (and chmod +x'ed) file.
TESSERACT_NODE2_CONTENT = """\
#!/usr/bin/env python3
# =================================================
# TESSERACT-NODE v1.0 — 4D OFFLINE QUANTUM SERVER
# Copyright Daniel Harding - RomanAILabs
# FIXED FOR QISKIT 1.0+ | OFFLINE | LINUX
# =================================================
import os
import numpy as np
from datetime import datetime
from flask import Flask, jsonify, request
from qiskit import QuantumCircuit, transpile
from qiskit_aer import AerSimulator
import matplotlib.pyplot as plt
import threading
import time

app = Flask(__name__)

# === 4D SPACETIME COORDINATES ===
class Spacetime4D:
    def __init__(self):
        self.c = 299792458
        self.t = self.x = self.y = self.z = 0

    def move(self, dt, dx, dy, dz):
        self.t += dt
        self.x += dx
        self.y += dy
        self.z += dz
        return (self.c * self.t, self.x, self.y, self.z)

# === QUANTUM ENGINE (FIXED) ===
class QuantumCore:
    def __init__(self):
        self.backend = AerSimulator()

    def create_entangled_pair(self):
        qc = QuantumCircuit(2, 2)
        qc.h(0)
        qc.cx(0, 1)
        qc.measure([0,1], [0,1])
        job = self.backend.run(transpile(qc, self.backend), shots=1)
        result = job.result()
        counts = result.get_counts()
        return list(counts.keys())[0]

    def ghz_state(self, n=4):
        qc = QuantumCircuit(n, n)
        qc.h(0)
        for i in range(n-1):
            qc.cx(i, i+1)
        qc.measure_all()
        job = self.backend.run(transpile(qc, self.backend), shots=1)
        return job.result().get_counts()

# === 4D DATA STORAGE ===
class HyperMemory:
    def __init__(self):
        self.data = {}

    def store(self, coord, value):
        self.data[coord] = value

    def retrieve(self, coord):
        return self.data.get(coord, None)

# === GLOBAL INSTANCES ===
spacetime = Spacetime4D()
quantum = QuantumCore()
memory = HyperMemory()

# === API ENDPOINTS ===
@app.route('/')
def home():
    return \"\"\"

TESSERACT-NODE v1.0

4D Offline Quantum ServerONLINE

\"\"\"

@app.route('/status')
def status():
    return jsonify({
        "node": "Tesseract-Node Ω",
        "status": "4D OPERATIONAL",
        "time": datetime.now().isoformat(),
        "position": spacetime.move(0,0,0,0),
        "qubits": 16,
        "offline": True
    })

@app.route('/entangle')
def entangle():
    result = quantum.create_entangled_pair()
    return jsonify({
        "bell_pair": result,
        "entanglement": "ACHIEVED"
    })

@app.route('/ghz')
def ghz():
    result = quantum.ghz_state(4)
    return jsonify({
        "ghz_state": list(result.keys())[0],
        "qubits": 4
    })

@app.route('/move')
def move():
    dt = float(request.args.get('dt', 0))
    dx = float(request.args.get('dx', 0))
    pos = spacetime.move(dt, dx, 0, 0)
    return jsonify({"new_position": pos})

# === BACKGROUND DRIFT ===
def drift():
    while True:
        time.sleep(10)
        spacetime.move(1, np.random.randn(), np.random.randn(), np.random.randn())
        print(f"[4D DRIFT] {spacetime.move(0,0,0,0)}")

threading.Thread(target=drift, daemon=True).start()

# === START ===
if __name__ == '__main__':
    print("TESSERACT-NODE v1.0 — 4D CORE ONLINE")
    print("http://0.0.0.0:8888")
    # Changed to 0.0.0.0 for network access
    app.run(host='0.0.0.0', port=8888)
"""


def ethics_check():
    """Ask the operator to confirm consented, educational use.

    Exits the process with status 0 unless the answer is ``y``
    (case-insensitive, surrounding whitespace ignored).
    """
    try:
        confirm = input("Confirm: This is for educational use with permission (y/n)? ")
    except EOFError:
        # No interactive stdin means nobody confirmed consent: treat as "no".
        confirm = ""
    if confirm.strip().lower() != 'y':
        print("Aborting for ethical reasons.")
        sys.exit(0)


def deploy_tesseract(host_ip, username, password=None, private_key_path=None,
                     remote_path="/tmp/tesseract_node2.py"):
    """Upload the demo server to ``host_ip`` over SSH, install deps and start it.

    Args:
        host_ip: Address of the SSH host to deploy to.
        username: SSH user name.
        password: SSH password; used only when ``private_key_path`` is not given.
        private_key_path: Path to a private key file used for authentication.
        remote_path: Destination path of the server script on the remote host.

    Returns:
        True if the server process was found running after start-up, False
        otherwise (including when any step raised). Errors are logged and
        printed rather than propagated.
    """
    logger.info("Deploying to %s", host_ip)
    client = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts unknown host keys without
    # verification. Tolerable only on an isolated classroom network.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        if private_key_path:
            # key_filename lets paramiko detect the key type instead of
            # assuming the file is an RSA key.
            client.connect(host_ip, username=username, key_filename=private_key_path)
        else:
            client.connect(host_ip, username=username, password=password)
        # Success message after connection
        print(f"RomanAILabs Penetration Test Tool Success at example {host_ip}")

        sftp = client.open_sftp()
        try:
            with sftp.file(remote_path, 'w') as f:
                f.write(TESSERACT_NODE2_CONTENT)
            sftp.chmod(remote_path, 0o755)
        finally:
            sftp.close()

        install_cmd = (
            "sudo apt update -y && sudo apt install -y python3-pip && "
            "pip3 install qiskit qiskit-aer flask numpy matplotlib"
        )
        _, stdout, stderr = client.exec_command(install_cmd)
        install_out = stdout.read().decode()
        install_err = stderr.read().decode()
        logger.info(install_out)
        if install_err:
            logger.error(install_err)
            print(f"Install warning: {install_err}")

        # Quote the path: it is interpolated into a remote shell command line.
        quoted_path = shlex.quote(remote_path)
        client.exec_command(f"nohup python3 {quoted_path} > /tmp/tesseract.log 2>&1 &")
        time.sleep(2)  # give the server a moment to start before checking

        # Look for the script actually deployed, not a hard-coded file name.
        script_name = shlex.quote(posixpath.basename(remote_path))
        _, stdout, _ = client.exec_command(
            f"ps aux | grep -F -- {script_name} | grep -v grep"
        )
        if stdout.read().decode().strip():
            logger.info("Server running")
            print(f"Server deployed on {host_ip}:8888")
            return True
        print("Deployment failed. Check remote log.")
        logger.warning("Deployment failed")
        return False
    except Exception as e:
        logger.error("Deployment error: %s", e)
        print(f"Error: {e}")
        return False
    finally:
        # Always release the SSH connection, including on failure paths.
        client.close()


def pen_test_scan(target_ip, ports=(22, 80, 8888)):
    """Ping ``target_ip`` and SYN-probe each port in ``ports``.

    Args:
        target_ip: Address of the host to probe.
        ports: Iterable of TCP port numbers to test.

    Returns:
        List of ports that answered with SYN/ACK. Empty when the host did not
        answer the ICMP echo or when an error occurred.

    Exits the process with status 1 on PermissionError (raw sockets need root).
    """
    syn_ack = 0x12  # SYN (0x02) | ACK (0x10)
    logger.info("Scanning %s", target_ip)
    print(f"Scanning {target_ip}...")
    try:
        ping = sr1(IP(dst=target_ip)/ICMP(), timeout=1, verbose=0)
        if ping is None:
            print("Host down or blocking pings.")
            logger.info("Host down")
            return []
        print("Host up.")
        logger.info("Host up")

        open_ports = []
        for port in ports:
            probe = IP(dst=target_ip)/TCP(sport=RandShort(), dport=port, flags="S")
            response = sr1(probe, timeout=1, verbose=0)
            if response is None or not response.haslayer(TCP):
                continue
            # Mask rather than compare for equality so extra flag bits on a
            # SYN/ACK reply do not hide an open port.
            if (int(response[TCP].flags) & syn_ack) == syn_ack:
                open_ports.append(port)
                print(f"Port {port} open.")
                logger.info("Port %s open", port)
        return open_ports
    except PermissionError:
        print("Run with sudo for scans.")
        sys.exit(1)
    except Exception as e:
        logger.error("Scan error: %s", e)
        print(f"Error: {e}")
        return []


def pen_test_web(target_ip, port=8888):
    """Run benign HTTP checks against the demo server and print the findings.

    Checks the root page and Server header, looks for a directory listing,
    requests each demo endpoint without credentials, and sends one probe to
    see whether a query parameter is reflected back unsanitized.

    Args:
        target_ip: Address of the host running the web service.
        port: TCP port of the web service.
    """
    base_url = f"http://{target_ip}:{port}"
    logger.info("Testing %s", base_url)
    print(f"Testing web at {base_url}...")
    try:
        # Enhanced: Check for server headers (info leak)
        resp = requests.get(base_url, timeout=5)
        print(f"Root status: {resp.status_code}")
        logger.info("Root status: %s", resp.status_code)
        if resp.status_code == 200:
            print("Exposed root - info leak possible.")
            print(f"Server headers: {resp.headers.get('Server', 'None')}")

        # Test for directory listing
        dir_test = requests.get(f"{base_url}/nonexistent/", timeout=5)
        if dir_test.status_code == 200 and "Index of" in dir_test.text:
            print("Directory listing enabled: Vulnerability!")
            logger.warning("Directory listing vulnerable")

        # Test endpoints
        endpoints = ["/status", "/entangle", "/ghz", "/move?dt=1&dx=1"]
        for ep in endpoints:
            resp = requests.get(f"{base_url}{ep}", timeout=5)
            print(f"{ep} status: {resp.status_code}")
            logger.info("%s status: %s", ep, resp.status_code)
            if resp.status_code == 200:
                print(f"{ep} open without auth - potential data exposure.")

        # New: Benign SQLi probe (check if input reflected unsanitized)
        inject_test = requests.get(f"{base_url}/move?dt=1';--&dx=1", timeout=5)
        if "error" not in inject_test.text.lower() and "';--" in inject_test.text:
            print("Params may reflect input: Potential injection vuln.")
            logger.warning("Potential injection vuln")
    except requests.ConnectionError:
        print("No web service detected.")
        logger.info("No web service")
    except Exception as e:
        logger.error("Web test error: %s", e)
        print(f"Error: {e}")


def generate_report(target_ip, open_ports):
    """Write a short plain-text summary to ``cept_report.txt``.

    Args:
        target_ip: Address that was tested.
        open_ports: Ports found open by the scan.
    """
    report = "".join([
        f"RomanAILabs CEPT Report for {target_ip}\n",
        f"Open ports: {open_ports}\n",
        "Web tests: See logs for details.\n",
        "Recommendations: Add authentication, firewall ports, input sanitization.\n",
    ])
    with open('cept_report.txt', 'w', encoding='utf-8') as f:
        f.write(report)
    print("Report generated: cept_report.txt")
    logger.info("Report generated")


def _parse_ports(spec):
    """Turn a comma-separated port string into a list of ints.

    Empty items are skipped. Raises ValueError for a non-numeric item, a port
    outside 1-65535, or a spec that contains no ports at all.
    """
    ports = []
    for token in spec.split(','):
        token = token.strip()
        if not token:
            continue
        try:
            port = int(token)
        except ValueError:
            raise ValueError(f"invalid port {token!r}") from None
        if not 1 <= port <= 65535:
            raise ValueError(f"port out of range (1-65535): {port}")
        ports.append(port)
    if not ports:
        raise ValueError("no ports given")
    return ports


def main():
    """Parse arguments, then run consent check, optional deploy, scan, web tests and report."""
    parser = argparse.ArgumentParser(description="Classroom Ethical Pen Testing Trainer v1.1")
    parser.add_argument('--target', required=True, help='Target IP (local only)')
    parser.add_argument('--deploy', action='store_true', help='Deploy demo server')
    parser.add_argument('--username', help='SSH username for deploy')
    parser.add_argument('--password', help='SSH password')
    parser.add_argument('--key', help='SSH private key path')
    parser.add_argument('--ports', default="22,80,8888", help='Ports to scan (comma-separated)')
    args = parser.parse_args()

    # Validate --ports before doing anything, so a typo fails with a usage
    # message instead of a traceback after a deployment has already run.
    try:
        ports_list = _parse_ports(args.ports)
    except ValueError as e:
        parser.error(f"--ports: {e}")

    ethics_check()

    if args.deploy:
        if not args.username:
            print("Need --username for deploy.")
            sys.exit(1)
        deploy_tesseract(args.target, args.username, args.password, args.key)

    print("\nScanning...")
    open_ports = pen_test_scan(args.target, ports_list)

    if 8888 in open_ports:
        print("\nWeb testing...")
        pen_test_web(args.target)

    generate_report(args.target, open_ports)
    print("\nDemo complete. Discuss findings and fixes!")


if __name__ == "__main__":
    main()