Skip to content

cortex.test control security

Regression tests for Cortex control-plane hardening.

Authentication helpers for Cortex gRPC clients and servers.

def get_auth_token(explicit: str | None = None) -> str

Return the configured Cortex auth token or raise a clear error.

def auth_metadata(token: str | None = None) -> tuple[tuple[str, str], ...]

Build gRPC metadata for authenticated local Cortex clients.

def metadata_has_valid_token(metadata: Iterable[tuple[str, str]] | None,
expected_token: str) -> bool

Validate bearer or legacy token metadata using constant-time compare.

class CortexAuthInterceptor(grpc.ServerInterceptor)

Require local gRPC callers to present the configured auth token.

TELOS End-to-End Test - Full Taint Propagation & Syscall Blocking Demo

This script demonstrates the complete flow:

  1. Registers itself as an “agent” with Cortex
  2. Simulates taint detection from a malicious page
  3. Attempts to spawn a subprocess (which should be BLOCKED by eBPF LSM)

Prerequisites:

  • Telos Core running: sudo ./bin/telos_daemon —bpf-obj bin/bpf_lsm.o
  • Telos Cortex running: sudo python3 cortex/main.py —debug (Run cortex as root OR fix socket permissions: sudo chmod 666 /var/run/telos.sock)

Telos Cortex - The Brain

Central gRPC server that:

  1. Receives taint reports from Browser Eye (via Native Host)
  2. Manages the Agent Registry (PID Bridge)
  3. Pushes taint updates to the eBPF Core via Unix Socket

Usage: python3 cortex/main.py [—port 50051] [—socket /var/run/telos.sock]

Max requests per second per agent PID

Burst capacity per PID

class RateLimiter()

Thread-safe per-PID rate limiter with stale entry cleanup.

def allow(pid: int) -> bool

Return True if the request is allowed, False if rate-limited.

class TelosControlService(protocol_pb2_grpc.TelosControlServicer)

gRPC Service implementing the TelosControl protocol.

def ReportTaint(request: protocol_pb2.TaintReport,
context: grpc.ServicerContext) -> protocol_pb2.Ack

Handle taint reports from Browser Eye.

Flow:

  1. Update internal state in Guardian
  2. Resolve which Agent PID is affected (PID Bridge)
  3. Push taint level to eBPF Core via Unix Socket

def DeclareIntent(request: protocol_pb2.IntentRequest,
context: grpc.ServicerContext) -> protocol_pb2.IntentVerdict

Handle intent declarations from Agents.

For Phase 2, this is a stub. Full implementation in Phase 3. Currently: Allow all intents, log for audit.

def GetPolicy(request: protocol_pb2.PolicyQuery,
context: grpc.ServicerContext) -> protocol_pb2.PolicyRules

Return current policy rules for a given PID. Used by daemons to sync state. Also registers the agent for tracking.

class CortexServer()

Manages the gRPC server lifecycle and IPC connections.

def start()

Start the Cortex server.

def wait_for_termination()

Block until shutdown signal received.

def stop()

Gracefully stop the server.

def signal_handler(signum, frame)

Handle termination signals.

def sync_filesystem_policy(config: dict, ipc: CoreIPCClient)

Resolve configured sensitive paths to inodes and push to Core.

def sync_network_policy(config: dict, ipc: CoreIPCClient)

Push allowed network destinations (IPs) to Core.

Telos Execution Intelligence Engine

Deterministic classification of execution actions against intent. Mirrors the Domain Intelligence Engine architecture.

Pipeline: L1 — Safe Binary (O(1) lookup) L2 — LOLBin Detection (known-dangerous binaries) L3 — Path Traversal / Sensitive Target Check L4 — Intent-Binary Mapping (does the binary match the goal?) → Combined score → ALLOW / DENY / ESCALATE

Living Off the Land Binary

Path traversal attempt

Accessing sensitive files

Binary doesn’t match intent

Trusted read-only tool

class ExecIntel()

Deterministic execution action classifier.

def classify(binary: str, args: List[str],
intent: str) -> Tuple[str, int, str, List[str]]

Classify an execution action against an intent.

