Architecture¶
lumen-argus is a transparent HTTP proxy that intercepts requests from AI coding tools (Claude Code, Copilot, Cursor) before they reach provider APIs. It scans outbound request bodies for secrets, PII, and proprietary data, then takes a configurable action: log, alert, redact, or block.
Pipeline overview¶
The system processes each request through a four-stage pipeline:
  Client Request
         |
         v
+-----------------+     +-----------------+     +-----------------+     +-----------------+
| 1. Proxy Server | --> | 2. Scanner      | --> | 3. Policy       | --> | 4. Audit        |
|                 |     |    Pipeline     |     |    Engine       |     |    Logger       |
+-----------------+     +-----------------+     +-----------------+     +-----------------+
         |                       |                       |
         v                       v                       v
 Forward or Block        Extract -> Detect        Resolve action
                                                  (block > redact
                                                   > alert > log)
Stage 1: Proxy Server¶
Module: lumen_argus/async_proxy.py
The proxy uses aiohttp.web with non-blocking I/O. Each request is handled by a coroutine, and CPU-bound scanning runs in a thread pool via asyncio.to_thread().
| Property | Detail |
|---|---|
| Bind address | 127.0.0.1 by default. Use --host 0.0.0.0 for Docker (logs a warning for non-loopback). |
| Protocol | Plain HTTP in, HTTPS out |
| Concurrency | Coroutine per request (non-blocking I/O) |
| SSE streaming | Async iteration via StreamResponse |
| Connection pooling | aiohttp.ClientSession with TCPConnector(limit=max_connections) |
| Scanning | Thread pool via asyncio.to_thread() (~30ms, doesn't block event loop) |
| Signal handling | loop.add_signal_handler() for SIGINT, SIGTERM, SIGHUP |
| Retry | Connection errors retried up to proxy.retries times |
| WebSocket | Native upgrade on same port (/ws?url=ws://target) — no separate port needed |
| Thread safety | Free-threaded Python 3.13+ safe — ScannerPipeline.scan() snapshots shared references under _reload_lock, _active_requests uses lock |
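The offloading pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the proxy's actual handler: `scan_body`, `handle_request`, and the single regex are hypothetical stand-ins for the real detectors.

```python
import asyncio
import re

# Hypothetical stand-in for the real detectors: one AWS-style key pattern.
SECRET_RE = re.compile(r"AKIA[0-9A-Z]{16}")


def scan_body(body: str) -> list[str]:
    """CPU-bound work: runs in a worker thread, not on the event loop."""
    return SECRET_RE.findall(body)


async def handle_request(body: str) -> str:
    # asyncio.to_thread() runs scan_body in the default thread pool and
    # suspends this coroutine until the result is ready, so other
    # coroutines keep running in the meantime.
    findings = await asyncio.to_thread(scan_body, body)
    return "block" if findings else "forward"
```

While the scan runs in the worker thread, the event loop stays free to relay other requests and SSE chunks.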
Backpressure
The proxy limits concurrent upstream connections via aiohttp.TCPConnector(limit=max_connections).
Stage 2: Scanner Pipeline¶
Module: lumen_argus/pipeline.py
The pipeline orchestrates extraction, encoding decode, content fingerprinting, detection, finding dedup, and policy evaluation. Per-stage timing is recorded in ScanResult.stage_timings for observability.
Extraction¶
Module: lumen_argus/extractor.py
The RequestExtractor parses JSON request bodies from three provider formats and produces ScanField objects:
| Provider | Detection method |
|---|---|
| Anthropic | Path prefix /v1/messages, header x-api-key or anthropic-version |
| OpenAI | Path prefix /v1/chat/completions |
| Gemini | Path prefix containing generateContent |
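The detection rules in the table could be expressed roughly as follows. The function name `detect_provider` and the order of the checks are assumptions. In particular, the table does not say whether the Anthropic headers are required alongside the path or act as a fallback; this sketch treats them as a fallback.

```python
def detect_provider(path: str, headers: dict[str, str]) -> str:
    """Return 'anthropic', 'openai', 'gemini', or 'unknown'."""
    names = {name.lower() for name in headers}  # header names are case-insensitive
    if path.startswith("/v1/messages"):
        return "anthropic"
    if path.startswith("/v1/chat/completions"):
        return "openai"
    if "generateContent" in path:
        return "gemini"
    # Assumption: Anthropic headers identify the provider when the path does not.
    if "x-api-key" in names or "anthropic-version" in names:
        return "anthropic"
    return "unknown"
```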
Each ScanField contains:
- path -- location in the request body (e.g., messages[4].content)
- text -- the extracted string to scan
- source_filename -- filename if from a tool-result file read (used for path allowlisting)
Scan budget
To keep scan time under 50ms, the pipeline caps total scanned text at 200 KB per request. Fields are scanned in reverse order (newest messages first), since older messages were already scanned in previous requests.
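A newest-first budget can be sketched as below. The helper name `select_within_budget` is hypothetical, the budget is counted in characters rather than bytes, and truncating an oversized field (instead of skipping it) is an assumption.

```python
SCAN_BUDGET = 200 * 1024  # 200 KB cap from the text, counted in characters here


def select_within_budget(texts: list[str], budget: int = SCAN_BUDGET) -> list[str]:
    """Pick field texts newest-first until the budget is spent."""
    selected: list[str] = []
    remaining = budget
    for text in reversed(texts):  # last field = newest message
        if remaining <= 0:
            break
        chunk = text[:remaining]  # assumption: an oversized field is truncated
        selected.append(chunk)
        remaining -= len(chunk)
    return selected
```

With a budget of 5 characters and fields `["old", "mid", "new"]`, the newest field is kept whole, the next one is cut to fit, and the oldest is never reached.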
Encoding Decode¶
Module: lumen_argus/decoders.py
The ContentDecoder expands extracted fields by decoding encoded content before detection. Each field produces the original text plus any decoded variants:
| Encoding | Detection pattern | Example |
|---|---|---|
| Base64 | 20+ chars of [A-Za-z0-9+/] | c2tfbGl2ZV8... → sk_live_... |
| Hex | 16+ hex chars (even length) | 736b5f6c69... → sk_live_... |
| URL | %XX sequences | sk%5Flive%5F... → sk_live_... |
| Unicode | \uXXXX sequences | \u0073\u006b... → sk_... |
Decoded variants get annotated paths: messages[0].content[base64]. A quality filter (_is_meaningful) rejects non-printable binary output (images, protobuf).
Configurable via pipeline.stages.encoding_decode in YAML or the Pipeline dashboard page:
- Per-encoding toggles (base64, hex, URL, Unicode)
- max_depth (default 2) -- nested encoding layers
- min_decoded_length / max_decoded_length -- filter noise
Gated by the encoding_decode pipeline stage toggle. When disabled, fields pass through unchanged.
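To make the Base64 row concrete, here is a minimal sketch of one decode pass with a printable-text filter. The function names, the 0.9 printable ratio, and the single-level decode are all assumptions; the real ContentDecoder also handles hex, URL, and Unicode escapes and nests up to max_depth.

```python
import base64
import binascii
import re
import string

B64_RE = re.compile(r"[A-Za-z0-9+/]{20,}={0,2}")  # 20+ Base64 alphabet chars
PRINTABLE = set(string.printable)


def is_meaningful(text: str, min_ratio: float = 0.9) -> bool:
    """Reject decoded output that is mostly non-printable (assumed threshold)."""
    if not text:
        return False
    return sum(ch in PRINTABLE for ch in text) / len(text) >= min_ratio


def decode_base64_variants(text: str) -> list[str]:
    """Return decoded strings for every Base64-looking run in text."""
    variants = []
    for match in B64_RE.finditer(text):
        try:
            raw = base64.b64decode(match.group(0), validate=True)
            decoded = raw.decode("utf-8")
        except (binascii.Error, UnicodeDecodeError):
            continue  # not valid Base64, or binary payload
        if is_meaningful(decoded):
            variants.append(decoded)
    return variants
```

Each decoded variant would then be scanned alongside the original text, under an annotated path such as messages[0].content[base64].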
Detection¶
Module: lumen_argus/detectors/
Detectors run sequentially on extracted fields. Each detector implements the BaseDetector ABC defined in lumen_argus/detectors/__init__.py.
When the rules DB table has rules (auto-imported on first run), the pipeline uses RulesDetector which replaces SecretsDetector, PIIDetector, and CustomDetector. ProprietaryDetector always runs (file-pattern based, not regex rules).
| Detector | Module | Description |
|---|---|---|
| RulesDetector | detectors/rules.py | DB-backed rules with validator registry and license gating. Used when DB has rules. |
| SecretsDetector | detectors/secrets.py | Fallback: 30+ compiled regex patterns plus Shannon entropy sweep. Used when DB has no rules. |
| PIIDetector | detectors/pii.py | Fallback: regex patterns with validators (Luhn, SSN, IBAN). Used when DB has no rules. |
| ProprietaryDetector | detectors/proprietary.py | File pattern blocklist and keyword detection. Always active. |
| CustomDetector | detectors/custom.py | Fallback: user-defined regex rules from config. Used when DB has no rules. |
All regex patterns are compiled at load time (startup or SIGHUP reload) to avoid runtime compilation overhead. The RulesDetector supports named validators (luhn, ssn_range, iban_mod97, exclude_private_ips) and is license-aware — Pro rules (tier='pro') are skipped when no valid license is present.
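As an illustration of what a named validator does after a regex match, here is the standard Luhn checksum used for card numbers. The function name `luhn_valid` is hypothetical; only the algorithm itself is standard.

```python
def luhn_valid(number: str) -> bool:
    """Return True if the digits in number pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    if len(digits) < 2:
        return False
    total = 0
    # Walk from the rightmost digit; double every second digit.
    for index, digit in enumerate(reversed(digits)):
        if index % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9  # same as summing the two digits of the product
        total += digit
    return total % 10 == 0
```

A validator like this lets a broad regex (any 16-digit run) stay cheap while discarding matches that cannot be real card numbers.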
Rule Overlap Analysis¶
Optional integration with Crossfire (pip install lumen-argus[rules-analysis]). Detects duplicate, subset, and overlapping rules using corpus-based analysis. The rule_analysis.py module generates test strings per rule via CorpusGenerator, cross-evaluates all rules against all corpora via Evaluator, then classifies pairs via Classifier into duplicates, subsets, and overlaps.
Results cached in the rule_analysis SQLite table. Dashboard "Rule Analysis" page shows findings with Disable/Review/Dismiss actions. Auto-analysis runs in a background thread after rule import. Configurable via rule_analysis: config section (samples, threshold, seed, auto_on_import). Rules page shows [N ovr] overlap badges linking to the analysis page.
Response Scanning¶
Module: lumen_argus/response_scanner.py
The ResponseScanner scans API response text (model output) after it has been forwarded to the client. It runs asynchronously in a background thread, so it adds no latency to the request/response cycle.
Two detection types:
| Type | Detector | Patterns | Description |
|---|---|---|---|
| Secrets | Reuses existing detectors (SecretsDetector, PIIDetector, etc.) | All request patterns | Catches secrets leaked from context in model output |
| Injection | Built-in regex patterns | 10 patterns | Detects prompt injection attempts (e.g., "ignore previous instructions") |
Response findings are recorded to the analytics store and audit log with a response. location prefix (e.g., response.content). Scanning is controlled by the response_secrets and response_injection pipeline stages, both disabled by default (opt-in).
Async mode (community): response is forwarded immediately, scanned in background thread, findings recorded post-hoc. Pro adds buffered/blocking mode and custom injection patterns via rules engine.
MCP Scanning¶
Package: lumen_argus/mcp/ — unified scanning proxy with 4 transport modes.
Modules: scanner.py (MCPScanner), proxy.py (transport loops), transport.py (StdioTransport, HTTPClientTransport, WebSocketClientTransport), request_tracker.py, tool_scanner.py, session_binding.py, env_filter.py.
The MCPScanner is shared between two MCP scanning paths:
1. HTTP proxy MCP scanning (enterprise/k8s):
The proxy automatically detects MCP tools/call JSON-RPC in HTTP request bodies via detect_mcp_request(). When detected:
- Checks tool against allow/block lists → blocks with JSON-RPC error if denied
- Scans tool arguments with existing detectors
- Tracks tool usage in mcp_detected_tools table (name, call count, first/last seen)
- Scans response result.content text via detect_mcp_response()
- No developer-side config needed — works automatically for MCP over HTTP
2. lumen-argus mcp — 4 transport modes:
- Stdio subprocess (-- cmd): spawns MCP server as child process, relays stdin/stdout with scanning. Restricted environment (safe vars only).
- HTTP bridge (--upstream http://...): stdio client → HTTP upstream.
- HTTP reverse proxy (--listen :PORT --upstream http://...): accepts HTTP POST, forwards to upstream.
- WebSocket bridge (--upstream ws://...): stdio client → WebSocket upstream.
Security layers (all modes):
- Confused deputy protection: tracks outbound request IDs, rejects unsolicited responses (FIFO eviction at 10K)
- Tool description poisoning detection: 7 pattern categories scanned on tools/list response
- Tool drift detection: SHA-256 baselines in mcp_tool_baselines DB table, detects definition changes
- Session binding (opt-in): validates tools/call against tool inventory from first tools/list
- Environment restriction (stdio only): strips sensitive vars from child process environment
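The confused deputy protection in the list above can be sketched as a bounded set of pending request IDs. The class and method names are hypothetical, and only the 10K FIFO cap comes from the text.

```python
from collections import OrderedDict


class RequestTracker:
    """Remember outbound JSON-RPC ids; accept only responses that match one."""

    def __init__(self, max_pending: int = 10_000) -> None:
        self._pending = OrderedDict()  # insertion order gives FIFO eviction
        self._max_pending = max_pending

    def track(self, request_id) -> None:
        self._pending[request_id] = None
        if len(self._pending) > self._max_pending:
            self._pending.popitem(last=False)  # evict the oldest id

    def accept_response(self, response_id) -> bool:
        # An id is consumed on first use, so a replayed response is rejected.
        if response_id in self._pending:
            del self._pending[response_id]
            return True
        return False
```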
Both paths share:
- Request scanning: serializes tool arguments to text, runs existing detectors
- Response scanning: extracts text from content[] array, runs detectors + injection patterns
- Tool allow/block lists: mcp.allowed_tools / mcp.blocked_tools in config + DB (mcp_tool_lists table)
- Block action: returns JSON-RPC error (-32600)
- Findings have mcp. location prefix (e.g., mcp.tools/call.write_file.arguments)
Controlled by mcp_arguments and mcp_responses pipeline stages (enabled by default).
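A blocked tools/call could be answered along these lines. The function name, the error message text, and the list semantics (block list wins; a non-empty allow list is exclusive) are assumptions; only the -32600 code and the JSON-RPC envelope come from the text and the JSON-RPC 2.0 format.

```python
from typing import Optional


def check_tool_call(message: dict, allowed: set, blocked: set) -> Optional[dict]:
    """Return a JSON-RPC error for a denied tools/call, else None."""
    if message.get("method") != "tools/call":
        return None
    tool = message.get("params", {}).get("name", "")
    # Assumption: the block list wins, and a non-empty allow list is exclusive.
    denied = tool in blocked or (bool(allowed) and tool not in allowed)
    if not denied:
        return None
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),  # echo the request id so the client can correlate
        "error": {"code": -32600, "message": f"tool '{tool}' is not permitted"},
    }
```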
WebSocket Proxy¶
Modules: lumen_argus/ws_proxy.py (scanner), lumen_argus/async_proxy.py (relay)
The WebSocketScanner scans WebSocket text frames bidirectionally. The relay runs on the same port as the proxy — clients connect with ws://localhost:8080/ws?url=ws://target.
- Outbound scanning: client → server text frames scanned for secrets/PII
- Inbound scanning: server → client text frames scanned for secrets + injection patterns
- Binary frames: passed through without scanning
- Origin check: configurable websocket.allowed_origins list
- Frame size cap: websocket.max_frame_size (default 1MB)
- SSRF protection: target URL must use ws:// or wss:// scheme
- Findings have ws.outbound or ws.inbound location prefix
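The scheme restriction on the url= target can be sketched with urllib.parse. The function name is hypothetical, and requiring a non-empty hostname is an added assumption beyond the scheme check named in the list.

```python
from urllib.parse import urlsplit


def is_allowed_target(url: str) -> bool:
    """Accept only ws:// or wss:// targets that name a host."""
    parts = urlsplit(url)
    # Assumption: a target without a hostname is rejected as well.
    return parts.scheme in ("ws", "wss") and bool(parts.hostname)
```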
Controlled by websocket_outbound and websocket_inbound pipeline stages (disabled by default, opt-in). SIGHUP reloads the scanner configuration without a server restart.
Connection lifecycle hooks: Each WebSocket connection gets a unique connection_id (UUID). Extension hooks fire on open, finding_detected (text frames with findings), and close events. Community records connection data to ws_connections SQLite table (target URL, origin, duration, frame counts, findings count, close code). Pro can override via extensions.set_ws_connection_hook() for richer per-connection analytics. Hook calls run in thread pool via asyncio.to_thread() to avoid blocking the event loop.
Policy enforcement: WebSocket findings are evaluated against the same policy as HTTP requests. Block action closes the connection immediately (frame not forwarded). Alert/log actions record findings and continue forwarding.
Within-Request Deduplication¶
After detection, findings with the same (detector, type, matched_value) tuple are collapsed into a single finding with an incremented count. This reduces noise from secrets repeated across conversation history.
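A minimal sketch of that collapse step, using a trimmed-down Finding with only the fields that form the key. The `collapse` helper is hypothetical.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Finding:
    detector: str
    type: str
    matched_value: str
    count: int = 1


def collapse(findings: list[Finding]) -> list[Finding]:
    """Merge findings sharing (detector, type, matched_value), summing counts."""
    merged = {}
    for finding in findings:
        key = (finding.detector, finding.type, finding.matched_value)
        if key in merged:
            previous = merged[key]
            merged[key] = replace(previous, count=previous.count + finding.count)
        else:
            merged[key] = finding
    return list(merged.values())  # dicts preserve first-seen order
```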
Cross-Request Deduplication¶
LLM API requests contain the full conversation history — every previous message is re-sent. Without cross-request dedup, the same secret generates new findings on every subsequent request, causing quadratic growth in finding rows.
A 3-layer architecture eliminates this redundancy:
| Layer | Location | What it does | Failure mode if missing |
|---|---|---|---|
| Content fingerprinting | Before detectors | Per-session SHA-256[:16] hash set skips already-scanned fields | 80-95% wasted scan CPU |
| Finding TTL cache | After policy eval, before record_findings() | Session-scoped (detector, type, hash(matched_value), session_id) cache suppresses duplicate DB writes | Same finding written N times per conversation |
| Store unique constraint | ON CONFLICT DO UPDATE | UNIQUE(content_hash, session_id) index, increments seen_count | Duplicates after process restart or cache eviction |
Content fingerprinting (ContentFingerprint class) uses session.session_id as the conversation key. Each conversation tracks a set of SHA-256[:16] hashes of field text. On each request, only fields with unseen hashes are passed to detectors. Sharded (16 locks) for low contention, with TTL eviction (default 30 minutes) and a per-conversation hash cap (5,000). Two-phase commit: filter_new_fields() returns pending hashes, commit_hashes() stores them only if the request is not blocked. After successful history stripping, commit_pending() commits the hashes so subsequent requests skip the stripped content — avoiding repeated strip overhead for the rest of the session.
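The two-phase flow can be illustrated with a stripped-down version. This sketch keeps the filter_new_fields() and commit_hashes() names from the text but leaves out the sharded locks, TTL eviction, and hash cap; the class name and exact signatures are assumptions.

```python
import hashlib


def fingerprint(text: str) -> str:
    """SHA-256 of the field text, truncated to 16 hex chars."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]


class ContentFingerprintSketch:
    def __init__(self) -> None:
        self._seen = {}  # session_id -> set of committed hashes

    def filter_new_fields(self, session_id: str, texts: list[str]):
        """Phase 1: return unseen texts plus their hashes, without storing them."""
        seen = self._seen.get(session_id, set())
        new_texts, pending = [], []
        for text in texts:
            digest = fingerprint(text)
            if digest not in seen and digest not in pending:
                new_texts.append(text)
                pending.append(digest)
        return new_texts, pending

    def commit_hashes(self, session_id: str, pending: list[str]) -> None:
        """Phase 2: store hashes once the request is known not to be blocked."""
        self._seen.setdefault(session_id, set()).update(pending)
```

Deferring the commit means a blocked request leaves no trace in the hash set, so the same content is scanned (and blocked) again if it is re-sent.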
Finding TTL cache (_FindingDedup class) filters findings before record_findings() but keeps all findings in ScanResult for policy evaluation — the action (block/alert) still fires even if the finding isn't new. The notification dispatcher also receives all findings (it has its own independent cooldown). Both cleanup schedulers run on background daemon threads.
Store unique constraint uses a content_hash column computed as SHA-256[:16] of detector|type|hash(matched_value). The partial unique index WHERE content_hash != '' excludes legacy rows. ON CONFLICT DO UPDATE SET seen_count = seen_count + 1 tracks how many times each finding was detected. When Layer 1 skips fields (conversation history re-sent), bump_seen_counts() increments all existing findings in the session.
Value hashing: Each finding optionally stores value_hash — an HMAC-SHA-256 hash of matched_value using a server-side key (~/.lumen-argus/hmac.key, auto-generated, 0600 permissions). Full 64 hex chars (256 bits). Enables cross-session secret tracking without persisting raw secrets. Configurable via analytics.hash_secrets (default: true).
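The keyed hash itself is a one-liner with the standard library. The function name `value_hash` and passing the key as an argument are assumptions; loading the key from ~/.lumen-argus/hmac.key is left out.

```python
import hashlib
import hmac


def value_hash(matched_value: str, key: bytes) -> str:
    """HMAC-SHA-256 of the matched value as 64 hex chars."""
    return hmac.new(key, matched_value.encode("utf-8"), hashlib.sha256).hexdigest()
```

Because the key is required to compute the hash, someone holding only the database cannot confirm a guessed secret by hashing it, which a plain SHA-256 would allow.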
Configurable via dedup: in config.yaml:
| Parameter | Default | Description |
|---|---|---|
| conversation_ttl_minutes | 30 | Content fingerprint TTL per conversation |
| finding_ttl_minutes | 30 | Finding-level cache TTL |
| max_conversations | 10,000 | Max tracked conversations (TTL eviction) |
| max_hashes_per_conversation | 5,000 | Cap hashes per conversation (~80KB each) |
Stage 3: Policy Engine¶
Module: lumen_argus/policy.py
The policy engine evaluates all findings and determines the highest-priority action.
Action priority¶
Actions are resolved using a strict priority order:
| Priority | Action | Description |
|---|---|---|
| 4 | block | Reject with HTTP 400 (invalid_request_error) regardless of streaming mode. If findings are only in conversation history, strips affected messages and forwards the cleaned request (logged as strip). |
| 3 | redact | Replace matched values in the request body before forwarding. (Pro only) |
| 2 | alert | Forward the request but log and display a warning. |
| 1 | log | Forward the request and record the finding silently. |
| 0 | pass | No findings -- forward without action. |
The winning action is the highest priority across all findings in a request. For example, if one finding triggers alert and another triggers block, the request is blocked.
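The resolution rule amounts to taking a maximum over the priority table. A minimal sketch, with `ACTION_PRIORITY` and `resolve_action` as hypothetical names:

```python
ACTION_PRIORITY = {"pass": 0, "log": 1, "alert": 2, "redact": 3, "block": 4}


def resolve_action(actions: list[str]) -> str:
    """Return the highest-priority action, or 'pass' when there are no findings."""
    return max(actions, key=ACTION_PRIORITY.__getitem__, default="pass")
```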
Per-detector overrides¶
Each detector can have its own action via detectors.<name>.action in the config. If not set, the default_action applies.
Community Edition limitation
In the Community Edition, the redact action is automatically downgraded to alert by the policy engine. Full redaction support requires the Pro edition, which registers an evaluate hook to bypass this downgrade.
History stripping¶
When block fires but findings are only in conversation history (not the latest user message), the proxy strips those messages/content blocks from the request body and forwards the cleaned request. This prevents a blocked message from tainting the entire session.
- Message-level: In multi-message conversations, entire messages containing findings are removed.
- Content-block-level: Claude Code packs everything into messages[0] with multiple content blocks; only the offending blocks are removed, preserving the rest.
- Audit: Strip events are logged with action="strip", preserving findings for forensic review.
- Dedup: After successful strip, commit_pending() commits fingerprint hashes so subsequent requests skip the stripped content (no repeated strip overhead).
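Message-level stripping can be sketched as below. The function name is hypothetical, the latest message is assumed to be the last list element, and content-block-level stripping is not covered.

```python
from typing import Optional


def strip_history(messages: list[dict], flagged_indexes: set) -> Optional[list]:
    """Drop flagged history messages; return None if the latest message is flagged."""
    if not messages or (len(messages) - 1) in flagged_indexes:
        return None  # finding is in the newest message: the request must be blocked
    return [msg for i, msg in enumerate(messages) if i not in flagged_indexes]
```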
Stage 4: Audit Logger¶
Module: lumen_argus/audit.py
The audit logger writes JSONL records for every scanned request to ~/.lumen-argus/audit/guard-{timestamp}.jsonl.
| Property | Detail |
|---|---|
| Format | JSONL (one JSON object per line) |
| File permissions | 0o600 (owner read/write only) |
| Thread safety | Thread-safe writes via locking |
| Retention | Configurable via audit.retention_days (default: 90 days) |
Security: matched_value is never written to disk
The Finding.matched_value field (the actual secret/PII value) is kept in memory only. Audit log entries contain only the masked value_preview (e.g., AKIA****EXAMPLE). This prevents secondary exfiltration of sensitive data through log files.
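A sketch of an append that drops matched_value and creates the file with owner-only permissions. The function name and the dict-based record shape are assumptions; the real logger works from AuditEntry.to_dict() and holds a lock around writes.

```python
import json
import os


def append_audit_record(path: str, record: dict) -> None:
    """Append one JSONL line, never writing matched_value to disk."""
    safe = dict(record)
    safe["findings"] = [
        {key: value for key, value in finding.items() if key != "matched_value"}
        for finding in record.get("findings", [])
    ]
    # The 0o600 mode applies when the file is created (owner read/write only).
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600)
    with os.fdopen(fd, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(safe) + "\n")
```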
History strip events are recorded with action="strip" — the findings are preserved in the audit log for forensic review, even though the cleaned request was forwarded.
Key data structures¶
Module: lumen_argus/models.py
All shared data structures are defined in a single module with no internal imports, preventing circular dependencies.
Finding¶
A single detection result.
| Field | Type | Description |
|---|---|---|
| detector | str | Detector name: secrets, pii, proprietary, or custom rule name |
| type | str | Finding type (e.g., aws_access_key, ssn, email) |
| severity | str | critical, high, warning, or info |
| location | str | Path into request body (e.g., messages[4].content) |
| value_preview | str | Masked preview (e.g., AKIA****EXAMPLE) |
| matched_value | str | Full matched text (memory only, never serialized) |
| action | str | Resolved action for this finding |
| count | int | Number of occurrences after deduplication |
ScanField¶
An extracted text field from a request body.
| Field | Type | Description |
|---|---|---|
| path | str | Location descriptor (e.g., messages[3].content) |
| text | str | The extracted string to scan |
| source_filename | str | Filename if from a tool-result file read |
ScanResult¶
Aggregated result of scanning a request.
| Field | Type | Description |
|---|---|---|
| findings | list[Finding] | All findings from the scan |
| scan_duration_ms | float | Total time spent scanning in milliseconds |
| action | str | Highest-priority resolved action: pass, log, alert, or block |
| stage_timings | dict[str, float] | Per-stage timing breakdown in ms: extraction, encoding_decode, fingerprint, outbound_dlp |
SessionContext¶
Session/conversation identity extracted from each request. Populated by the proxy before scanning, passed through the pipeline to audit log and analytics store. Each field is a separate DB column for direct filtering.
| Field | Type | Source | Description |
|---|---|---|---|
| account_id | str | Anthropic metadata.user_id.account_uuid, OpenAI user | Account identifier (WHO) |
| api_key_hash | str | SHA-256[:16] of x-api-key or Authorization header | Truncated key hash (not in audit JSONL) |
| session_id | str | Provider metadata or derived fp:<hash> | Conversation identifier (WHICH CHAT) |
| device_id | str | Anthropic metadata.user_id.device_id | Machine identifier (WHICH MACHINE) |
| source_ip | str | X-Forwarded-For or client_address | Client IP (WHERE) |
| working_directory | str | System prompt: Primary working directory: | Project path (WHAT PROJECT) |
| git_branch | str | System prompt: Current branch: | Git branch |
| os_platform | str | System prompt: Platform: | OS (darwin, linux, win32) |
| client_name | str | Client registry (identify_client()) | Normalized client ID (e.g., "cursor", "aider") |
| client_version | str | Parsed from User-Agent token | Client version (e.g., "0.45.1") |
Claude Code metadata parsing
Claude Code sends metadata.user_id as a JSON-encoded string: '{"device_id":"...","account_uuid":"...","session_id":"..."}'. The proxy detects strings starting with {, parses with json.loads(), and extracts individual fields.
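The parsing step described above might look like this. The function name and the fallback for plain (non-JSON) strings are assumptions; only the leading-`{` check, json.loads(), and the three field names come from the text.

```python
import json

FIELDS = ("device_id", "account_uuid", "session_id")


def parse_user_id(user_id: str) -> dict:
    """Extract identity fields from a JSON-encoded metadata.user_id string."""
    if user_id.startswith("{"):
        try:
            data = json.loads(user_id)
        except json.JSONDecodeError:
            data = None  # looked like JSON but was not; fall through
        if isinstance(data, dict):
            return {name: str(data.get(name, "")) for name in FIELDS}
    # Assumption: an opaque string is treated as the account identifier.
    return {"device_id": "", "account_uuid": user_id, "session_id": ""}
```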
AuditEntry¶
A single audit log record. Serialized to JSONL via to_dict(), which explicitly excludes matched_value from all findings and api_key_hash from session context.
| Field | Type | Description |
|---|---|---|
| timestamp | str | ISO 8601 UTC timestamp |
| request_id | int | Monotonically increasing request counter |
| provider | str | Detected provider (anthropic, openai, gemini, unknown) |
| model | str | Model name from request body |
| endpoint | str | Request path |
| action | str | Resolved action |
| findings | list[Finding] | Findings (serialized without matched_value) |
| scan_duration_ms | float | Scan time |
| request_size_bytes | int | Request body size |
| passed | bool | Whether the request was forwarded to upstream |
| Session fields | str | All SessionContext fields except api_key_hash (omitted when empty) |
Extensions¶
Module: lumen_argus/extensions.py
The extension system provides the open-core boundary between Community and Pro/Enterprise editions. Extensions are discovered via Python entry points in the lumen_argus.extensions group.
Entry point registration¶
[project.entry-points."lumen_argus.extensions"]
my_plugin = "my_package:register"

# Entry point target: receives the ExtensionRegistry and registers a detector.
def register(registry):
    from my_package.detectors import MyDetector
    registry.add_detector(MyDetector())
Available hooks¶
| Hook | Signature | Description |
|---|---|---|
| pre_request | (request_id: int) -> None | Called at the start of each request. Use for correlation ID setup. |
| post_scan | (result: ScanResult, body: bytes, provider: str, session=ctx) -> None | Called after each scan completes. Use for notifications or SSE push. Accept **kwargs for forward compat. |
| evaluate | (findings: list[Finding], policy: PolicyEngine) -> ActionDecision | Replaces the default policy evaluation. Used by Pro to support the redact action. Falls back to default on exception. |
| redact | (body: bytes, findings: list[Finding]) -> bytes | Transforms the request body to redact matched values before forwarding. Pro only. |
| config_reload | (pipeline: ScannerPipeline) -> None | Called after SIGHUP config reload. Use for plugin re-initialization. |
| response_scan | (text: str, provider: str, model: str, session) -> (action, findings) | Buffered response scanning. Runs INSTEAD of async scan when set. Return ("block", findings) to reject response with 400. Pro only. |
Dashboard extension hooks¶
| Hook | Signature | Description |
|---|---|---|
| register_dashboard_pages(pages) | pages: list[dict] | Register additional dashboard pages. Each page dict has name, label, js, order. Pages matching locked placeholders unlock them. |
| register_dashboard_css(css) | css: str | Register additional CSS injected after community CSS. |
| register_dashboard_api(handler) | async handler(path, method, body, store, audit_reader, agent_identity) -> (status, body) or None | Register a plugin API handler. Called before community handler; return None to fall through. agent_identity is AgentIdentity \| None. |
| set_analytics_store(store) | store: AnalyticsStore | Override the analytics store (Pro passes its extended store). |
| set_sse_broadcaster(broadcaster) | broadcaster: SSEBroadcaster | Store the SSE broadcaster for plugin access. |
| register_auth_provider(provider) | provider.authenticate(headers) -> dict or None | Register an auth provider (Enterprise: OAuth, SAML). |
Notification hooks¶
| Hook | Signature | Description |
|---|---|---|
| register_channel_types(types) | types: dict | Register channel type definitions for the dashboard dropdown. |
| set_notifier_builder(builder) | builder(channel_dict) -> notifier or None | Factory that builds notifier instances from DB channel rows. |
| set_dispatcher(dispatcher) | dispatcher.dispatch(findings, provider) | Set the notification dispatcher. Community calls this from pipeline. |
| set_channel_limit(limit) | limit: int or None | Set max channels (None = unlimited, 1 = freemium default). |
Observability hooks¶
| Hook | Signature | Description |
|---|---|---|
| set_health_hook(hook) | hook() -> dict | Merged into /health JSON response. |
| set_metrics_hook(hook) | hook() -> str | Prometheus text lines appended to /metrics. |
| set_trace_request_hook(hook) | hook(method, path) -> context manager | Wraps full request lifecycle for OpenTelemetry tracing. |
All observability hooks are fully guarded — exceptions never break requests.
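One way to picture "fully guarded" is a wrapper that swallows hook failures and falls back to the base payload. This is an illustrative sketch only: `call_health_hook` and `build_health` are hypothetical names, not functions from the codebase.

```python
import logging

log = logging.getLogger("lumen_argus.hooks.sketch")  # hypothetical logger name


def call_health_hook(hook) -> dict:
    """Run a hook() -> dict callable; never let its failure propagate."""
    if hook is None:
        return {}
    try:
        extra = hook()
    except Exception:
        log.exception("health hook failed; ignoring")
        return {}
    return extra if isinstance(extra, dict) else {}


def build_health(base: dict, hook) -> dict:
    """Merge hook output into the base /health payload."""
    return {**base, **call_health_hook(hook)}
```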
Registry methods¶
Extensions interact with the proxy through the ExtensionRegistry:
- add_detector(detector, priority=False) -- Register an additional detector (prepend with priority=True)
- add_notifier(notifier) -- Register a notification handler
- set_proxy_server(server) / get_proxy_server() -- Access the proxy server instance for runtime config updates
- get_analytics_store() / get_sse_broadcaster() -- Access shared infrastructure
- get_dashboard_pages() / get_dashboard_css() / get_dashboard_api_handler() -- Read registered extensions
- get_auth_providers() -- List registered auth providers
- get_channel_types() / get_notifier_builder() / get_dispatcher() / get_channel_limit() -- Notification infrastructure
- get_health_hook() / get_metrics_hook() / get_trace_request_hook() -- Observability hooks
Module structure¶
CLI and startup¶
The proxy entry point (cli.py) is a thin argparse dispatcher. Server lifecycle is decomposed into focused modules:
| Module | Responsibility |
|---|---|
| cli.py | Argument parsing, command dispatch, thin _run_* handlers |
| startup.py | Server construction, component wiring, async run loop |
| config_loader.py | HMAC key, rules bundle loading, analytics init, DB config overrides |
| reload.py | SIGHUP hot-reload (config, pipeline, scanners, channels) |
| mcp_cmd.py | Standalone lumen-argus mcp CLI with own detector stack |
Dashboard API¶
The dashboard REST API is split by domain. All handlers import shared utilities from api_helpers.py:
| Module | Endpoints |
|---|---|
| dashboard/api.py | Dispatcher + small inline handlers (ws, audit) |
| dashboard/api_helpers.py | json_response, parse_pagination, parse_json_body, require_store, broadcast_sse, send_sighup, mask_channel |
| dashboard/api_findings.py | Findings list/detail, sessions, stats |
| dashboard/api_rules.py | Rules CRUD, bulk update, clone, analysis |
| dashboard/api_channels.py | Notification channel CRUD, types, test, batch |
| dashboard/api_config.py | Config/pipeline CRUD, status, license, logs, clients |
| dashboard/api_allowlists.py | Allowlist CRUD and pattern testing |
| dashboard/api_mcp.py | MCP tool list management |
Pro extensions should import shared helpers from lumen_argus.dashboard.api_helpers (not from api.py). The public entry point handle_community_api() remains in api.py.