Extensions¶
lumen-argus supports plugins via Python entry points. Any pip-installed package can register custom detectors, hooks, and notifiers.
Plugin Registration¶
1. Declare Entry Point¶
# In your plugin's pyproject.toml
[project.entry-points."lumen_argus.extensions"]
my_plugin = "my_package:register"
2. Implement Register Function¶
# In my_package/__init__.py
def register(registry):
"""Called by lumen-argus on startup."""
from my_package.detectors import MyDetector
registry.add_detector(MyDetector())
3. Install and Run¶
Plugins are logged at startup:
4. Declare Load-Order Dependencies (Optional)¶
When one plugin reads shared state set up by another (for example via
registry.get_plugin("other-plugin")), declare the dependency on the
plugin package so load_plugins() can topologically sort the
register() calls:
# In my_package/__init__.py
LUMEN_ARGUS_PLUGIN_DEPENDS_ON: tuple[str, ...] = ("other-plugin",)
def register(registry):
other = registry.get_plugin("other-plugin")
...
Strings refer to entry-point names (the key on the left of = in
[project.entry-points."lumen_argus.extensions"]), not Python module
names. Plugins without the attribute fall back to entry-point iteration
order — fully backward compatible.
Failure modes are intentionally non-fatal so a misconfigured plugin cannot crash the proxy:
- Missing dependency (X declares dep on Y, Y not installed): X and
anything transitively depending on X are dropped with a
WARNING. Other plugins continue loading. - Cycle (
A → B → A): logged asERROR; the involved plugins fall back to entry-point iteration order. The proxy still boots.
Extension Registry API¶
Detectors¶
detector: Instance implementingBaseDetectorpriority=True: Run before built-in detectors (prepend)priority=False(default): Run after built-in detectors (append)
BaseDetector Interface¶
from lumen_argus.detectors import BaseDetector
from lumen_argus.models import Finding, ScanField
from lumen_argus.allowlist import AllowlistMatcher
class MyDetector(BaseDetector):
def scan(self, fields: List[ScanField], allowlist: AllowlistMatcher) -> List[Finding]:
findings = []
for field in fields:
# Your detection logic here
if "SECRET_PATTERN" in field.text:
findings.append(Finding(
detector="my_detector",
type="secret_pattern",
severity="high",
location=field.path,
value_preview="SECR****",
matched_value="SECRET_PATTERN_VALUE", # in-memory only
))
return findings
Hooks¶
| Hook | Signature | When Called |
|---|---|---|
set_pre_request_hook(hook) |
hook(request_id) |
Start of each request, before any logging |
set_post_scan_hook(hook) |
hook(scan_result, body, provider, session=ctx) |
After scan completes. Accept **kwargs for forward compat. |
set_evaluate_hook(hook) |
hook(findings, policy) -> ActionDecision |
Replaces default policy evaluation |
set_config_reload_hook(hook) |
hook(pipeline) |
After SIGHUP config reload |
set_redact_hook(hook) |
hook(body, findings) -> bytes |
When action is "redact" |
set_health_hook(hook) |
hook() -> dict |
Merged into /health JSON response (no auth, for container probes) |
set_metrics_hook(hook) |
hook() -> str |
Prometheus text lines appended to /metrics response |
set_trace_request_hook(hook) |
hook(method, path) -> context manager |
Wraps full request lifecycle for OTel tracing |
Observability Hooks¶
Health (/health on proxy port 8080):
registry.set_health_hook(lambda: {"license": "valid", "channels_active": 3})
# Response: {"status": "ok", "uptime": 3600, "requests": 42, "license": "valid", ...}
Metrics (/metrics on proxy port 8080):
registry.set_metrics_hook(lambda: "argus_notifications_total 42\nargus_license_days 180\n")
# Appended to community Prometheus metrics
Tracing (OpenTelemetry):
from opentelemetry import trace
tracer = trace.get_tracer("lumen-argus-pro")
registry.set_trace_request_hook(
lambda method, path: tracer.start_as_current_span(
"proxy.request", attributes={"http.method": method, "http.path": path}
)
)
# Community sets span attributes: provider, body.size, findings.count, action, scan.duration_ms
# Pro detector/redaction/notification spans auto-parent via OTel context propagation
All observability hooks are fully guarded — exceptions never break requests.
Notification Hooks¶
| Hook | Signature | Description |
|---|---|---|
register_channel_types(types) |
types: dict |
Register channel type definitions (label + fields) for the dashboard dropdown |
set_notifier_builder(builder) |
builder(channel_dict) -> notifier |
Factory that builds notifier instances from DB channel rows |
set_dispatcher(dispatcher) |
dispatcher.dispatch(findings, provider, model, session_id, session, **kwargs) |
Set the notification dispatcher. Plugins may add circuit breakers, async dispatch, dedup, etc. |
set_channel_limit(limit) |
limit: int or None |
Cap the number of dashboard-managed channels (None = unlimited, the community default) |
Community provides the DB schema (notification_channels table), CRUD API, and dashboard UI, and ships with no channel cap. Plugins register additional channel types, notifier builders, dispatchers, and may impose a channel cap by calling set_channel_limit(N). Without a registered notifier builder, the Notifications page shows YAML-configured channels as read-only with a dispatch warning.
Notifier signature contract. Any custom notifier returned by set_notifier_builder(...) must accept a forward-compatible notify() signature. The pipeline dispatches through _safe_notify which calls notifier.notify(findings, provider=..., model=..., session=...). The session keyword argument is the full SessionContext dataclass and will gain new fields over time (new identity columns, new SDK metadata, etc.). Use either of:
# Option A — accept everything via **kwargs (recommended for third-party notifiers)
def notify(self, findings, provider="", model="", **kwargs):
session = kwargs.get("session")
...
# Option B — declare session explicitly (recommended if you use it)
def notify(self, findings, provider="", model="", session=None, **kwargs):
...
A notifier whose signature is strict (e.g. def notify(self, findings, provider, model) with no **kwargs) will raise TypeError on every dispatch starting with community da5ea71. The error is caught by BasicDispatcher._safe_notify and logged; get_last_status() reports the failure, but the customer-facing channel goes silent. Always include **kwargs or an explicit session=None parameter. session should be treated as read-only inside the notifier — the same instance is still reachable on the calling thread via the SSE broadcast and post-scan hook, and mutation would race with those readers.
YAML reconciliation: Channels defined in config.yaml are reconciled to SQLite on startup and SIGHUP (Kubernetes-style). YAML is fully authoritative — all fields including enabled overwrite DB values. Dashboard-managed channels are never touched by the reconciler.
Channel cap (optional): set_channel_limit(N) enforces an atomic count + insert under the same lock. Community ships with no cap.
Server Access¶
registry.set_proxy_server(server) # Called by cli.py after server creation
server = registry.get_proxy_server() # Access from plugin code
Dashboard Hooks¶
Extend the community dashboard without replacing it:
def register(registry):
# Add pages — each plugin owns its page end-to-end (registration,
# content, and any license-state branching for tier-aware UI).
registry.register_dashboard_pages([
{"name": "rules", "label": "Rules",
"js": "registerPage('rules', 'Rules', {loadFn: loadRules, html: _pageHtml_rules});",
"html": "<div class='sh'><h2>Detection Rules</h2></div>",
"order": 25},
])
# Register notification channel types and dispatcher
registry.register_channel_types({"slack": {"label": "Slack", "fields": {...}}})
registry.set_notifier_builder(my_builder)
registry.set_dispatcher(my_dispatcher)
# registry.set_channel_limit(N) # optional cap; defaults to None (unlimited)
# Add CSS (injected after community CSS)
registry.register_dashboard_css(".pro-badge { color: gold; }")
# Add API handler (called before community handler)
def my_api(path, method, body, store, audit_reader):
if path == "/api/v1/rules":
return 200, json.dumps({"rules": []}).encode()
return None # fall through to community
registry.register_dashboard_api(my_api)
# Override analytics store (Pro extends with more tables)
registry.set_analytics_store(my_extended_store)
# Access SSE broadcaster for real-time events
broadcaster = registry.get_sse_broadcaster()
if broadcaster:
broadcaster.broadcast("finding", {"count": 3})
Plugin trust model
The js field is injected as a raw <script> block and executes in the dashboard origin — it is trusted code. Only register pages from pip-installed entry-point plugins. The html field is sanitized client-side via _safeInjectHTML() which strips <script> tags and on* event handlers.
Clearing Dashboard Extensions (SIGHUP)¶
Plugins call clear_dashboard_pages() during SIGHUP config reload to drop
their previously-registered dashboard surface — useful when license state or
configuration changes between reloads:
def on_config_reload(pipeline):
registry.clear_dashboard_pages() # reset pages, CSS, API handler
if plugin_should_render():
registry.register_dashboard_pages(get_plugin_pages()) # re-register
registry.register_dashboard_css(get_plugin_css())
registry.register_dashboard_api(handle_plugin_api)
# else: leave empty → only the community surface remains
Auth Providers¶
Register additional authentication methods (Enterprise):
registry.register_auth_provider(my_oauth_provider)
# provider.authenticate(headers) -> {"user_id": "...", "roles": [...]} or None
Multi-Plugin Coordination¶
When more than one plugin is installed, plugins frequently need to share state — the late-loading plugin wants the early-loading plugin's store handle, dispatcher, or metrics collector. The following hooks let plugins do this without reaching into each other's private module attributes.
Plugin Instance Registry¶
Publish and look up plugin instances by name. Convention: use the package
name without the lumen_argus_ prefix.
# Plugin A — registers itself during its register() entry point
def register(registry):
instance = MyPlugin(...)
registry.set_plugin("my_plugin", instance)
# Plugin B — looks up plugin A by name
def register(registry):
upstream = registry.get_plugin("my_plugin") # None if not installed
if upstream is not None:
# Use whatever public attributes plugin A exposes
collector = MyCollector(sink=upstream.public_handle)
set_plugin(name, instance)— idempotent, last write wins. No type constraint — the caller knows the shape.get_plugin(name)— returns the instance orNone.- Plugin A owns the contract for what attributes it exposes on its own instance. Plugin B reads at its own risk if the shape changes.
Schema Extension Registration¶
Plugins that own their own database tables register DDL that runs after the
community schema. Must be idempotent (CREATE TABLE IF NOT EXISTS).
def register(registry):
registry.register_schema_extension("""
CREATE TABLE IF NOT EXISTS my_plugin_events (
id {auto_id},
created_at {ts} NOT NULL,
payload TEXT NOT NULL
);
""")
The placeholders {auto_id} and {ts} resolve to the adapter's dialect
(INTEGER PRIMARY KEY AUTOINCREMENT / TEXT for SQLite, SERIAL PRIMARY
KEY / TIMESTAMPTZ for PostgreSQL). Extensions run in registration
order, each applied independently: a failure in one is logged at ERROR
and skipped without blocking the others. Broken plugin schemas degrade
that plugin's features — they don't crash the store.
Registration must happen during the plugin's register() entry point.
The proxy collects all registered extensions after load_plugins() runs
and applies them against the active analytics store, so plugin authors
do not need to worry about startup ordering.
If you need to apply DDL against an already-constructed store (e.g. in a test), call the public method directly:
Public Store Execute API¶
Plugins with their own tables run queries against the shared database via
three stable methods on AnalyticsStore:
# Read — returns list[dict], uses thread-local connection
rows = store.execute(
"SELECT payload FROM my_plugin_events WHERE id = ?",
(event_id,),
)
# Single-statement write — acquires adapter write lock, commits
result = store.execute_write(
"INSERT INTO my_plugin_events (created_at, payload) VALUES (?, ?)",
(now, payload),
)
print(result.rowcount, result.lastrowid)
# Multi-statement transaction
with store.write_transaction() as conn:
conn.execute("DELETE FROM my_plugin_events WHERE created_at < ?", (cutoff,))
conn.execute(
"INSERT INTO my_plugin_events (created_at, payload) VALUES (?, ?)",
(now, payload),
)
# commits on success, rolls back on exception
- All methods use
?placeholders. The adapter translates to%sfor PostgreSQL. execute()returnslist[dict[str, Any]]— one dict per row.execute_write()returnsWriteResult(rowcount, lastrowid).write_transaction()yields aDBConnectionunder the write lock.- Never nest:
execute_writeandwrite_transactionboth acquire the adapter write lock, which is a plainthreading.Lockon SQLite (non-reentrant). Do not callexecute_writefrom inside awrite_transactionblock — it will self-deadlock. Put all your writes inside a singlewrite_transactionblock instead. - Never log SQL parameters — they may contain sensitive values (the same values the scanner pipeline is looking for).
These are the only public SQL entry points for plugins. Do not reach
into store._adapter or store._connect() — they are internal and may
change without notice.
Multi-Package Static File Loading¶
Plugins can ship their own JS/CSS/HTML files from their own package directory instead of bundling them inside another plugin. The dashboard server scans registered directories and injects their contents into the assembled SPA HTML.
def register(registry):
static = os.path.join(os.path.dirname(__file__), "dashboard", "static")
registry.register_static_dir(static)
Directory layout:
dashboard/static/
├── js/*.js → concatenated into a <script> block before </body>
├── css/*.css → concatenated into the <style> block
└── html/*.html → exposed as var _pageHtml_<basename> inside <script>
- Directories are processed in registration order (community first, then plugins in entry-point order).
- File name collisions resolve last-write-wins, so a plugin can override a community file if it really needs to — do this rarely.
- If the registered path doesn't exist at registration time, a warning is logged and the directory is skipped.
- Missing subdirectories (
js/,css/,html/) are silently ignored, so a plugin may provide only JS, only CSS, etc. - HTML files become
var _pageHtml_<safename>=<json>;globals. The<safename>is the basename with non-identifier characters (-,., etc.) replaced with_, somy-page.v2.htmlbecomes_pageHtml_my_page_v2. - Results are cached on first request and reused across page loads. The
cache is busted when
registry.clear_dashboard_pages()runs (e.g. on SIGHUP reload), so any on-disk content changes are picked up on the next page load. Registered static dirs themselves are not dropped on reload — they're tied to installed plugin packages, which don't change at runtime.
Dashboard Status Data Sharing¶
The community dashboard calls /api/v1/status on every loadData()
(initial load + SSE/polling refresh) and exposes the response on
three globals:
window._statusData // full /api/v1/status JSON response
window._licenseTier // "community" | "pro" | "team"
window._loadDataPromise // Promise that resolves when the initial
// loadData() call completes
_statusData and _licenseTier are populated when the first
loadData() call resolves — not synchronously at script-parse
time. Plugin JS that runs during script load (an IIFE, a module
that registers pages and does work in the body of its top-level
function) or during a direct-hash navigation loadFn (e.g. bookmarking
#mcp) will generally fire before loadData() has finished, so
reading the globals synchronously at that point can return
undefined.
To read status reliably, await window._loadDataPromise first:
(window._loadDataPromise || Promise.resolve()).then(function(){
// window._statusData and window._licenseTier are now populated.
const tier = window._licenseTier;
// ...
});
The || Promise.resolve() fallback covers the case where a plugin
module somehow runs before init.js assigns the promise — in that
case the fallback resolves immediately and the plugin code will see
whatever is in _statusData at that moment (possibly undefined),
so it should still null-check before use.
After the first awaited resolution, later reads can be synchronous:
_statusData is refreshed in place on every polling cycle (5s by
default) and on every SSE finding event, so it stays current
without any further awaits.
Data Structures¶
ScanField¶
@dataclass
class ScanField:
path: str # e.g. "messages[3].content"
text: str # text to scan
source_filename: str # filename if from tool_result
Finding¶
@dataclass
class Finding:
detector: str # "secrets", "pii", "proprietary", "custom", or plugin name
type: str # e.g. "aws_access_key", "email"
severity: str # "critical", "high", "warning", "info"
location: str # path into request body
value_preview: str # masked (first 4 chars + "****")
matched_value: str # full match — NEVER written to disk
action: str # resolved action (set by PolicyEngine)
count: int # deduplication count (default 1)
Security Invariant
Finding.matched_value is kept in memory only. It is excluded from audit log serialization, application logs, metrics, and baseline files. Never persist it.