Arguments:

  • binary - the binary name (e.g., ‘bash’, ‘curl’, ‘cat’)
  • args - command arguments
  • intent - natural language goal

Returns:

(decision, score, reason, allowed_bins)

Telos Mirage Manager Orchestrates active deception by binding honey-tokens to physical inodes.

class MirageManager()

def arm_traps()

Resolve inodes and push traps to the kernel.

Telos Intent Verifier — Dual-Gate Data Plane Router

Architecture: Data Plane (synchronous, deterministic): 1. Taint check 2. Domain extraction + Exec action extraction 3. Domain Intelligence classification (L0-L4) → Network Gate 4. Execution Intelligence classification (L1-L4) → Exec Gate 5. Score-based decision: ALLOW / DENY / ESCALATE

Control Plane (async, rare):
6. LLM evaluation (only for ESCALATE decisions)
7. Persist LLM verdict → DB (self-learning)

The LLM is NEVER in the hot path. Expected LLM call rate: <5% of queries.

class IntentVerifier()

def verify(
pid: int,
goal: str,
actions: list,
exec_actions: list = None
) -> Tuple[bool, str, int, List[str], List[str]]

Verify intent. Returns (allowed, reason, ttl_ms, domains, allowed_bins).

Test: DNS Proxy Drawbridge TTL Cleanup Verifies that firewall rules opened by the DNS proxy are removed after TTL expires.

def test_cleanup_timer_schedules_removal()

After add_network_rule, remove_network_rule should fire after TTL.

def test_stop_cancels_pending_timers()

Calling stop() should cancel all pending cleanup timers.

def test_multiple_ips_independent_ttl()

Each IP gets its own independent TTL timer.

Telos Cognitive Intent Verifier (Phase 4)

Uses TinyLlama-1.1B with logprobs-based classification. Instead of trusting greedy text output, we compare the model’s token-level confidence in RELEVANT vs IRRELEVANT.

This is the architecturally correct approach for small LLMs: they may not generate the right token, but their internal probabilities often reflect the correct answer.

Regression tests for issue 27 (intent replay via taint not re-checked).

Telos Domain Intelligence — Seed Database

Curated domain → category → trust mappings for the deterministic classification engine. This is the “threat intelligence” layer.

Categories: docs — Documentation sites code — Developer tools, repositories search — Search engines news — News outlets package — Package registries academic — Research / academic social — Social media filehost — File hosting / sharing exfil — Known data exfiltration targets gaming — Gaming platforms streaming — Streaming / media cloud — Cloud infrastructure mail — Email providers finance — Financial services cdn — Content delivery networks ads — Advertising / tracking vpn_proxy — VPN / proxy / anonymizer paste — Paste / snippet services (exfil risk) unknown — Uncategorized

Trust levels: 100 — Core infrastructure (loopback, DNS) 80 — Highly trusted (major platforms) 60 — Trusted (well-known services) 40 — Neutral (known but not inherently safe) 20 — Suspicious (common abuse vector) 0 — Blocked (known malicious pattern)

Telos Cortex - Unix Socket IPC Client

Communicates with the Telos Core (Go eBPF Loader) via Unix Domain Socket.

Protocol: - JSON messages terminated by newline - Commands: UPDATE_TAINT, CLEAR_TAINT, GET_STATE - Responses: {success: bool, error?: string, data?: object}

class CoreIPCClient()

IPC Client to communicate with Telos Core (eBPF Loader).

The Core listens on a Unix socket and accepts JSON commands to update BPF maps.

def connect() -> bool

Establish connection to the Core daemon.

Returns True if connected, False otherwise. The client can operate without connection (standalone mode).

def close() -> None

Close the socket connection.

def send_update_taint(pid: int, taint_level: int) -> bool

Update taint level for a process in the BPF map.

Arguments:

  • pid - Process ID to update
  • taint_level - New taint level (0-4)

Returns:

True if Core acknowledged the update

def send_clear_taint(pid: int) -> bool

Clear taint for a process (remove from BPF map).

Arguments:

  • pid - Process ID to clear

Returns:

True if Core acknowledged

def update_inode(inode: int, sensitivity: int) -> bool

Update sensitivity for an inode.

def update_network(ip: int, allowed: int) -> bool

