cortex.test_control_security
Regression tests for Cortex control-plane hardening.
cortex.auth
Authentication helpers for Cortex gRPC clients and servers.
get_auth_token
def get_auth_token(explicit: str | None = None) -> str
Return the configured Cortex auth token or raise a clear error.
auth_metadata
def auth_metadata(token: str | None = None) -> tuple[tuple[str, str], ...]
Build gRPC metadata for authenticated local Cortex clients.
metadata_has_valid_token
def metadata_has_valid_token(metadata: Iterable[tuple[str, str]] | None, expected_token: str) -> bool
Validate bearer or legacy token metadata using constant-time compare.
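A constant-time check over gRPC metadata might look like the sketch below. The metadata key names (`authorization`, `x-cortex-token`) and the function name `token_matches` are assumptions for illustration; the keys that cortex.auth actually reads are not shown in this reference.

```python
import hmac
from typing import Iterable, Optional, Tuple

# Hypothetical metadata keys; the real key names are not documented here.
BEARER_KEY = "authorization"
LEGACY_KEY = "x-cortex-token"
BEARER_PREFIX = "Bearer "


def token_matches(metadata: Optional[Iterable[Tuple[str, str]]], expected: str) -> bool:
    """Return True if any bearer or legacy entry equals the expected token."""
    if not metadata or not expected:
        return False
    matched = False
    for key, value in metadata:
        key = key.lower()
        if key == BEARER_KEY and value.startswith(BEARER_PREFIX):
            candidate = value[len(BEARER_PREFIX):]
        elif key == LEGACY_KEY:
            candidate = value
        else:
            continue
        # compare_digest does not short-circuit on the first differing byte.
        if hmac.compare_digest(candidate.encode("utf-8"), expected.encode("utf-8")):
            matched = True
    return matched
```

Comparing encoded bytes rather than `str` values keeps `hmac.compare_digest` usable when a token contains non-ASCII characters.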
CortexAuthInterceptor Objects
class CortexAuthInterceptor(grpc.ServerInterceptor)
Require local gRPC callers to present the configured auth token.
cortex.test_e2e
TELOS End-to-End Test - Full Taint Propagation & Syscall Blocking Demo
This script demonstrates the complete flow:
- Registers itself as an “agent” with Cortex
- Simulates taint detection from a malicious page
- Attempts to spawn a subprocess (which should be BLOCKED by eBPF LSM)
Prerequisites:
- Telos Core running: sudo ./bin/telos_daemon --bpf-obj bin/bpf_lsm.o
- Telos Cortex running: sudo python3 cortex/main.py --debug (run Cortex as root, or fix the socket permissions with: sudo chmod 666 /var/run/telos.sock)
cortex.main
Telos Cortex - The Brain
Central gRPC server that:
- Receives taint reports from Browser Eye (via Native Host)
- Manages the Agent Registry (PID Bridge)
- Pushes taint updates to the eBPF Core via Unix Socket
Usage: python3 cortex/main.py [--port 50051] [--socket /var/run/telos.sock]
RATE_LIMIT_RPS
Max requests per second per agent PID
RATE_LIMIT_BURST
Burst capacity per PID
RateLimiter Objects
class RateLimiter()
Thread-safe per-PID rate limiter with stale entry cleanup.
allow
def allow(pid: int) -> bool
Return True if the request is allowed, False if rate-limited.
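A per-PID limiter with a rate and a burst capacity is commonly built as a token bucket. The sketch below is one such construction, not the actual RateLimiter; the class name, the default values, the `stale_after` parameter, and the injectable `now` argument are all assumptions made for illustration.

```python
import threading
import time
from typing import Dict, Optional, Tuple


class TokenBucketLimiter:
    """Per-PID token bucket (illustrative names and defaults)."""

    def __init__(self, rps: float = 10.0, burst: int = 20, stale_after: float = 60.0):
        self.rps = rps
        self.burst = float(burst)
        self.stale_after = stale_after
        self._lock = threading.Lock()
        self._buckets: Dict[int, Tuple[float, float]] = {}  # pid -> (tokens, last_seen)

    def allow(self, pid: int, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        with self._lock:
            tokens, last = self._buckets.get(pid, (self.burst, now))
            # Refill in proportion to elapsed time, capped at the burst size.
            tokens = min(self.burst, tokens + (now - last) * self.rps)
            allowed = tokens >= 1.0
            if allowed:
                tokens -= 1.0
            self._buckets[pid] = (tokens, now)
            # Drop buckets for PIDs that have been idle longer than stale_after.
            stale = [p for p, (_, seen) in self._buckets.items() if now - seen > self.stale_after]
            for p in stale:
                del self._buckets[p]
            return allowed
```

Passing `now` explicitly makes the refill logic testable without sleeping; production callers would leave it unset and rely on `time.monotonic()`.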
TelosControlService Objects
class TelosControlService(protocol_pb2_grpc.TelosControlServicer)
gRPC Service implementing the TelosControl protocol.
ReportTaint
def ReportTaint(request: protocol_pb2.TaintReport, context: grpc.ServicerContext) -> protocol_pb2.Ack
Handle taint reports from Browser Eye.
Flow:
- Update internal state in Guardian
- Resolve which Agent PID is affected (PID Bridge)
- Push taint level to eBPF Core via Unix Socket
DeclareIntent
def DeclareIntent(request: protocol_pb2.IntentRequest, context: grpc.ServicerContext) -> protocol_pb2.IntentVerdict
Handle intent declarations from Agents.
In Phase 2 this is a stub: it allows all intents and logs them for audit. The full implementation is planned for Phase 3.
GetPolicy
def GetPolicy(request: protocol_pb2.PolicyQuery, context: grpc.ServicerContext) -> protocol_pb2.PolicyRules
Return current policy rules for a given PID. Used by daemons to sync state. Also registers the agent for tracking.
CortexServer Objects
class CortexServer()
Manages the gRPC server lifecycle and IPC connections.
start
def start()
Start the Cortex server.
wait_for_termination
def wait_for_termination()
Block until a shutdown signal is received.
stop
def stop()
Gracefully stop the server.
signal_handler
def signal_handler(signum, frame)
Handle termination signals.
sync_filesystem_policy
def sync_filesystem_policy(config: dict, ipc: CoreIPCClient)
Resolve configured sensitive paths to inodes and push to Core.
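Resolving a path to its inode can be done with `os.stat`. The helper below is a minimal sketch of that step only; the name `resolve_inodes` and the choice to skip unreadable paths are assumptions, and the push to Core is left out.

```python
import os
from typing import Dict, Iterable


def resolve_inodes(paths: Iterable[str]) -> Dict[str, int]:
    """Map each existing path to its inode number, skipping paths that cannot be stat'ed."""
    resolved: Dict[str, int] = {}
    for path in paths:
        try:
            # os.stat follows symlinks, so this is the inode of the link target.
            resolved[path] = os.stat(path).st_ino
        except OSError:
            continue  # missing or unreadable path
    return resolved
```

Inode numbers are unique only within one filesystem, and a file that is replaced by rename gets a new inode, so a mapping like this may need to be refreshed when files change.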
sync_network_policy
def sync_network_policy(config: dict, ipc: CoreIPCClient)
Push allowed network destinations (IPs) to Core.
cortex.exec_intel
Telos Execution Intelligence Engine
Deterministic classification of execution actions against intent. Mirrors the Domain Intelligence Engine architecture.
Pipeline:
- L1: Safe Binary (O(1) lookup)
- L2: LOLBin Detection (known-dangerous binaries)
- L3: Path Traversal / Sensitive Target Check
- L4: Intent-Binary Mapping (does the binary match the goal?)
- Combined score → ALLOW / DENY / ESCALATE
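The first three pipeline layers can be sketched as additive signals on a score. Everything concrete below is hypothetical: the binary sets, the sensitive prefixes, the weight values, and the function name `score_exec` are placeholders, and the L4 intent mapping is omitted.

```python
from typing import List, Tuple

# Illustrative tables and weights only; the real lists and W_* values are not shown here.
SAFE_BINARIES = {"ls", "cat", "grep"}
LOLBINS = {"bash", "curl", "nc"}
SENSITIVE_PREFIXES = ("/etc/shadow", "/root/.ssh")
W_SAFE_BINARY = -20
W_LOLBIN = 50
W_PATH_TRAVERSAL = 40
W_SENSITIVE_TARGET = 60


def score_exec(binary: str, args: List[str]) -> Tuple[int, List[str]]:
    """Return (score, reasons) from the L1-L3 checks; a higher score is more suspicious."""
    score, reasons = 0, []
    if binary in SAFE_BINARIES:  # L1: O(1) set lookup
        score += W_SAFE_BINARY
        reasons.append("safe_binary")
    if binary in LOLBINS:  # L2: known-dangerous binary
        score += W_LOLBIN
        reasons.append("lolbin")
    if any(".." in arg.split("/") for arg in args):  # L3: parent-directory component
        score += W_PATH_TRAVERSAL
        reasons.append("path_traversal")
    if any(arg.startswith(SENSITIVE_PREFIXES) for arg in args):  # L3: sensitive target
        score += W_SENSITIVE_TARGET
        reasons.append("sensitive_target")
    return score, reasons
```

Splitting each argument on `/` before looking for `..` avoids flagging file names that merely contain two dots, such as `notes..txt`.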
W_LOLBIN
Living Off the Land Binary
W_PATH_TRAVERSAL
Path traversal attempt
W_SENSITIVE_TARGET
Accessing sensitive files
W_INTENT_MISMATCH
Binary doesn't match intent
W_SAFE_BINARY
Trusted read-only tool
ExecIntel Objects
class ExecIntel()
Deterministic execution action classifier.
classify
def classify(binary: str, args: List[str], intent: str) -> Tuple[str, int, str, List[str]]
Classify an execution action against an intent.
Arguments:
- binary: the binary name (e.g., 'bash', 'curl', 'cat')
- args: command arguments
- intent: natural language goal
Returns:
(decision, score, reason, allowed_bins)
cortex.mirage_manager
Telos Mirage Manager
Orchestrates active deception by binding honey-tokens to physical inodes.
MirageManager Objects
class MirageManager()
arm_traps
def arm_traps()
Resolve inodes and push traps to the kernel.
cortex.verifier
Telos Intent Verifier: Dual-Gate Data Plane Router
Architecture:
Data Plane (synchronous, deterministic):
1. Taint check
2. Domain extraction + Exec action extraction
3. Domain Intelligence classification (L0-L4) → Network Gate
4. Execution Intelligence classification (L1-L4) → Exec Gate
5. Score-based decision: ALLOW / DENY / ESCALATE
Control Plane (async, rare):
6. LLM evaluation (only for ESCALATE decisions)
7. Persist LLM verdict → DB (self-learning)
The LLM is NEVER in the hot path. Expected LLM call rate: <5% of queries.
IntentVerifier Objects
class IntentVerifier()
verify
def verify(pid: int, goal: str, actions: list, exec_actions: list = None) -> Tuple[bool, str, int, List[str], List[str]]
Verify intent. Returns (allowed, reason, ttl_ms, domains, allowed_bins).
cortex.test_dns_ttl
Test: DNS Proxy Drawbridge TTL Cleanup
Verifies that firewall rules opened by the DNS proxy are removed after the TTL expires.
test_cleanup_timer_schedules_removal
def test_cleanup_timer_schedules_removal()
After add_network_rule, remove_network_rule should fire after TTL.
test_stop_cancels_pending_timers
def test_stop_cancels_pending_timers()
Calling stop() should cancel all pending cleanup timers.
test_multiple_ips_independent_ttl
def test_multiple_ips_independent_ttl()
Each IP gets its own independent TTL timer.
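The behavior these tests describe (one cleanup timer per IP, all cancelled on stop) can be sketched with `threading.Timer`. The class `TtlRuleTable` and its callback parameters are hypothetical stand-ins; the proxy's own implementation is not shown in this reference.

```python
import threading
from typing import Callable, Dict


class TtlRuleTable:
    """Open a rule per IP and schedule its removal (a sketch, not the proxy's real class)."""

    def __init__(self, add_rule: Callable[[int], object], remove_rule: Callable[[int], object]):
        self._add = add_rule
        self._remove = remove_rule
        self._timers: Dict[int, threading.Timer] = {}
        self._lock = threading.Lock()

    def allow(self, ip: int, ttl_s: float) -> None:
        with self._lock:
            old = self._timers.pop(ip, None)
            if old is not None:
                old.cancel()  # restart the TTL for an IP that is already open
            self._add(ip)
            timer = threading.Timer(ttl_s, self._expire, args=(ip,))
            timer.daemon = True
            self._timers[ip] = timer
            timer.start()

    def _expire(self, ip: int) -> None:
        with self._lock:
            # Timer subclasses Thread, so the firing timer is the current thread.
            if self._timers.get(ip) is not threading.current_thread():
                return  # superseded by a newer allow() or cleared by stop()
            del self._timers[ip]
            self._remove(ip)

    def stop(self) -> None:
        with self._lock:
            for timer in self._timers.values():
                timer.cancel()
            self._timers.clear()
```

The identity check in `_expire` guards against a timer that fired just before it was replaced or cancelled; without it, a stale timer could remove a rule whose TTL had been renewed.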
cortex.llm_verifier
Telos Cognitive Intent Verifier (Phase 4)
Uses TinyLlama-1.1B with logprobs-based classification. Instead of trusting greedy text output, we compare the model’s token-level confidence in RELEVANT vs IRRELEVANT.
This is the architecturally correct approach for small LLMs: they may not generate the right token, but their internal probabilities often reflect the correct answer.
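Comparing two label log-probabilities amounts to a two-way softmax. The sketch below assumes the caller has already extracted one log-probability per label (for example, that of each label's first token) into a dict; the function name and that input shape are assumptions, not the verifier's actual interface.

```python
import math
from typing import Dict, Tuple


def classify_from_logprobs(logprobs: Dict[str, float]) -> Tuple[str, float]:
    """Return (label, p_relevant), with p_relevant renormalized over the two labels."""
    neg_inf = float("-inf")
    lp_rel = logprobs.get("RELEVANT", neg_inf)
    lp_irr = logprobs.get("IRRELEVANT", neg_inf)
    if lp_rel == neg_inf and lp_irr == neg_inf:
        raise ValueError("neither label has a log-probability")
    # Logistic of the log-odds, branched so math.exp never receives a large positive value.
    diff = lp_rel - lp_irr
    if diff >= 0:
        p_rel = 1.0 / (1.0 + math.exp(-diff))
    else:
        e = math.exp(diff)
        p_rel = e / (1.0 + e)
    label = "RELEVANT" if p_rel >= 0.5 else "IRRELEVANT"
    return label, p_rel
```

Renormalizing over only the two labels discards probability mass the model places on other tokens, which is the point: the decision depends on the relative confidence between the labels rather than on which token greedy decoding would emit.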
cortex.test_intent_redeclare_taint
Regression tests for issue 27 (intent replay via taint not re-checked).
cortex.domain_data
Telos Domain Intelligence: Seed Database
Curated domain → category → trust mappings for the deterministic classification engine. This is the "threat intelligence" layer.
Categories:
- docs: Documentation sites
- code: Developer tools, repositories
- search: Search engines
- news: News outlets
- package: Package registries
- academic: Research / academic
- social: Social media
- filehost: File hosting / sharing
- exfil: Known data exfiltration targets
- gaming: Gaming platforms
- streaming: Streaming / media
- cloud: Cloud infrastructure
- mail: Email providers
- finance: Financial services
- cdn: Content delivery networks
- ads: Advertising / tracking
- vpn_proxy: VPN / proxy / anonymizer
- paste: Paste / snippet services (exfil risk)
- unknown: Uncategorized
Trust levels:
- 100: Core infrastructure (loopback, DNS)
- 80: Highly trusted (major platforms)
- 60: Trusted (well-known services)
- 40: Neutral (known but not inherently safe)
- 20: Suspicious (common abuse vector)
- 0: Blocked (known malicious pattern)
cortex.unix_socket
Telos Cortex - Unix Socket IPC Client
Communicates with the Telos Core (Go eBPF Loader) via Unix Domain Socket.
Protocol:
- JSON messages terminated by newline
- Commands: UPDATE_TAINT, CLEAR_TAINT, GET_STATE
- Responses: {success: bool, error?: string, data?: object}
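Newline-terminated JSON framing can be sketched as a pair of helpers. The request key `cmd` and the payload field names are assumptions; only the command names and the response shape come from the protocol description above.

```python
import json
from typing import Any, Dict


def encode_command(command: str, **fields: Any) -> bytes:
    """Serialize one command as a single JSON line."""
    # The "cmd" key is hypothetical; the real request schema is not documented here.
    payload = {"cmd": command, **fields}
    # json.dumps escapes newlines inside strings, so the only raw newline is the terminator.
    return (json.dumps(payload, separators=(",", ":")) + "\n").encode("utf-8")


def decode_response(line: bytes) -> Dict[str, Any]:
    """Parse one response line and check that it carries the 'success' flag."""
    response = json.loads(line.decode("utf-8"))
    if not isinstance(response, dict) or "success" not in response:
        raise ValueError("malformed response")
    return response
```

A reader on the other end can then split the byte stream on `\n` and parse each line independently.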
CoreIPCClient Objects
class CoreIPCClient()
IPC Client to communicate with Telos Core (eBPF Loader).
The Core listens on a Unix socket and accepts JSON commands to update BPF maps.
connect
def connect() -> bool
Establish connection to the Core daemon.
Returns True if connected, False otherwise. The client can operate without a connection (standalone mode).
close
def close() -> None
Close the socket connection.
send_update_taint
def send_update_taint(pid: int, taint_level: int) -> bool
Update taint level for a process in the BPF map.
Arguments:
- pid: Process ID to update
- taint_level: New taint level (0-4)
Returns:
True if Core acknowledged the update
send_clear_taint
def send_clear_taint(pid: int) -> bool
Clear taint for a process (remove from BPF map).
Arguments:
- pid: Process ID to clear
Returns:
True if Core acknowledged
update_inode
def update_inode(inode: int, sensitivity: int) -> bool
Update sensitivity for an inode.
update_network
def update_network(ip: int, allowed: int) -> bool
Update network allowlist for an IP.
add_network_rule
def add_network_rule(ip: int) -> bool
Allow traffic to a specific IP.
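The network methods take the IP as an `int`, and this reference does not say which byte order the Core expects. The sketch below shows both common conversions for an IPv4 address so the difference is visible; the function names are hypothetical.

```python
import ipaddress
import socket
import struct


def ipv4_to_int_be(address: str) -> int:
    """Numeric value of the address, most significant octet first (1.2.3.4 -> 0x01020304)."""
    return int(ipaddress.IPv4Address(address))


def ipv4_to_int_le(address: str) -> int:
    """Value obtained when the four network-order bytes are read as a little-endian u32."""
    return struct.unpack("<I", socket.inet_aton(address))[0]
```

Kernel code often compares against addresses stored in network byte order, so on a little-endian host the second form may be the one that matches; this should be confirmed against the Core before relying on either.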
delete_network
def delete_network(ip: int) -> bool
Remove an IP from the allowlist map entirely. This frees up space in the BPF map.
remove_network_rule
def remove_network_rule(ip: int) -> bool
Block traffic to a specific IP by removing the rule.
send_register_agent
def send_register_agent(pid: int, comm: str = "") -> bool
Register an agent process in the BPF map (for tracking).
Arguments:
- pid: Agent process ID
- comm: Process command name (e.g., "python3")
Returns:
True if Core acknowledged
get_state
def get_state() -> Optional[Dict[str, Any]]
Get current state from Core (for debugging).
Returns:
State dict or None
get_pid_taint_level
def get_pid_taint_level(pid: int) -> Optional[int]
Return the current kernel taint level for pid when Core is reachable.
clear_network_rule
def clear_network_rule(ip_int: int) -> bool
Helper to clear an IP from the network map.
ping_core
def ping_core() -> bool
[Phase 11: Heartbeat Mechanism] Send a heartbeat pulse to the Go Daemon. If heartbeats stop, the Daemon falls back to its emergency Fail-Open or Fail-Closed behavior.
send_update_exec
def send_update_exec(pid: int, allowed_bins: list, mode: int = 1) -> bool
Push execution policy to BPF exec_policy_map.
Arguments:
- pid: Agent process ID
- allowed_bins: List of allowed binary names (max 8, 16 chars each)
- mode: 0 = unrestricted, 1 = enforce allowlist
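The limits on allowed_bins can be checked before a request is sent. The helper below is a hypothetical pre-flight check, not part of CoreIPCClient; it measures length in UTF-8 bytes and rejects path separators, both of which are assumptions about what the Core accepts.

```python
from typing import List

MAX_BINS = 8       # limit stated in the docstring above
MAX_NAME_LEN = 16  # limit stated above; whether it includes a NUL terminator is not stated


def validate_allowed_bins(allowed_bins: List[str]) -> List[str]:
    """Return a copy of the list if it fits the documented limits, else raise ValueError."""
    if len(allowed_bins) > MAX_BINS:
        raise ValueError(f"at most {MAX_BINS} binaries are allowed, got {len(allowed_bins)}")
    for name in allowed_bins:
        encoded = name.encode("utf-8")
        if not encoded or len(encoded) > MAX_NAME_LEN:
            raise ValueError(f"binary name must be 1-{MAX_NAME_LEN} bytes: {name!r}")
        # Assumption: entries are bare names, not paths, and must not embed NUL.
        if b"/" in encoded or b"\x00" in encoded:
            raise ValueError(f"binary name must not contain '/' or NUL: {name!r}")
    return list(allowed_bins)
```

If the 16-character limit turns out to include a terminating NUL, as fixed-size C buffers often do, the usable length would be 15 and `MAX_NAME_LEN` should be lowered accordingly.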
Returns:
True if Core acknowledged
send_clear_exec
def send_clear_exec(pid: int) -> bool
Remove execution policy for a PID.
Arguments:
- pid: Agent process ID
Returns:
True if Core acknowledged
add_mirage_trap
def add_mirage_trap(inode: int, honey_id: int, payload: str) -> bool
Inject a honey-token trap into the kernel maps.
Arguments:
- inode: The real inode of the target file.
- honey_id: A unique ID for this payload.
- payload: The fake data to return (max 256 bytes per our C struct).
cortex.test_guardian_thread_safety
Tests for Guardian thread safety under concurrent access.
Uses multiple threads to simulate concurrent agent registration, taint updates, and session binding, verifying that no state is corrupted.
GuardianThreadSafetyTests Objects
class GuardianThreadSafetyTests(unittest.TestCase)
Thread-safety tests for the Guardian class.
test_concurrent_agent_registration
def test_concurrent_agent_registration()
Multiple threads registering agents simultaneously should not corrupt state.
test_concurrent_taint_updates
def test_concurrent_taint_updates()
Concurrent taint updates should not lose or corrupt taint levels.
test_concurrent_session_binding
def test_concurrent_session_binding()
Concurrent session registrations should not corrupt session_map.
test_concurrent_register_and_taint
def test_concurrent_register_and_taint()
Registration and taint updates interleaved should not cause corruption.
test_concurrent_unregister
def test_concurrent_unregister()
Concurrent unregister should not corrupt remaining state.
test_get_state_summary_thread_safe
def test_get_state_summary_thread_safe()
get_state_summary should not crash under concurrent mutation.
cortex.config
FilesystemPolicy Objects
class FilesystemPolicy(BaseModel)
sensitive_paths
Required; must contain at least one entry.
load_policy
def load_policy(policy_path: str) -> PolicyConfig
Load and validate policy.yaml.
load_config
def load_config(args: Any) -> tuple[CortexSettings, PolicyConfig]
Production-grade configuration loader with fail-fast validation.
cortex.test_taint
Quick test script to simulate taint injection to Cortex.
Usage: python3 test_taint.py
cortex.guardian
Telos Guardian - Intent Verification Engine
Manages:
- Agent Registry (PID tracking)
- Taint State (per-source and per-agent)
- Policy Decisions
- PID Bridge (mapping browser views to agents)
TaintRecord Objects
@dataclass
class TaintRecord()
Record of taint for a specific source/view.
AgentInfo Objects
@dataclass
class AgentInfo()
Information about a registered agent.
active_views
source_ids being viewed
Guardian Objects
class Guardian()
The Guardian manages security state and policy decisions.
PID Bridge Logic:
- Agents register their PID when starting
- When a browser view reports taint, we map it to the active agent
- For Phase 2: Simple model assumes single active agent
- Future: Session ID mapping, multiple agents
get_session_pid
def get_session_pid(session_id: str) -> Optional[int]
Return the PID bound to a session ID, or None.
register_session
def register_session(session_id: str, pid: int) -> bool
Register a session ID for an agent.
register_agent
def register_agent(pid: int) -> bool
Register an agent process.
In Phase 2, the most recently registered agent is considered "active" and will receive taint from browser views.
unregister_agent
def unregister_agent(pid: int, *, cleanup_taint_records: bool = True) -> bool
Unregister an agent when it exits.
Performs full cleanup of agent state:
- Collects all views mapped to this PID (from active_views + view_agent_map)
- Optionally removes taint_records for those views
- Removes the agent entry (drops active_views from memory)
- Removes session mappings for this PID
- Updates active_agent_pid if this was the active agent
- Removes view->agent mappings for this PID
- Removes core_taint entry for this PID
Arguments:
- pid: Agent PID to unregister.
- cleanup_taint_records: If True (default), also remove taint_records for views that were assigned to this agent.
update_taint
def update_taint(source_id: str, level: int, url: str = "", session_id: str = "") -> None
Update taint record for a browser view/source.
Arguments:
- source_id: Browser tab/view identifier
- level: TaintLevel enum value (0-4)
- url: URL where taint was detected
- session_id: Optional session ID from browser
get_taint_level
def get_taint_level(pid: int) -> int
Get current taint level for an agent PID.
update_core_taint
def update_core_taint(pid: int, taint_level: int) -> None
Record the latest kernel taint snapshot for a PID.
clear_taint
def clear_taint(pid: int) -> None
Reset taint level for an agent (after cooldown/verification).
get_agent_pid_for_view
def get_agent_pid_for_view(source_id: str, session_id: str = "") -> Optional[int]
Resolve which agent PID should receive taint from a browser view.
Strategy:
- Check explicit session_id mapping (Phase 3)
- Check explicit source_id mapping
- Fall back to active agent (Phase 2 legacy)
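The three-step strategy above reduces to an ordered lookup. The free function below is a sketch that takes the relevant state as plain arguments; in Guardian these would be instance attributes guarded by a lock, and the parameter names here are assumptions.

```python
from typing import Dict, Optional


def resolve_agent_pid(
    source_id: str,
    session_id: str,
    session_map: Dict[str, int],
    view_agent_map: Dict[str, int],
    active_agent_pid: Optional[int],
) -> Optional[int]:
    """Resolve a view to an agent PID: session mapping, then view mapping, then active agent."""
    if session_id and session_id in session_map:
        return session_map[session_id]
    if source_id in view_agent_map:
        return view_agent_map[source_id]
    return active_agent_pid  # may be None when no agent is registered
```

Checking the session mapping first means an explicit session binding wins over both a stale view mapping and the single-active-agent fallback.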
map_view_to_agent
def map_view_to_agent(source_id: str, pid: int) -> bool
Explicitly map a browser view to an agent.
get_policy
def get_policy() -> dict
Return current policy configuration.
should_block_exec
def should_block_exec(pid: int) -> bool
Determine if a process should be blocked from executing commands.
Returns True if taint level exceeds policy threshold.
get_state_summary
def get_state_summary() -> dict
Get a summary of current guardian state for debugging.
get_agent_pids
def get_agent_pids()
Return a snapshot list of registered agent PIDs.
get_agent_count
def get_agent_count() -> int
Return the number of registered agents.
has_agent
def has_agent(pid: int) -> bool
Check if an agent PID is registered (thread-safe).
get_active_agent_pid
def get_active_agent_pid()
Return the active agent PID, or None (thread-safe).
get_taint_record_count
def get_taint_record_count() -> int
Return the number of taint records (thread-safe).
get_session_count
def get_session_count() -> int
Return the number of registered session mappings (thread-safe).
get_session_map_snapshot
def get_session_map_snapshot()
Return a thread-safe shallow copy of the session map.
cortex.dns_proxy
Telos Intent-Based DNS Proxy
Intercepts agent DNS queries, verifies intent, and dynamically opens the kernel firewall.
TelosDNSProxy Objects
class TelosDNSProxy()
allow_domain
def allow_domain(domain: str, ttl_ms: int)
Manually pre-authorize a domain in the proxy (used by explicit DeclareIntent RPCs). In a full implementation, this populates an LRU cache or Redis.
cortex.domain_intel
Telos Domain Intelligence Engine
Deterministic, O(1) domain classification for the data plane. SQLite backend (persistent with WAL, fallback to in-memory).
Pipeline:
- L0: Root domain extraction (subdomain/combo-squat defense)
- L1: Exact match (SQLite hash index)
- L2: Typosquat detection (Levenshtein + homoglyph + embedded brand)
- L3: Category mismatch scoring (intent → allowed categories)
- L4: Reputation + risk flag scoring
- Combined score → ALLOW / DENY / ESCALATE
The LLM is NEVER called from this engine. LLM escalation is handled by the caller (verifier.py).
BLOCK_THRESHOLD
score >= this → DENY
REVIEW_THRESHOLD
score >= this → ESCALATE to LLM
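The two thresholds map a combined score to one of three decisions. In the sketch below the numeric values are placeholders, since the real constants are not shown in this reference, and the function name `decide` is hypothetical.

```python
# Placeholder values; the real BLOCK_THRESHOLD and REVIEW_THRESHOLD are not shown here.
BLOCK_THRESHOLD = 70
REVIEW_THRESHOLD = 40


def decide(score: int) -> str:
    """Map a combined risk score to ALLOW, ESCALATE, or DENY."""
    # Check the stricter threshold first; this assumes BLOCK_THRESHOLD >= REVIEW_THRESHOLD.
    if score >= BLOCK_THRESHOLD:
        return "DENY"
    if score >= REVIEW_THRESHOLD:
        return "ESCALATE"
    return "ALLOW"
```

Only scores that land between the two thresholds reach the LLM, which is how the engine keeps model calls off the common path.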
W_CATEGORY_MISMATCH
Domain category doesn't match intent
W_LOW_TRUST
Trust level below threshold
W_EXFIL_FLAG
Known exfil/paste/abuse category
W_TYPOSQUAT
Looks like a typosquat of a known domain
W_EMBEDDED_BRAND
Known brand embedded in unknown domain (combo-squat)
W_UNKNOWN_DOMAIN
Domain not in database at all
W_HIGH_TRUST_BONUS
Trusted domain gets a negative (good) score
TRUST_THRESHOLD
Below this, domain is considered low-trust
DomainIntel Objects
class DomainIntel()
Deterministic domain classification engine. All lookups are O(1) via SQLite indexed hash + bounded string ops. Persistent SQLite with WAL for concurrent read/write safety.
classify
def classify(domain: str, intent: str) -> Tuple[str, int, str]
Classify a domain against an intent. Logs the decision and uses an LRU cache for high performance.
persist_verdict
def persist_verdict(domain: str, category: str, trust: int)
Persist an LLM verdict into the DB for future O(1) lookups. This is the self-learning mechanism.
get_stats
def get_stats() -> Dict
Return DB statistics.