# Copyright Daniel Harding - RomanAILabs # Credits: Nova (GPT-5.2 Thinking) """ notify_guard - Privacy-first notification ingestion for Termux (Android) Safely ingests notifications from Termux:API notification panel, encrypts at rest with AES-256-GCM, deduplicates, applies local rules, and optionally calls remote endpoints for summarization. Default: zero network, local-only processing. Persisted Encryption: - Salt is generated once during setup and stored in config.json (base64). - All encrypt/decrypt operations use the same persisted salt. - Key derivation: Scrypt (preferred) or PBKDF2-HMAC-SHA256 with 480k iterations. Remote Safety: - Local redaction_mode applied before storage. - Additional remote_redaction layer applied before remote API calls. - Hard payload size limit enforced. """ import os import json import sqlite3 import hashlib import secrets import subprocess import logging import re import time import base64 from datetime import datetime from pathlib import Path from typing import Optional, Dict, List, Tuple, Any from dataclasses import dataclass try: from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC CRYPTO_AVAILABLE = True except ImportError: CRYPTO_AVAILABLE = False try: from cryptography.hazmat.primitives.kdf.scrypt import Scrypt SCRYPT_AVAILABLE = True except ImportError: SCRYPT_AVAILABLE = False # ============================================================================ # Configuration & Paths # ============================================================================ CONFIG_DIR = Path.home() / ".config" / "notify_guard" DATA_DIR = Path.home() / ".local" / "share" / "notify_guard" CONFIG_FILE = CONFIG_DIR / "config.json" DB_PATH = DATA_DIR / "romanai-android.db" DEFAULT_CONFIG = { "poll_interval_seconds": 60, "redaction_mode": "truncate", "truncate_len": 80, "store_backend": "sqlite", "db_path": str(DB_PATH), "retention_days": 14, "quiet_hours": {"start": "22:00", "end": "07:00"}, "allow_packages": [], "deny_packages": [], "keywords_priority": ["bank", "code", "urgent", "alert"], "keywords_ignore": ["sale", "promo", "offer"], "salt_b64": "", "remote": { "enabled_default": False, "provider": "grok3", "endpoint": "", "api_key_env": "GROK_API_KEY", "timeout_seconds": 12, "max_chars": 1200, }, } logger = logging.getLogger("notify_guard") # ============================================================================ # Data Classes # ============================================================================ @dataclass class Notification: """Represents a normalized notification.""" id: Optional[str] = None timestamp: int = 0 package: str = "" title: str = "" text: str = "" priority: int = 0 ongoing: bool = False raw: Dict[str, Any] = None def __post_init__(self): if self.raw is None: self.raw = {} def fingerprint(self, window_sec: int = 10) -> str: """Generate dedup fingerprint: sha256 of (pkg|title|text|timestamp_window).""" ts_window = (self.timestamp // window_sec) * window_sec data = f"{self.package}|{self.title}|{self.text}|{ts_window}" return hashlib.sha256(data.encode()).hexdigest() # ============================================================================ # Encryption & Key Derivation # ============================================================================ def derive_key_from_passphrase( passphrase: str, salt: bytes, use_scrypt: bool = True ) -> bytes: """ Derive a 32-byte AES key from passphrase using Scrypt (preferred) or PBKDF2HMAC. Salt MUST be provided (persisted from setup). """ if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module not installed. pip install cryptography") if use_scrypt and SCRYPT_AVAILABLE: try: kdf = Scrypt( salt=salt, length=32, n=2**14, r=8, p=1, ) key = kdf.derive(passphrase.encode()) return key except Exception as e: logger.warning(f"Scrypt failed, falling back to PBKDF2HMAC: {e}") # Fallback to PBKDF2HMAC kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=480000, ) key = kdf.derive(passphrase.encode()) return key def encrypt_aes_gcm(plaintext: bytes, key: bytes) -> bytes: """ Encrypt plaintext with AES-256-GCM. Returns nonce (12 bytes) + ciphertext as single blob. """ if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module not installed") nonce = secrets.token_bytes(12) cipher = AESGCM(key) ciphertext = cipher.encrypt(nonce, plaintext, None) return nonce + ciphertext def decrypt_aes_gcm(blob: bytes, key: bytes) -> bytes: """ Decrypt AES-256-GCM blob (nonce + ciphertext with auth tag). """ if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module not installed") nonce = blob[:12] ciphertext_with_tag = blob[12:] cipher = AESGCM(key) plaintext = cipher.decrypt(nonce, ciphertext_with_tag, None) return plaintext # ============================================================================ # Redaction Engine # ============================================================================ def redact_text(text: str, mode: str, truncate_len: int = 80) -> str: """Apply redaction based on mode: off, truncate, or hash.""" if mode == "off": return text if mode == "truncate": text = re.sub( r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', "[EMAIL]", text, ) text = re.sub(r'\b\+?\d{7,}\b', "[PHONE]", text) text = re.sub(r'https?://[^\s]+', "[URL]", text) text = re.sub(r'\b\d{8,}\b', "[DIGITS]", text) if len(text) > truncate_len: text = text[:truncate_len] + f"... ({len(text)} total chars)" return text if mode == "hash": return "[REDACTED:" + hashlib.sha256(text.encode()).hexdigest()[:8] + "]" return text def remote_redact_text(text: str, truncate_len: int = 100) -> str: """Always-on redaction for remote API calls.""" text = re.sub( r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', "[EMAIL]", text, ) text = re.sub(r'\b\+?\d{7,}\b', "[PHONE]", text) text = re.sub(r'https?://[^\s]+', "[URL]", text) text = re.sub(r'\b\d{8,}\b', "[DIGITS]", text) if len(text) > truncate_len: text = text[:truncate_len] + "..." return text # ============================================================================ # Quiet Hours & Rules # ============================================================================ def is_in_quiet_hours(config: Dict[str, Any]) -> bool: """Check if current time is within quiet hours.""" try: qh = config.get("quiet_hours", {}) start_str = qh.get("start", "22:00") end_str = qh.get("end", "07:00") start_h, start_m = map(int, start_str.split(":")) end_h, end_m = map(int, end_str.split(":")) now = datetime.now() current_minutes = now.hour * 60 + now.minute start_minutes = start_h * 60 + start_m end_minutes = end_h * 60 + end_m if start_minutes < end_minutes: return start_minutes <= current_minutes < end_minutes else: return current_minutes >= start_minutes or current_minutes < end_minutes except Exception: return False def should_skip_notification(package: str, config: Dict[str, Any]) -> bool: """Check if notification should be skipped based on allow/deny lists.""" allow = config.get("allow_packages", []) deny = config.get("deny_packages", []) if deny and package in deny: return True if allow and package not in allow: return True return False def compute_priority_score(title: str, text: str, config: Dict[str, Any]) -> int: """Compute priority score based on keyword matches.""" score = 0 combined = (title + " " + text).lower() priority_kws = config.get("keywords_priority", []) ignore_kws = config.get("keywords_ignore", []) for kw in priority_kws: if kw.lower() in combined: score += 25 for kw in ignore_kws: if kw.lower() in combined: score -= 10 return max(0, min(100, score)) # ============================================================================ # Termux Integration # ============================================================================ def check_termux_api() -> bool: """Check if termux-notification-list is available.""" try: result = subprocess.run( ["termux-notification-list"], capture_output=True, timeout=2, ) return result.returncode == 0 except FileNotFoundError: return False except Exception: return False def get_notifications(config: Dict[str, Any]) -> List[Notification]: """Fetch notifications via termux-notification-list.""" try: result = subprocess.run( ["termux-notification-list"], capture_output=True, timeout=5, text=True, ) if result.returncode != 0: logger.error(f"termux-notification-list failed: {result.stderr}") return [] try: data = json.loads(result.stdout) except json.JSONDecodeError as e: logger.error(f"Failed to parse notification JSON: {e}") return [] if not isinstance(data, list): data = [data] if data else [] notifications = [] for item in data: package = item.get("packageName", "unknown") if should_skip_notification(package, config): logger.debug(f"Skipping notification from {package}") continue notif = Notification( timestamp=int(item.get("when", 0) / 1000), package=package, title=item.get("title", ""), text=item.get("text", ""), priority=item.get("priority", 0), ongoing=item.get("ongoing", False), raw=item, ) notifications.append(notif) return notifications except Exception as e: logger.error(f"Failed to fetch notifications: {e}") return [] def notify_status(title: str, text: str = "", config: Optional[Dict] = None) -> None: """Send a Termux notification for status updates. Skip if in quiet hours.""" if config and is_in_quiet_hours(config): logger.debug("In quiet hours, skipping status notification.") return try: cmd = ["termux-notification", "--title", title] if text: cmd.extend(["--content", text]) subprocess.run(cmd, timeout=2, capture_output=True) except Exception: pass def toast(message: str, config: Optional[Dict] = None) -> None: """Send a Termux toast message. Skip if in quiet hours.""" if config and is_in_quiet_hours(config): return try: subprocess.run( ["termux-toast", message], timeout=2, capture_output=True, ) except Exception: pass # ============================================================================ # Configuration Management # ============================================================================ def load_config() -> Dict[str, Any]: """Load config from file or return defaults.""" if CONFIG_FILE.exists(): try: with open(CONFIG_FILE, "r") as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load config: {e}. Using defaults.") return DEFAULT_CONFIG.copy() def save_config(config: Dict[str, Any]) -> None: """Save config to file.""" CONFIG_DIR.mkdir(parents=True, exist_ok=True) with open(CONFIG_FILE, "w") as f: json.dump(config, f, indent=2) logger.info(f"Config saved to {CONFIG_FILE}") def get_salt_from_config(config: Dict[str, Any]) -> Optional[bytes]: """Extract salt from config (base64 encoded).""" salt_b64 = config.get("salt_b64", "") if salt_b64: try: return base64.b64decode(salt_b64) except Exception as e: logger.warning(f"Failed to decode salt from config: {e}") return None # ============================================================================ # Database # ============================================================================ class NotificationDB: """SQLite storage for encrypted notifications.""" def __init__(self, db_path: Path, key: bytes): self.db_path = db_path self.key = key self.db_path.parent.mkdir(parents=True, exist_ok=True) self._init_db() def _init_db(self) -> None: """Initialize database schema.""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS records ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts INTEGER NOT NULL, pkg TEXT NOT NULL, fp TEXT UNIQUE NOT NULL, blob BLOB NOT NULL, created_at REAL NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_ts ON records(ts DESC)") conn.execute("CREATE INDEX IF NOT EXISTS idx_pkg ON records(pkg)") conn.execute(""" CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ) """) conn.commit() def insert_notification(self, notif: Notification) -> Optional[int]: """Encrypt and store a notification. Return record ID or None if duplicate.""" plaintext = json.dumps({ "ts": notif.timestamp, "pkg": notif.package, "title": notif.title, "text": notif.text, "priority": notif.priority, "ongoing": notif.ongoing, }).encode() blob = encrypt_aes_gcm(plaintext, self.key) fp = notif.fingerprint() try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( """ INSERT INTO records (ts, pkg, fp, blob, created_at) VALUES (?, ?, ?, ?, ?) """, (notif.timestamp, notif.package, fp, blob, time.time()), ) conn.commit() return cursor.lastrowid except sqlite3.IntegrityError: logger.debug(f"Duplicate notification (fp={fp[:8]}...), skipped.") return None def get_records( self, app: Optional[str] = None, contains: Optional[str] = None, since_seconds: Optional[int] = None, limit: int = 100, ) -> List[Tuple[int, Notification]]: """Decrypt and fetch notifications with optional filters.""" query = "SELECT id, ts, pkg, blob FROM records WHERE 1=1" params: List[Any] = [] if app: query += " AND pkg = ?" params.append(app) if since_seconds is not None: cutoff_ts = int(time.time()) - since_seconds query += " AND ts > ?" params.append(cutoff_ts) query += " ORDER BY ts DESC LIMIT ?" params.append(limit) results = [] try: with sqlite3.connect(self.db_path) as conn: for row in conn.execute(query, params): record_id, ts, pkg, blob = row try: plaintext = decrypt_aes_gcm(blob, self.key) data = json.loads(plaintext) notif = Notification( id=str(record_id), timestamp=data["ts"], package=data["pkg"], title=data["title"], text=data["text"], priority=data.get("priority", 0), ongoing=data.get("ongoing", False), ) if contains is None or contains.lower() in ( notif.title + " " + notif.text ).lower(): results.append((record_id, notif)) except Exception as e: logger.warning(f"Failed to decrypt record {record_id}: {e}") except Exception as e: logger.error(f"Database query failed: {e}") return results def get_record_by_id(self, record_id: int) -> Optional[Notification]: """Fetch a single record by ID.""" try: with sqlite3.connect(self.db_path) as conn: row = conn.execute( "SELECT ts, pkg, blob FROM records WHERE id = ?", (record_id,), ).fetchone() if row: ts, pkg, blob = row plaintext = decrypt_aes_gcm(blob, self.key) data = json.loads(plaintext) return Notification( id=str(record_id), timestamp=data["ts"], package=data["pkg"], title=data["title"], text=data["text"], priority=data.get("priority", 0), ongoing=data.get("ongoing", False), ) except Exception as e: logger.error(f"Failed to fetch record {record_id}: {e}") return None def cleanup_old(self, retention_days: int) -> int: """Delete notifications older than retention_days. Returns count deleted.""" cutoff_ts = int(time.time()) - (retention_days * 86400) try: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( "DELETE FROM records WHERE ts < ?", (cutoff_ts,), ) conn.commit() deleted = cursor.rowcount if deleted > 0: logger.info(f"Cleaned up {deleted} old records.") return deleted except Exception as e: logger.error(f"Cleanup failed: {e}") return 0 def count_by_package(self) -> Dict[str, int]: """Count notifications per package.""" try: with sqlite3.connect(self.db_path) as conn: rows = conn.execute( "SELECT pkg, COUNT(*) FROM records GROUP BY pkg" ).fetchall() return {pkg: count for pkg, count in rows} except Exception as e: logger.error(f"Count query failed: {e}") return {} def get_all_records(self) -> List[Tuple[int, Notification]]: """Fetch all records (for export).""" return self.get_records(limit=999999) def delete_all(self) -> None: """Delete all records.""" try: with sqlite3.connect(self.db_path) as conn: conn.execute("DELETE FROM records") conn.commit() logger.info("All records deleted.") except Exception as e: logger.error(f"Delete failed: {e}") # ============================================================================ # Public API Functions # ============================================================================ def setup() -> Dict[str, Any]: """Initialize configuration, storage, and generate/persist salt.""" if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module not installed. pip install cryptography") CONFIG_DIR.mkdir(parents=True, exist_ok=True) DATA_DIR.mkdir(parents=True, exist_ok=True) config = load_config() if not config.get("salt_b64"): salt = secrets.token_bytes(16) config["salt_b64"] = base64.b64encode(salt).decode() save_config(config) logger.info("Generated and persisted encryption salt.") else: logger.info("Salt already exists in config.") try: db = NotificationDB(Path(config["db_path"]), secrets.token_bytes(32)) logger.info(f"Database initialized at {config['db_path']}") except Exception as e: logger.error(f"Failed to initialize database: {e}") raise return config def poll(config: Dict[str, Any], passphrase: str, loop: bool = False, interval: Optional[int] = None) -> Tuple[int, int]: """ Fetch notifications, apply redaction, deduplicate, and encrypt. Returns (stored_count, duplicate_count). """ if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module required. pip install cryptography") salt = get_salt_from_config(config) if not salt: raise RuntimeError("No encryption salt found. Run setup() first.") notifs = get_notifications(config) if not notifs: logger.info("No notifications found or termux-api unavailable.") return 0, 0 logger.info(f"Fetched {len(notifs)} notifications.") key = derive_key_from_passphrase(passphrase, salt) db = NotificationDB(Path(config["db_path"]), key) redaction_mode = config.get("redaction_mode", "truncate") truncate_len = config.get("truncate_len", 80) for notif in notifs: notif.title = redact_text(notif.title, redaction_mode, truncate_len) notif.text = redact_text(notif.text, redaction_mode, truncate_len) notif.priority = compute_priority_score(notif.title, notif.text, config) stored = 0 duplicates = 0 for notif in notifs: record_id = db.insert_notification(notif) if record_id: stored += 1 else: duplicates += 1 logger.info(f"Stored {stored} new, {duplicates} duplicates skipped.") db.cleanup_old(config.get("retention_days", 14)) if loop: poll_interval = interval or config.get("poll_interval_seconds", 60) logger.info(f"Looping every {poll_interval} seconds. Press Ctrl+C to stop.") try: while True: time.sleep(poll_interval) logger.info("Polling again...") poll(config, passphrase, loop=True, interval=poll_interval) except KeyboardInterrupt: logger.info("Stopped.") return stored, duplicates def list_notifications(config: Dict[str, Any], passphrase: str, app: Optional[str] = None, contains: Optional[str] = None, since_hours: Optional[int] = None, limit: int = 50) -> List[Tuple[int, Notification]]: """Decrypt and list notifications.""" if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module required.") salt = get_salt_from_config(config) if not salt: raise RuntimeError("No encryption salt found. Run setup() first.") key = derive_key_from_passphrase(passphrase, salt) db = NotificationDB(Path(config["db_path"]), key) since_seconds = None if since_hours: since_seconds = since_hours * 3600 return db.get_records( app=app, contains=contains, since_seconds=since_seconds, limit=limit, ) def get_digest(config: Dict[str, Any], passphrase: str, app: Optional[str] = None) -> Dict[str, Any]: """Get local digest: counts per app, recent items, keyword extraction.""" if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module required.") salt = get_salt_from_config(config) if not salt: raise RuntimeError("No encryption salt found. Run setup() first.") key = derive_key_from_passphrase(passphrase, salt) db = NotificationDB(Path(config["db_path"]), key) counts = db.count_by_package() records = db.get_records(app=app, limit=1000) priority_keywords = set(config.get("keywords_priority", [])) matches = [] for record_id, notif in records: combined = (notif.title + " " + notif.text).lower() for kw in priority_keywords: if kw.lower() in combined: matches.append((notif.package, kw)) break return { "counts": counts, "recent_count": len(records), "recent_items": records[:10], "priority_matches": matches, } def summarize_remote(config: Dict[str, Any], passphrase: str, selected_ids: Optional[List[int]] = None) -> Dict[str, Any]: """Local digest by default. Send redacted data to remote endpoint if configured.""" if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module required.") salt = get_salt_from_config(config) if not salt: raise RuntimeError("No encryption salt found. Run setup() first.") key = derive_key_from_passphrase(passphrase, salt) db = NotificationDB(Path(config["db_path"]), key) records = db.get_records(limit=200) if not records: return {"error": "No notifications to summarize"} counts = db.count_by_package() summary = { "total": len(records), "by_app": dict(sorted(counts.items(), key=lambda x: -x[1])[:5]), } remote_cfg = config.get("remote", {}) if not remote_cfg.get("endpoint"): return summary endpoint = remote_cfg["endpoint"] api_key = os.getenv(remote_cfg.get("api_key_env", "GROK_API_KEY"), "") if not api_key: return {"error": f"API key not found in env var {remote_cfg.get('api_key_env')}"} if not selected_ids: selected_ids = [rid for rid, _ in records[:20]] payload_items = [] total_chars = 0 max_chars = remote_cfg.get("max_chars", 1200) for rid in selected_ids: notif = db.get_record_by_id(rid) if notif: title_redacted = remote_redact_text(notif.title, 100) text_redacted = remote_redact_text(notif.text, 150) item_str = f"{title_redacted} {text_redacted}" if total_chars + len(item_str) > max_chars: logger.info(f"Payload size limit ({max_chars} chars) reached.") break payload_items.append({ "id": rid, "package": notif.package, "title": title_redacted, "text": text_redacted, }) total_chars += len(item_str) if not payload_items: return {"error": "No items to send after filtering"} payload = { "items": payload_items, "max_chars": max_chars, } logger.info(f"Sending {len(payload_items)} redacted items to {endpoint}...") try: import urllib.request import urllib.error req = urllib.request.Request( endpoint, data=json.dumps(payload).encode(), headers={ "Content-Type": "application/json", "Authorization": f"Bearer {api_key}", }, method="POST", ) timeout = remote_cfg.get("timeout_seconds", 12) with urllib.request.urlopen(req, timeout=timeout) as response: result = json.loads(response.read().decode()) return result except Exception as e: logger.error(f"Remote call failed: {e}") return {"error": str(e)} def export_notifications(config: Dict[str, Any], passphrase: str, export_path: str = "romanai_android_export.enc") -> str: """Export encrypted backup of all notifications.""" if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module required.") salt = get_salt_from_config(config) if not salt: raise RuntimeError("No encryption salt found. Run setup() first.") key = derive_key_from_passphrase(passphrase, salt) db = NotificationDB(Path(config["db_path"]), key) records = db.get_all_records() if not records: logger.info("No notifications to export.") return "" export_data = { "version": 1, "exported_at": datetime.now().isoformat(), "count": len(records), "salt_b64": config.get("salt_b64", ""), "notifications": [ { "id": rid, "timestamp": notif.timestamp, "package": notif.package, "title": notif.title, "text": notif.text, } for rid, notif in records ], } plaintext = json.dumps(export_data).encode() blob = encrypt_aes_gcm(plaintext, key) with open(export_path, "wb") as f: f.write(blob) logger.info(f"Exported {len(records)} notifications to {export_path}") return export_path def import_notifications(config: Dict[str, Any], passphrase: str, import_path: str) -> int: """Import notifications from encrypted backup.""" if not CRYPTO_AVAILABLE: raise RuntimeError("cryptography module required.") if not Path(import_path).exists(): raise FileNotFoundError(f"Backup file not found: {import_path}") salt = get_salt_from_config(config) if not salt: raise RuntimeError("No encryption salt found. Run setup() first.") key = derive_key_from_passphrase(passphrase, salt) try: with open(import_path, "rb") as f: blob = f.read() plaintext = decrypt_aes_gcm(blob, key) export_data = json.loads(plaintext) except Exception as e: logger.error(f"Failed to decrypt/parse backup: {e}") raise db = NotificationDB(Path(config["db_path"]), key) imported = 0 for item in export_data.get("notifications", []): notif = Notification( timestamp=item["timestamp"], package=item["package"], title=item["title"], text=item["text"], ) record_id = db.insert_notification(notif) if record_id: imported += 1 logger.info(f"Imported {imported} notifications from {import_path}") return imported def purge_notifications(config: Dict[str, Any], passphrase: str) -> None: """Delete all local storage.""" salt = get_salt_from_config(config) if not salt: raise RuntimeError("No encryption salt found. Run setup() first.") key = derive_key_from_passphrase(passphrase, salt) db = NotificationDB(Path(config["db_path"]), key) db.delete_all() logger.info("All notifications purged.")