Update network allowlist for an IP.

def add_network_rule(ip: int) -> bool

Allow traffic to a specific IP.

def delete_network(ip: int) -> bool

Remove an IP from the allowlist map entirely. This frees up space in the BPF map.

def remove_network_rule(ip: int) -> bool

Block traffic to a specific IP by removing the rule.

def send_register_agent(pid: int, comm: str = "") -> bool

Register an agent process in the BPF map (for tracking).

Arguments:

  • pid - Agent process ID
  • comm - Process command name (e.g., “python3”)

Returns:

True if Core acknowledged

def get_state() -> Optional[Dict[str, Any]]

Get current state from Core (for debugging).

Returns:

State dict or None

def get_pid_taint_level(pid: int) -> Optional[int]

Return the current kernel taint level for pid when Core is reachable.

def clear_network_rule(ip_int: int) -> bool

Helper to clear an IP from network map.

def ping_core() -> bool

[Phase 11: Heartbeat Mechanism] Send a heartbeat pulse to the Go Daemon. If this stops, the Daemon executes emergency Fail-Open or Fail-Closed.

def send_update_exec(pid: int, allowed_bins: list, mode: int = 1) -> bool

Push execution policy to BPF exec_policy_map.

Arguments:

  • pid - Agent process ID
  • allowed_bins - List of allowed binary names (max 8, 16 chars each)
  • mode - 0 = unrestricted, 1 = enforce allowlist

Returns:

True if Core acknowledged

def send_clear_exec(pid: int) -> bool

Remove execution policy for a PID.

Arguments:

  • pid - Agent process ID

Returns:

True if Core acknowledged

def add_mirage_trap(inode: int, honey_id: int, payload: str) -> bool

Inject a honey-token trap into the kernel maps.

Arguments:

  • inode - The real inode of the target file.
  • honey_id - A unique ID for this payload.
  • payload - The fake data to return (max 256 bytes per our C struct).

Tests for Guardian thread safety under concurrent access.

Uses multiple threads to simulate concurrent agent registration, taint updates, and session binding — verifying no state corruption.

class GuardianThreadSafetyTests(unittest.TestCase)

Thread-safety tests for the Guardian class.

def test_concurrent_agent_registration()

Multiple threads registering agents simultaneously should not corrupt state.

def test_concurrent_taint_updates()

Concurrent taint updates should not lose or corrupt taint levels.

def test_concurrent_session_binding()

Concurrent session registrations should not corrupt session_map.

def test_concurrent_register_and_taint()

Registration and taint updates interleaved should not cause corruption.

def test_concurrent_unregister()

Concurrent unregister should not corrupt remaining state.

def test_get_state_summary_thread_safe()

get_state_summary should not crash under concurrent mutation.

class FilesystemPolicy(BaseModel)

Required to have at least one

def load_policy(policy_path: str) -> PolicyConfig

Load and validate policy.yaml.

def load_config(args: Any) -> tuple[CortexSettings, PolicyConfig]

Production-grade configuration loader with fail-fast validation.

Quick test script to simulate taint injection to Cortex. Usage: python3 test_taint.py

Telos Guardian - Intent Verification Engine

Manages:

  1. Agent Registry (PID tracking)
  2. Taint State (per-source and per-agent)
  3. Policy Decisions
  4. PID Bridge (mapping browser views to agents)

@dataclass
class TaintRecord()

Record of taint for a specific source/view.

@dataclass
class AgentInfo()

Information about a registered agent.

source_ids being viewed

class Guardian()

The Guardian manages security state and policy decisions.

PID Bridge Logic:

  • Agents register their PID when starting
  • When a browser view reports taint, we map it to the active agent
  • For Phase 2: Simple model assumes single active agent
  • Future: Session ID mapping, multiple agents

def get_session_pid(session_id: str) -> Optional[int]

Return the PID bound to a session ID, or None.

def register_session(session_id: str, pid: int) -> bool

Register a session ID for an agent.

def register_agent(pid: int) -> bool

Register an agent process.

In Phase 2, the most recently registered agent is considered “active” and will receive taint from browser views.

def unregister_agent(pid: int, *, cleanup_taint_records: bool = True) -> bool

Unregister an agent when it exits.

