Pipeline

The three-stage Normalize → Route → Execute pipeline. data flow, trace threading, and audit emission.

Every stimulus processed by SYRIS passes through exactly three stages: Normalize, Route, and Execute. The stages are separate Python modules communicating through data structures, not shared state.

Stages and data flow

[InboundAdapter]
 raw adapter payload

[normalizer.py]
 MessageEvent ── emit AuditEvent("event.ingested")
               ── persist MessageEvent to DB

[router.py]
 RoutingDecision ── emit AuditEvent("routing.decided")
                 ── persist RoutingDecision to DB

[pipeline/executor.py]
      ├─ fast   ──► [tools/executor.py]   ── emit AuditEvent("tool_call.*")
      ├─ task   ──► [tasks/engine.py]     ── create Task + Steps
      ├─ gated  ──► [safety/gates.py]     ── create Approval
                                   ── emit AuditEvent("gate.required")
      └─ sandbox──► [workers/manager.py]

pipeline/runner.py is a thin loop that sequences these three stages. The router's internal structure (fast-path DSL → rules engine → LLM fallback) is documented in dev/fast-path-intents and proactive/rules-engine.

Trace ID threading

trace_id is a UUID generated once, by the normaliser, at MessageEvent creation. It never changes and is never regenerated. Every downstream component — router, executor, task engine, tool executor — receives and propagates it.

AuditWriter.emit() requires trace_id as a positional argument, making omission a type error rather than a silent runtime gap.

Audit events by stage

StageEvent typeTrigger
normalizeevent.ingestedMessageEvent successfully created and persisted
normalizeevent.dedupedDuplicate dedupe_key found within dedup window; event discarded
routerouting.decidedRoutingDecision produced and persisted
executegate.requiredGated lane: Approval record created
executegate.approvedOperator approves a pending Approval
executegate.deniedOperator denies a pending Approval
executegate.antiflap_blockAnti-flap override suppresses action
executegate.storm_blockNotification storm override suppresses action
tool_calltool_call.attemptedTool execution started
tool_calltool_call.succeededTool returned success; outcome stored in idempotency_outcomes
tool_calltool_call.failedTool returned error (retryable or non-retryable)
tool_calltool_call.dedupedPrior outcome found for idempotency_key; returned without re-execution
tool_calltool_call.unknownTransport failure mid-call; outcome stored as unknown
tasktask.step_startedStep marked running by step runner
tasktask.step_completedStep succeeded; task advances
tasktask.step_failedStep failed after retries exhausted
tasktask.step_unexpected_errorUnhandled exception in step function
schedulerschedule.firedSchedule emits its MessageEvent
schedulerschedule.skippedSchedule suppressed by quiet hours
schedulerschedule.missedSchedule was due during downtime; catch-up policy applied
watcherwatcher.tickWatcher tick completed
watcherwatcher.errorWatcher tick raised an exception
rulerule.triggeredRule condition matched; actions dispatched
rulerule.suppressedRule matched but suppressed by debounce, dedupe, or quiet hours
operatoroperator.action.*Any operator control action (autonomy change, pause, cancel, etc.)

Deduplication at ingest

The normaliser computes a dedupe_key for every event from a stable hash of channel, connector, and native message ID. If the key exists in the events table within a configurable window (default: 60 seconds), the event is discarded and event.deduped is emitted.

The dedup check is transactional:

INSERT INTO events ... ON CONFLICT (dedupe_key) DO NOTHING

A UNIQUE partial index on events(dedupe_key) WHERE dedupe_key IS NOT NULL enforces uniqueness at the DB level.