Every stimulus processed by SYRIS passes through exactly three stages: Normalize, Route, and Execute. The stages are separate Python modules communicating through data structures, not shared state.
[InboundAdapter]
│ raw adapter payload
▼
[normalizer.py]
│ MessageEvent ── emit AuditEvent("event.ingested")
│ ── persist MessageEvent to DB
▼
[router.py]
│ RoutingDecision ── emit AuditEvent("routing.decided")
│ ── persist RoutingDecision to DB
▼
[pipeline/executor.py]
├─ fast ──► [tools/executor.py] ── emit AuditEvent("tool_call.*")
├─ task ──► [tasks/engine.py] ── create Task + Steps
├─ gated ──► [safety/gates.py] ── create Approval
│ ── emit AuditEvent("gate.required")
└─ sandbox──► [workers/manager.py]pipeline/runner.py is a thin loop that sequences these three stages. The router's internal structure (fast-path DSL → rules engine → LLM fallback) is documented in dev/fast-path-intents and proactive/rules-engine.
trace_id is a UUID generated once, by the normaliser, at MessageEvent creation. It never changes and is never regenerated. Every downstream component — router, executor, task engine, tool executor — receives and propagates it.
AuditWriter.emit() requires trace_id as a positional argument, making omission a type error rather than a silent runtime gap.
| Stage | Event type | Trigger |
|---|---|---|
| normalize | event.ingested | MessageEvent successfully created and persisted |
| normalize | event.deduped | Duplicate dedupe_key found within dedup window; event discarded |
| route | routing.decided | RoutingDecision produced and persisted |
| execute | gate.required | Gated lane: Approval record created |
| execute | gate.approved | Operator approves a pending Approval |
| execute | gate.denied | Operator denies a pending Approval |
| execute | gate.antiflap_block | Anti-flap override suppresses action |
| execute | gate.storm_block | Notification storm override suppresses action |
| tool_call | tool_call.attempted | Tool execution started |
| tool_call | tool_call.succeeded | Tool returned success; outcome stored in idempotency_outcomes |
| tool_call | tool_call.failed | Tool returned error (retryable or non-retryable) |
| tool_call | tool_call.deduped | Prior outcome found for idempotency_key; returned without re-execution |
| tool_call | tool_call.unknown | Transport failure mid-call; outcome stored as unknown |
| task | task.step_started | Step marked running by step runner |
| task | task.step_completed | Step succeeded; task advances |
| task | task.step_failed | Step failed after retries exhausted |
| task | task.step_unexpected_error | Unhandled exception in step function |
| scheduler | schedule.fired | Schedule emits its MessageEvent |
| scheduler | schedule.skipped | Schedule suppressed by quiet hours |
| scheduler | schedule.missed | Schedule was due during downtime; catch-up policy applied |
| watcher | watcher.tick | Watcher tick completed |
| watcher | watcher.error | Watcher tick raised an exception |
| rule | rule.triggered | Rule condition matched; actions dispatched |
| rule | rule.suppressed | Rule matched but suppressed by debounce, dedupe, or quiet hours |
| operator | operator.action.* | Any operator control action (autonomy change, pause, cancel, etc.) |
The normaliser computes a dedupe_key for every event from a stable hash of channel, connector, and native message ID. If the key exists in the events table within a configurable window (default: 60 seconds), the event is discarded and event.deduped is emitted.
The dedup check is transactional:
INSERT INTO events ... ON CONFLICT (dedupe_key) DO NOTHINGA UNIQUE partial index on events(dedupe_key) WHERE dedupe_key IS NOT NULL enforces uniqueness at the DB level.