Performs full cleanup of agent state:

  1. Collects all views mapped to this PID (from active_views + view_agent_map)
  2. Optionally removes taint_records for those views
  3. Removes the agent entry (drops active_views from memory)
  4. Removes session mappings for this PID
  5. Updates active_agent_pid if this was the active agent
  6. Removes view->agent mappings for this PID
  7. Removes core_taint entry for this PID

Arguments:

  • pid - Agent PID to unregister.
  • cleanup_taint_records - If True (default), also remove taint_records for views that were assigned to this agent.

def update_taint(source_id: str,
level: int,
url: str = "",
session_id: str = "") -> None

Update taint record for a browser view/source.

Arguments:

  • source_id - Browser tab/view identifier
  • level - TaintLevel enum value (0-4)
  • url - URL where taint was detected
  • session_id - Optional session ID from browser

def get_taint_level(pid: int) -> int

Get current taint level for an agent PID.

def update_core_taint(pid: int, taint_level: int) -> None

Record the latest kernel taint snapshot for a PID.

def clear_taint(pid: int) -> None

Reset taint level for an agent (after cooldown/verification).

def get_agent_pid_for_view(source_id: str,
session_id: str = "") -> Optional[int]

Resolve which agent PID should receive taint from a browser view.

Strategy:

  1. Check explicit session_id mapping (Phase 3)
  2. Check explicit source_id mapping
  3. Fall back to active agent (Phase 2 legacy)

def map_view_to_agent(source_id: str, pid: int) -> bool

Explicitly map a browser view to an agent.

def get_policy() -> dict

Return current policy configuration.

def should_block_exec(pid: int) -> bool

Determine if a process should be blocked from executing commands.

Returns True if taint level exceeds policy threshold.

def get_state_summary() -> dict

Get a summary of current guardian state for debugging.

def get_agent_pids()

Return a snapshot list of registered agent PIDs.

def get_agent_count() -> int

Return the number of registered agents.

def has_agent(pid: int) -> bool

Check if an agent PID is registered (thread-safe).

def get_active_agent_pid()

Return the active agent PID, or None (thread-safe).

def get_taint_record_count() -> int

Return the number of taint records (thread-safe).

def get_session_count() -> int

Return the number of registered session mappings (thread-safe).

def get_session_map_snapshot()

Return a thread-safe shallow copy of the session map.

Telos Intent-Based DNS Proxy Intercepts agent DNS queries, verifies intent, and dynamically opens the kernel firewall.

class TelosDNSProxy()

def allow_domain(domain: str, ttl_ms: int)

Manually pre-authorize a domain in the proxy (used by explicit DeclareIntent RPCs). In a full implementation, this populates an LRU cache or Redis.

Telos Domain Intelligence Engine

Deterministic, O(1) domain classification for the data plane. SQLite backend (persistent with WAL, fallback to in-memory).

Pipeline: L0 — Root domain extraction (subdomain/combo-squat defense) L1 — Exact match (SQLite hash index) L2 — Typosquat detection (Levenshtein + homoglyph + embedded brand) L3 — Category mismatch scoring (intent → allowed categories) L4 — Reputation + risk flag scoring → Combined score → ALLOW / DENY / ESCALATE

The LLM is NEVER called from this engine. LLM escalation is handled by the caller (verifier.py).

score >= this → DENY

score >= this → ESCALATE to LLM

Domain category doesn’t match intent

Trust level below threshold

Known exfil/paste/abuse category

Looks like a typosquat of a known domain

Known brand embedded in unknown domain (combo-squat)

Domain not in database at all

Trusted domain gets a negative (good) score

Below this, domain is considered low-trust

class DomainIntel()

Deterministic domain classification engine. All lookups are O(1) via SQLite indexed hash + bounded string ops. Persistent SQLite with WAL for concurrent read/write safety.

def classify(domain: str, intent: str) -> Tuple[str, int, str]

Classify a domain against an intent. Logs the decision and uses an LRU cache for high performance.

def persist_verdict(domain: str, category: str, trust: int)

Persist an LLM verdict into the DB for future O(1) lookups. This is the self-learning mechanism.

def get_stats() -> Dict

Return DB statistics.