Implementation Design

Opinionated implementation design document covering module layout, schemas, pipeline, task engine, tool runtime, scheduler/watchers/rules, observability, and sequencing.

SYRIS — Implementation Design Document

Author: Architecture review session - Claude Opus 4.6
Date: 2026-03-23
Spec source: https://docs.syris.uk
Status: Ready to build from


Understanding Confirmation

SYRIS is a single-user, always-on automation control plane that ingests stimuli from the world, normalises them into versioned MessageEvents, routes them deterministically (rules-first, LLM fallback), and executes actions through a safety-gated, idempotent tool runtime. Everything is traceable end-to-end via trace_id, queryable via API, and designed to survive crashes without duplicating side effects. It is a modular monolith — not a chatbot, not a microservice platform.


Spec Issues and Clarifications

Before designing, these are the problems I found in the spec and the decisions I made.

Issue 1: Secrets Rule 3 is too strict

The spec says you should call get_secret() inline and never assign the return value to a local variable. This means you can't reuse a credential across two calls in the same execute() without fetching it twice. Any adapter needing to construct a client (e.g. OAuth headers + refresh) will fight this.

Design decision: Relax to "the value must not outlive execute(); local variables within the function body are acceptable." The invariant that matters is "never persisted, never logged, never on self" — not "never in a local binding."
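A minimal sketch of the relaxed rule follows. It assumes a secrets store exposing `get_secret(name) -> str`; the spec names `get_secret()`, but the `FakeSecrets` store, the `"github_token"` name and the header logic are hypothetical. The credential lives in a local variable for the duration of `execute()` and is reused for two calls. It is never assigned to `self`, returned, or logged.

```python
from typing import Protocol


class SecretsStore(Protocol):
    # Hypothetical protocol: only get_secret() is assumed to exist.
    def get_secret(self, name: str) -> str: ...


class FakeSecrets:
    """In-memory stand-in for the real store, for illustration only."""

    def __init__(self, values: dict[str, str]) -> None:
        self._values = values
        self.fetches = 0

    def get_secret(self, name: str) -> str:
        self.fetches += 1
        return self._values[name]


class ExampleTool:
    def __init__(self, secrets: SecretsStore) -> None:
        self._secrets = secrets  # the store is kept on self, never the secret value

    def execute(self, repo: str) -> list[str]:
        # Allowed under the relaxed rule: a local binding scoped to execute().
        token = self._secrets.get_secret("github_token")
        headers = {"Authorization": f"Bearer {token}"}
        # Reuse the same credential for two calls without fetching it twice.
        calls = [
            self._fake_request("GET", f"/repos/{repo}", headers),
            self._fake_request("GET", f"/repos/{repo}/issues", headers),
        ]
        # Nothing derived from `token` is stored on self or returned.
        return calls

    @staticmethod
    def _fake_request(method: str, path: str, headers: dict[str, str]) -> str:
        # Stand-in for an HTTP call; reports only whether auth was present.
        return f"{method} {path} auth={'Authorization' in headers}"
```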

Issue 2: ToolResult has no trace_id

ToolResult is joined to traces via tool_calls.trace_id. Querying "all results for a trace" requires a join. This is fine architecturally but needs a composite index on tool_calls(trace_id, tool_call_id) to make it fast.
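The index can be shown with the standard-library `sqlite3` module so the example is self-contained. The production database is likely PostgreSQL, given the `FOR UPDATE SKIP LOCKED` claim query in the task engine, and the same `CREATE INDEX` statement applies there. The tables are trimmed to the columns the join needs, and the index name is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE tool_calls (
        tool_call_id TEXT PRIMARY KEY,
        trace_id     TEXT NOT NULL,
        tool_name    TEXT NOT NULL
    );
    CREATE TABLE tool_results (
        tool_call_id TEXT PRIMARY KEY REFERENCES tool_calls(tool_call_id),
        status       TEXT NOT NULL
    );
    -- Leading trace_id serves the filter; tool_call_id covers the join key.
    CREATE INDEX ix_tool_calls_trace_id_tool_call_id
        ON tool_calls (trace_id, tool_call_id);
    """
)
conn.executemany(
    "INSERT INTO tool_calls VALUES (?, ?, ?)",
    [("c1", "t1", "noop"), ("c2", "t1", "noop"), ("c3", "t2", "noop")],
)
conn.executemany(
    "INSERT INTO tool_results VALUES (?, ?)",
    [("c1", "succeeded"), ("c2", "failed"), ("c3", "succeeded")],
)

# "All results for a trace": filter via the index, then join on the primary key.
RESULTS_FOR_TRACE = """
    SELECT r.tool_call_id, r.status
    FROM tool_calls AS c
    JOIN tool_results AS r ON r.tool_call_id = c.tool_call_id
    WHERE c.trace_id = ?
"""
rows = conn.execute(RESULTS_FOR_TRACE, ("t1",)).fetchall()
plan = conn.execute("EXPLAIN QUERY PLAN " + RESULTS_FOR_TRACE, ("t1",)).fetchall()
```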

Issue 3: routing/ boundary is confusing

The component map shows pipeline/ as thin stage orchestrators and routing/ as a separate concern, but routing/ appears nested under pipeline/ in the module tree. These should be explicit peers.

Design decision: routing/ is a top-level package, peer to pipeline/. pipeline/router.py delegates to routing/ functions.

Issue 4: scheduler/ contains unrelated subsystems

The spec nests watchers/ and rules/ under scheduler/. These are three distinct subsystems that happen to be "proactive."

Design decision: Promote watchers/, rules/, and scheduler/ to top-level peers.

Issue 5: sandbox lane is undefined

The pipeline mentions a sandbox lane routing to workers/manager.py, but no spec page defines what distinguishes sandbox from task. Milestone 7 gives a skeleton, but the routing decision isn't specified.

Design decision: ExecutionMode keeps the sandbox value so RoutingDecision stays forward-compatible. No routing or execution code for it is written until Milestone 7.

Issue 6: ErrorDetail is never defined

Task.error, Step.error, and ToolResult.error reference ErrorDetail | None, but the data contracts page never defines the schema.

Design decision: Added. See section 2.

Issue 7: proj_queue_depth is premature

It requires knowing "tool queue depth" — but no tool queue exists as a first-class concept.

Design decision: Defer until the dashboard needs it.


1. Revised Module Layout

syris/
├── pyproject.toml
├── alembic.ini
├── src/
│   └── syris/
│       ├── __init__.py
│       ├── main.py                          # Boot: wire all components, start loops
│       ├── config.py                        # Typed settings (Pydantic BaseSettings)
│       │
│       ├── schemas/                         # Pure Pydantic v2 — no DB, no logic
│       │   ├── __init__.py
│       │   ├── common.py                    # Enums: ChannelEnum, ActorType, RiskLevel,
│       │   │                                #   AutonomyLevel, Sensitivity, etc.
│       │   │                                # Value types: ErrorDetail
│       │   ├── events.py                    # MessageEvent, RoutingDecision, GateSpec
│       │   ├── tasks.py                     # Task, Step, RetryPolicy, StepOutcome
│       │   ├── tools.py                     # ToolCall, ToolResult, ToolCallContext,
│       │   │                                #   RegisteredTool, ToolHealth, TrustPolicy
│       │   ├── audit.py                     # AuditEvent
│       │   ├── approvals.py                 # Approval
│       │   ├── schedules.py                 # Schedule
│       │   ├── rules.py                     # Rule, Condition, Action types
│       │   └── health.py                    # SystemHealth, Alarm
│       │
│       ├── storage/                         # DB layer
│       │   ├── __init__.py
│       │   ├── db.py                        # Engine, session factory, connection config
│       │   ├── models.py                    # SQLAlchemy ORM models (all tables)
│       │   ├── repos/                       # One repo per aggregate root
│       │   │   ├── __init__.py
│       │   │   ├── events.py
│       │   │   ├── audit.py
│       │   │   ├── tasks.py
│       │   │   ├── tools.py                 # tool_calls, tool_results,
│       │   │   │                            #   idempotency_outcomes
│       │   │   ├── approvals.py
│       │   │   ├── schedules.py
│       │   │   ├── watchers.py
│       │   │   ├── rules.py
│       │   │   ├── health.py                # system_health, alarms
│       │   │   └── projections.py           # proj_* tables read/write
│       │   └── migrations/                  # Alembic
│       │       ├── env.py
│       │       └── versions/
│       │
│       ├── pipeline/                        # The three-stage pipeline
│       │   ├── __init__.py
│       │   ├── runner.py                    # Main loop: normalise → route → execute
│       │   ├── normaliser.py                # Raw payload → MessageEvent + audit
│       │   ├── router.py                    # Orchestrates routing layers in order
│       │   └── executor.py                  # Dispatches to fast/task/gated/sandbox
│       │
│       ├── routing/                         # Router internals (peer to pipeline/)
│       │   ├── __init__.py
│       │   ├── filters.py                   # Hard filters: dedup, spam, quiet hours
│       │   ├── fastpath.py                  # Deterministic intent matching
│       │   ├── rules_eval.py                # Condition evaluator for Rule objects
│       │   └── llm_fallback.py              # LLM-based routing (last resort)
│       │
│       ├── tasks/                           # Task engine
│       │   ├── __init__.py
│       │   ├── engine.py                    # Claim → execute → checkpoint loop
│       │   ├── step_runner.py               # Execute one step, handle outcomes
│       │   ├── state.py                     # State machine enforcement
│       │   └── recovery.py                  # Startup reconciliation
│       │
│       ├── tools/                           # Tool runtime
│       │   ├── __init__.py
│       │   ├── registry.py                  # ToolRegistry: register, lookup, health
│       │   ├── executor.py                  # Scope → risk → gate → idempotency → call
│       │   ├── idempotency.py               # Outcome store operations
│       │   ├── base.py                      # BaseTool ABC
│       │   └── builtin/
│       │       └── noop.py                  # NoopTool for testing
│       │
│       ├── safety/                          # Safety layer
│       │   ├── __init__.py
│       │   ├── autonomy.py                  # Read/write current level + history
│       │   ├── risk.py                      # Risk classifier + adjusters
│       │   ├── gates.py                     # Gate matrix + override evaluation
│       │   └── dryrun.py                    # Preview protocol
│       │
│       ├── scheduler/                       # Schedules only (promoted to peer)
│       │   ├── __init__.py
│       │   └── loop.py                      # Scheduler tick loop
│       │
│       ├── watchers/                        # Promoted to peer
│       │   ├── __init__.py
│       │   ├── loop.py                      # Watcher tick loop
│       │   ├── base.py                      # BaseWatcher ABC
│       │   └── heartbeat.py                 # HeartbeatWatcher
│       │
│       ├── rules/                           # Promoted to peer
│       │   ├── __init__.py
│       │   ├── engine.py                    # Load rules, evaluate, dispatch actions
│       │   └── cache.py                     # In-memory cache with TTL + invalidation
│       │
│       ├── mcp/                             # MCP integration (promoted to peer)
│       │   ├── __init__.py
│       │   ├── connection.py                # MCPConnectionManager
│       │   ├── provider.py                  # MCPProvider: discovery → registry sync
│       │   ├── adapter.py                   # MCPToolAdapter(BaseTool)
│       │   └── trust.py                     # TrustPolicy schema + loader
│       │
│       ├── adapters/                        # Inbound/outbound adapters
│       │   ├── __init__.py
│       │   ├── inbound/
│       │   │   ├── __init__.py
│       │   │   └── base.py                  # InboundAdapter ABC
│       │   └── outbound/
│       │       ├── __init__.py
│       │       └── base.py                  # OutboundAdapter ABC
│       │
│       ├── observability/                   # Audit, projections, health
│       │   ├── __init__.py
│       │   ├── audit.py                     # AuditWriter — sole emit point
│       │   ├── projections.py               # Sync projection updaters
│       │   ├── health.py                    # SystemHealth writer
│       │   └── alarms.py                    # Alarm creation + dedup + lifecycle
│       │
│       ├── secrets/
│       │   ├── __init__.py
│       │   └── store.py                     # SecretsStore protocol + Fernet impl
│       │
│       ├── api/                             # FastAPI
│       │   ├── __init__.py
│       │   ├── app.py                       # FastAPI app factory
│       │   ├── deps.py                      # Dependency injection (session, services)
│       │   └── routes/
│       │       ├── __init__.py
│       │       ├── status.py                # /health, /state
│       │       ├── events.py                # /events
│       │       ├── audit.py                 # /audit, /artifacts/{id}
│       │       ├── tasks.py                 # /tasks CRUD + cancel/pause/resume
│       │       ├── approvals.py             # /approvals + approve/deny
│       │       ├── schedules.py             # /schedules CRUD
│       │       ├── watchers.py              # /watchers
│       │       ├── rules.py                 # /rules
│       │       ├── integrations.py          # /integrations
│       │       ├── controls.py              # /controls/pause, resume, autonomy
│       │       └── alarms.py                # /alarms + ack/resolve
│       │
│       └── workers/                         # Milestone 7 — skeleton only
│           ├── __init__.py
│           └── manager.py
│
└── tests/
    ├── unit/
    ├── integration/
    └── conftest.py

Justification for changes from the spec

Flat src/syris/ instead of core/syris_core/src/syris_core/. The nested monorepo structure adds complexity for a single-developer project with no current extraction need. A single pyproject.toml at the root is sufficient. The extraction seam is the module boundary, not the package nesting.

routing/ promoted to top-level peer. pipeline/router.py is a thin orchestrator that calls into routing/ functions in priority order. The routing internals (filters, fastpath, rules eval, LLM fallback) are a distinct concern from pipeline stage orchestration. Making them a peer package makes the dependency direction explicit: pipeline depends on routing, not the other way around.

watchers/, rules/, scheduler/ as top-level peers. Each has its own loop, its own state model, and its own lifecycle. Nesting them under scheduler/ suggests a containment relationship that doesn't exist — the scheduler doesn't manage watchers or rules.

mcp/ promoted from integrations/mcp/. MCP is a specific, well-defined integration mechanism with four components. The integrations/ package in the spec also contained inbound/ and outbound/ base classes, which are thin ABCs. I've moved the ABCs to adapters/ (clearer name) and given MCP its own top-level package, since it will grow.

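tools/adapter.py renamed to tools/base.py. This avoids confusion with mcp/adapter.py. The two modules are different kinds of thing: tools/base.py holds the BaseTool ABC, while mcp/adapter.py holds MCPToolAdapter, a concrete BaseTool subclass. base.py is the conventional name for a module that defines an ABC.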

Added config.py. The spec implies configuration exists (poll intervals, thresholds, quiet hours, etc.) but never shows where it lives. A single config.py built on Pydantic BaseSettings is the right place. Under Pydantic v2, BaseSettings ships in the separate pydantic-settings package. The settings class reads from env vars and .env files, validates at startup, and is injectable everywhere.


2. Core Schemas

All schemas are Pydantic v2. Changes from the spec are marked [ADDED], [CHANGED], or [DROPPED].

ErrorDetail [ADDED]

Referenced by Task.error, Step.error, and ToolResult.error but never defined in the spec.

ErrorDetail:
  code:       str          — Machine-readable error code (e.g. "tool.timeout",
                             "scope.violation", "gate.denied")
  message:    str          — Human-readable description
  retryable:  bool         — Whether the caller should retry
  details:    dict | None  — Optional structured context; MUST NOT contain secrets

MessageEvent

Preserved exactly as specified. No changes needed.

MessageEvent:
  event_id:              UUID       — Generated at ingestion; stable identity
  schema_version:        str        — "1.0"; bump for breaking changes

  occurred_at:           datetime   — Source timestamp (UTC)
  ingested_at:           datetime   — Set by normaliser; used for latency metrics

  source:
    channel:             ChannelEnum  — email | sms | webhook | ha_event |
                                        scheduler | rule_engine | watcher | vision
    connector_id:        str          — Which integration instance
    thread_id:           str | None   — Native thread/conversation ID
    message_id:          str | None   — Native message ID for dedup

  actor:
    actor_type:          ActorType  — user | system | integration
    actor_id:            str        — Stable identity string

  content:
    text:                str | None
    structured:          dict       — Parsed payload or intent hints
    attachment_refs:     list[str]  — IDs of stored attachments; never inline bytes
    links:               list[str]

  context:
    timezone:            str        — IANA timezone string
    locale:              str        — BCP-47
    device_id:           str | None

  correlation:
    trace_id:            UUID       — Created at ingestion; NEVER changes
    parent_event_id:     UUID | None — Set when triggered by another event
    dedupe_key:          str | None  — Derived by normaliser from stable hash
                                       SHA-256(channel + connector_id + message_id)

  security:
    sensitivity:         Sensitivity  — low | med | high
    redaction_policy_id: str

RoutingDecision

Preserved exactly as specified.

RoutingDecision:
  trace_id:              UUID
  event_id:              UUID
  decided_at:            datetime

  execution_mode:        ExecutionMode  — fast | task | gated | sandbox

  match:
    matched_fastpath:    str | None     — e.g. "timer.set"
    matched_rule_ids:    list[str]
    used_llm:            bool

  intent:                str | None     — Normalised intent label
  confidence:            float | None   — None for deterministic matches

  required_scopes:       list[str]
  risk_level:            RiskLevel      — low | medium | high | critical

  gates:                 list[GateSpec]
  notes:                 str | None     — Human-readable routing explanation

GateSpec:
  gate_type:             GateType       — approval | dryrun | scope_check
  reason:                str
  expires_in_seconds:    int

Task

Preserved as specified.

Task:
  task_id:                  UUID
  created_at:               datetime
  updated_at:               datetime
  status:                   TaskStatus   — pending | running | paused |
                                           succeeded | failed | canceled
  trigger_event_id:         UUID
  trace_id:                 UUID
  current_step_id:          UUID | None
  autonomy_level_at_start:  AutonomyLevel  — Snapshot; never changes mid-task
  labels:                   list[str]
  next_wake_time:           datetime | None
  cancel_reason:            str | None
  error:                    ErrorDetail | None

Step

Preserved as specified.

Step:
  step_id:               UUID
  task_id:               UUID
  name:                  str
  status:                StepStatus  — pending | running | succeeded | failed | paused
  attempt:               int         — 0-indexed
  max_attempts:          int

  retry_policy:          RetryPolicy

  input:                 dict        — Immutable at creation
  checkpoint:            dict        — Mutable; written mid-step for resumability
  output:                dict | None — Written on success

  idempotency_key:       str | None  — Required for effectful steps;
                                       enforced by tools/executor.py, not schema

  started_at:            datetime | None
  ended_at:              datetime | None
  error:                 ErrorDetail | None

RetryPolicy:
  strategy:              RetryStrategy  — exponential | fixed | none
  base_delay_ms:         int
  max_delay_ms:          int
  jitter:                bool           — Always true for exponential
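A minimal backoff calculator for RetryPolicy follows. It assumes `attempt` is the 0-indexed attempt that just failed. It also assumes that "jitter" means full jitter, a uniform draw between 0 and the capped delay. The spec only says jitter is always on for exponential, so the full-jitter choice and these dataclass stand-ins for the Pydantic models are assumptions.

```python
from __future__ import annotations

import random
from dataclasses import dataclass
from enum import Enum


class RetryStrategy(Enum):
    EXPONENTIAL = "exponential"
    FIXED = "fixed"
    NONE = "none"


@dataclass(frozen=True)
class RetryPolicy:
    strategy: RetryStrategy
    base_delay_ms: int
    max_delay_ms: int
    jitter: bool


def backoff_delay_ms(policy: RetryPolicy, attempt: int,
                     rng: random.Random | None = None) -> int | None:
    """Delay before the next attempt, or None if the policy forbids retrying."""
    if policy.strategy is RetryStrategy.NONE:
        return None
    if policy.strategy is RetryStrategy.FIXED:
        capped = min(policy.base_delay_ms, policy.max_delay_ms)
    else:
        # base * 2^attempt, capped so long retry chains stay bounded.
        capped = min(policy.base_delay_ms * (2 ** attempt), policy.max_delay_ms)
    # Exponential always jitters (per the spec); fixed jitters only if asked.
    if policy.strategy is RetryStrategy.EXPONENTIAL or policy.jitter:
        if rng is None:
            rng = random.Random()
        return rng.randint(0, capped)  # full jitter: uniform in [0, capped]
    return capped
```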

ToolCall + ToolResult

Preserved as specified.

ToolCall:
  tool_call_id:          UUID
  created_at:            datetime
  trace_id:              UUID
  task_id:               UUID | None    — None for fast-lane calls
  step_id:               UUID | None
  event_id:              UUID | None
  connector_id:          str
  tool_name:             str
  action:                str
  request_hash:          str            — SHA-256 of REDACTED request
  idempotency_key:       str            — REQUIRED; no exceptions
  status:                ToolCallStatus — attempted | succeeded | failed | unknown
  latency_ms:            int | None
  risk_level:            RiskLevel
  autonomy_level:        AutonomyLevel

ToolResult:
  tool_call_id:          UUID
  status:                ResultStatus   — succeeded | failed
  response_hash:         str            — SHA-256; payload stored in artifact store
  error:                 ErrorDetail | None
  resolved_at:           datetime

AuditEvent

Preserved as specified.

AuditEvent:
  audit_id:              UUID
  timestamp:             datetime       — UTC; set by AuditWriter, never by caller
  trace_id:              UUID           — Required; positional arg to AuditWriter

  stage:                 PipelineStage  — normalize | route | execute | tool_call |
                                          gate | operator | scheduler | watcher | rule

  refs:
    event_id:            UUID | None
    task_id:             UUID | None
    step_id:             UUID | None
    tool_call_id:        UUID | None
    approval_id:         UUID | None

  type:                  str            — e.g. "tool_call.succeeded"
  summary:               str            — Human-readable; indexed for search
  outcome:               AuditOutcome   — success | failure | suppressed | info
  latency_ms:            int | None

  tool_name:             str | None
  connector_id:          str | None
  risk_level:            RiskLevel | None
  autonomy_level:        AutonomyLevel | None
  payload_ref:           str | None     — Artifact store ID; never inline payloads

Approval

Preserved as specified.

Approval:
  approval_id:           UUID
  created_at:            datetime
  expires_at:            datetime
  status:                ApprovalStatus — pending | approved | denied | expired
  trace_id:              UUID

  refs:
    event_id:            UUID | None
    task_id:             UUID | None
    step_id:             UUID | None

  risk_level:            RiskLevel
  autonomy_level:        AutonomyLevel  — Snapshot at gate decision time

  what:                  dict           — Exact serialised action payload
  why:                   str            — Human-readable gate reason
  how_to_approve:        str            — e.g. "POST /approvals/{id}/approve"

  decision:
    by:                  str | None     — "operator"
    at:                  datetime | None
    reason:              str | None

Enums (in common.py)

ChannelEnum:     email | sms | webhook | ha_event | scheduler | rule_engine | watcher | vision
ActorType:       user | system | integration
Sensitivity:     low | med | high
RiskLevel:       low | medium | high | critical
AutonomyLevel:   A0 | A1 | A2 | A3 | A4
TaskStatus:      pending | running | paused | succeeded | failed | canceled
StepStatus:      pending | running | succeeded | failed | paused
ToolCallStatus:  attempted | succeeded | failed | unknown
ResultStatus:    succeeded | failed
ApprovalStatus:  pending | approved | denied | expired
AuditOutcome:    success | failure | suppressed | info
PipelineStage:   normalize | route | execute | tool_call | gate | operator |
                 scheduler | watcher | rule
ExecutionMode:   fast | task | gated | sandbox
GateType:        approval | dryrun | scope_check
GateAction:      ALLOW | CONFIRM | PREVIEW | HARD_BLOCK
HealthStatus:    healthy | degraded | unavailable | down
RetryStrategy:   exponential | fixed | none
ProviderType:    native | mcp

3. Pipeline Design

Three-stage separation

Each stage is a function that receives typed input and returns typed output. No shared mutable state between stages.

normalise(raw_payload, channel, connector_id, session) → MessageEvent | None   (None for a duplicate)
route(event: MessageEvent, session) → RoutingDecision
execute(event: MessageEvent, decision: RoutingDecision, session) → None

pipeline/runner.py is a thin loop:

async def process_one(raw_payload, channel, connector_id):
    async with session_scope() as session:
        event = normalise(raw_payload, channel, connector_id, session)
        if event is None:
            # Duplicate: normalise has already emitted event.deduped
            return
        decision = route(event, session)
        execute(event, decision, session)

Each stage writes into the shared session without committing. session_scope() commits once when all three stages succeed. If any stage raises, the whole unit of work rolls back: the event, the decision, and any task rows.

For the fast lane, the entire pipeline runs in a single request-response cycle. For the task lane, execute() only creates the Task and its Steps — the task engine loop picks it up separately.
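The following is a runnable sketch of this single-transaction shape. An in-memory `FakeSession` and stub stages stand in for the real SQLAlchemy session and pipeline modules, and `process_one` takes one simplified `raw` dict. Everything except the stage names is hypothetical. The sketch shows two properties the text relies on: a duplicate stops after normalise, and an exception in any stage discards every stage's writes.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Collects writes; commit() moves them to `store`, rollback() drops them."""

    def __init__(self, store: list[str]) -> None:
        self.store = store            # stands in for rows already committed
        self.pending: list[str] = []

    def add(self, row: str) -> None:
        self.pending.append(row)

    def commit(self) -> None:
        self.store.extend(self.pending)
        self.pending.clear()

    def rollback(self) -> None:
        self.pending.clear()


COMMITTED: list[str] = []


@asynccontextmanager
async def session_scope() -> AsyncIterator[FakeSession]:
    session = FakeSession(COMMITTED)
    try:
        yield session
        session.commit()              # one commit for all three stages
    except Exception:
        session.rollback()            # any stage failing discards every write
        raise


def normalise(raw: dict, session: FakeSession) -> dict | None:
    row = f"event:{raw['message_id']}"
    if row in session.store:
        return None                   # duplicate: real code emits event.deduped
    session.add(row)
    return {"event_id": raw["message_id"], "text": raw["text"]}


def route(event: dict, session: FakeSession) -> str:
    session.add(f"decision:{event['event_id']}")
    return "fast"


def execute(event: dict, decision: str, session: FakeSession) -> None:
    if event["text"] == "boom":
        raise RuntimeError("tool failed")
    session.add(f"executed:{event['event_id']}:{decision}")


async def process_one(raw: dict) -> None:
    async with session_scope() as session:
        event = normalise(raw, session)
        if event is None:
            return
        decision = route(event, session)
        execute(event, decision, session)
```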

Data flow between stages

Normalise produces a MessageEvent with a fresh trace_id (UUID4). It computes dedupe_key from SHA-256(channel + connector_id + message_id) and attempts an INSERT ... ON CONFLICT (dedupe_key) DO NOTHING. If the insert is a no-op, the event is a duplicate: emit event.deduped and return early without a MessageEvent. Otherwise the insert has already persisted the event: emit event.ingested and return it. The MessageEvent is passed on as a Python object; there is no serialization boundary between stages.
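A self-contained sketch of the dedupe step follows. It uses `sqlite3`, which supports the same `INSERT ... ON CONFLICT (col) DO NOTHING` upsert syntax as PostgreSQL (SQLite 3.24 or later). Two details go beyond the spec and are assumptions:

- Fields are joined with a `\x1f` separator before hashing, so `("ab", "c")` and `("a", "bc")` cannot collide.
- A missing `message_id` yields `dedupe_key = None`. A unique index treats NULLs as distinct, so such events are never deduplicated.

```python
from __future__ import annotations

import hashlib
import sqlite3
import uuid

SEP = "\x1f"  # ASCII unit separator; an assumption, not part of the spec


def dedupe_key(channel: str, connector_id: str, message_id: str | None) -> str | None:
    if message_id is None:
        return None  # nothing stable to hash; the event cannot be deduplicated
    raw = SEP.join((channel, connector_id, message_id)).encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, dedupe_key TEXT UNIQUE)")


def ingest(channel: str, connector_id: str, message_id: str | None) -> str:
    key = dedupe_key(channel, connector_id, message_id)
    cur = conn.execute(
        "INSERT INTO events (event_id, dedupe_key) VALUES (?, ?) "
        "ON CONFLICT (dedupe_key) DO NOTHING",
        (str(uuid.uuid4()), key),
    )
    # rowcount is 0 when the conflict clause turned the insert into a no-op.
    return "event.ingested" if cur.rowcount == 1 else "event.deduped"
```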

Route receives the MessageEvent in-memory and calls into routing/ layers in strict priority order:

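  1. filters.py — hard filters (quiet-hours check, spam detection). If filtered, return a RoutingDecision that records the suppression. ExecutionMode (fast | task | gated | sandbox) has no suppression value, so this needs either a new enum member or an explicit flag on RoutingDecision.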
  2. fastpath.py — deterministic intent matching via a DSL of patterns. If matched, return immediately with confidence = None, used_llm = False.
  3. rules_eval.py — evaluate all enabled rules against the event. If matched, dispatch rule actions. Return with matched_rule_ids populated.
  4. llm_fallback.py — LLM-based routing. Only reached if nothing above matched. Return with used_llm = True, confidence set.

First match wins. Route persists the RoutingDecision and emits routing.decided.
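The priority order can be expressed as a list of layers, each returning a decision or None, with the LLM layer last as a catch-all. This sketch assumes a trimmed-down `Decision` dataclass and toy layer logic: an `"unsubscribe"` spam word, a `"set a timer"` pattern, and a `"lights"` rule. Marking suppression with `execution_mode=None` is likewise an assumption. Only the layer order and the first-match rule come from the text above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Decision:
    layer: str
    execution_mode: str | None   # None marks suppression here (assumption)
    matched_fastpath: str | None = None
    matched_rule_ids: list[str] = field(default_factory=list)
    used_llm: bool = False
    confidence: float | None = None


def filters(text: str) -> Decision | None:
    return Decision("filters", execution_mode=None) if "unsubscribe" in text else None


def fastpath(text: str) -> Decision | None:
    if text.startswith("set a timer"):
        return Decision("fastpath", "fast", matched_fastpath="timer.set")
    return None


def rules_eval(text: str) -> Decision | None:
    if "lights" in text:
        return Decision("rules", "task", matched_rule_ids=["rule-lights"])
    return None


def llm_fallback(text: str) -> Decision:
    # Last resort: always produces a decision, flagged as LLM-derived.
    return Decision("llm", "task", used_llm=True, confidence=0.5)


Layer = Callable[[str], Optional[Decision]]
LAYERS: list[Layer] = [filters, fastpath, rules_eval, llm_fallback]


def route(text: str) -> Decision:
    for layer in LAYERS:
        decision = layer(text)
        if decision is not None:
            return decision  # first match wins; later layers never run
    raise RuntimeError("llm_fallback must always return a decision")
```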

Execute receives both MessageEvent and RoutingDecision and dispatches based on execution_mode:

  • fast → call tools/executor.py directly in the same request cycle.
  • task → call tasks/engine.py to create a Task with Steps. The engine loop picks it up.
  • gated → call safety/gates.py to create an Approval. Emit gate.required.
  • sandbox → (Milestone 7) submit to workers/manager.py.

Audit event emission points

normaliser.py           → event.ingested | event.deduped
router.py               → routing.decided
tools/executor.py       → tool_call.attempted | tool_call.succeeded |
                           tool_call.failed | tool_call.deduped | tool_call.unknown
tasks/engine.py         → task.step_started | task.step_completed |
                           task.step_failed | task.step_unexpected_error
safety/gates.py         → gate.required | gate.approved | gate.denied |
                           gate.antiflap_block | gate.storm_block
scheduler/loop.py       → schedule.fired | schedule.skipped | schedule.missed
watchers/loop.py        → watcher.tick | watcher.error
rules/engine.py         → rule.triggered | rule.suppressed
api/routes/controls.py  → operator.action.*

How trace_id threads through

trace_id is generated once in normaliser.py as uuid4(). It is set on the MessageEvent and then passed explicitly as a parameter to every downstream function. AuditWriter.emit() takes trace_id as a required positional argument — omission is a type error, not a silent runtime gap.

For child events (emitted by rules or scheduler), trace_id is inherited from the parent event, and parent_event_id is set to the parent's event_id. This creates a tree of events sharing one trace.
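Both trace rules can be sketched with a hypothetical in-memory `AuditWriter` and a trimmed `Event` dataclass.

- The `/` marker makes `trace_id` positional-only and required. A call without it raises `TypeError` at runtime and is flagged by a static type checker.
- Child events copy `trace_id` and point `parent_event_id` at their parent.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    event_id: uuid.UUID
    trace_id: uuid.UUID
    parent_event_id: uuid.UUID | None = None


def new_root_event() -> Event:
    # Normaliser: the trace_id is minted exactly once, here.
    return Event(event_id=uuid.uuid4(), trace_id=uuid.uuid4())


def child_event(parent: Event) -> Event:
    # Rules/scheduler: inherit the trace, link to the parent, fresh event_id.
    return Event(event_id=uuid.uuid4(), trace_id=parent.trace_id,
                 parent_event_id=parent.event_id)


@dataclass
class AuditWriter:
    rows: list[dict] = field(default_factory=list)

    def emit(self, trace_id: uuid.UUID, /, *, stage: str, type: str,
             summary: str, event_id: uuid.UUID | None = None) -> None:
        self.rows.append({
            "audit_id": uuid.uuid4(),
            "timestamp": datetime.now(timezone.utc),  # set here, never by the caller
            "trace_id": trace_id,
            "stage": stage,
            "type": type,
            "summary": summary,
            "event_id": event_id,
        })
```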


4. Task Engine Design

Resumable checkpointed tasks

A Task is a sequence of Steps. Each Step has:

  • input: dict — immutable, set at creation.
  • checkpoint: dict — mutable, written mid-step for resumability.
  • output: dict | None — written on success.

The checkpoint is the resumability mechanism. A step writes to its checkpoint before any effectful call, so that on restart it can determine what has already happened.

Step runner pattern (pseudocode):

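def run_step(task, step, session):
    # 1. Load checkpoint (empty on the first run of this attempt)
    checkpoint = step.checkpoint or {}
    phase = checkpoint.get("phase")

    # 2. Resume after a crash: the checkpoint says what already happened
    if phase == "post_tool":
        # Tool returned and its outcome was recorded: skip to post-processing
        outcome = idempotency_store.get(checkpoint["idempotency_key"])
        return process_outcome(outcome)
    if phase == "calling_tool":
        # Crashed "about to call tool X with key K": did the call land?
        outcome = idempotency_store.get(checkpoint["idempotency_key"])
        if outcome:
            # Tool already ran; skip to post-processing
            return process_outcome(outcome)
        # Unknown outcome: this is a crash recovery case.
        # Reset step, increment attempt, new key
        return StepOutcome("retry")

    # 3. Do pre-work (compute parameters, validate inputs)
    params = compute_params(step.input, checkpoint)
    key = generate_key(task, step, action, params)

    # 4. Write checkpoint before the effectful call
    step.checkpoint = {"phase": "calling_tool", "idempotency_key": key}
    session.flush()  # Sends the UPDATE; only durable once the transaction commits

    # 5. Call tool via tools/executor.py; the key travels in the context
    #    (build_context: hypothetical helper that assembles a ToolCallContext)
    context = build_context(task, step, idempotency_key=key)
    result = tool_executor.call(tool_name, action, params, context)

    # 6. Write post-tool checkpoint, keeping the key so a resume can find the outcome
    step.checkpoint = {"phase": "post_tool", "idempotency_key": key,
                       "result_hash": result.response_hash}
    session.flush()

    # 7. Do post-work, return outcome
    return StepOutcome("succeeded", output=result)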

State machines

Task state machine

           CREATE
              │
              ▼
          [pending] ──► [running] ──────────────► [succeeded]
                            │
                            ├── operator pause ──► [paused]
                            │        └── resume ──────────────► [running]
                            │
                            ├── operator cancel ──────────────► [canceled]
                            │
                            └── retries exhausted ────────────► [failed]

Terminal states: succeeded, failed, canceled. No transitions out of terminal states.

Step state machine

[pending] ─► [running] ─► [succeeded]
                 │
                 ├── retryable error ─► [pending]  (attempt++)
                 ├── non-retryable  ─► [failed]
                 └── operator pause ─► [paused] ─► [running]

Steps have no canceled state. Task cancellation abandons the current step in place.

Enforcement: A pure function validate_transition(current: Status, target: Status) -> bool in tasks/state.py is called by the repo layer before every status update. Invalid transitions raise IllegalStateTransition and are never persisted. This means the state machine can be tested without a database.
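The sketch of tasks/state.py below is built directly from the two diagrams above. It uses plain Enum members so that TaskStatus and StepStatus values never compare equal to each other. Anything absent from the diagrams is rejected here, for example canceling a pending or paused task; whether those transitions should be allowed is left open. `assert_transition` is a hypothetical name for the repo-side check.

```python
from __future__ import annotations

from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELED = "canceled"


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    PAUSED = "paused"


_T, _S = TaskStatus, StepStatus
# Edges copied from the diagrams; states with no key (terminal) have no exits.
_ALLOWED: dict[Enum, frozenset[Enum]] = {
    _T.PENDING: frozenset({_T.RUNNING}),
    _T.RUNNING: frozenset({_T.SUCCEEDED, _T.PAUSED, _T.CANCELED, _T.FAILED}),
    _T.PAUSED: frozenset({_T.RUNNING}),
    _S.PENDING: frozenset({_S.RUNNING}),
    _S.RUNNING: frozenset({_S.SUCCEEDED, _S.PENDING, _S.FAILED, _S.PAUSED}),
    _S.PAUSED: frozenset({_S.RUNNING}),
}


def validate_transition(current: Enum, target: Enum) -> bool:
    """Pure check: no DB, no side effects, safe to unit test directly."""
    if type(current) is not type(target):
        return False  # never mix task and step statuses
    return target in _ALLOWED.get(current, frozenset())


class IllegalStateTransition(Exception):
    pass


def assert_transition(current: Enum, target: Enum) -> None:
    # Called by the repo layer before any status UPDATE is issued.
    if not validate_transition(current, target):
        raise IllegalStateTransition(f"{current.name} -> {target.name}")
```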

Engine loop

The task engine runs on a configurable poll interval (default: 1 second):

SELECT * FROM tasks
WHERE status IN ('pending', 'running')   -- pending tasks move to running when first claimed
  AND (next_wake_time IS NULL OR next_wake_time <= now())
LIMIT :batch_size
FOR UPDATE SKIP LOCKED

For each claimed task:

  1. Load the current step.
  2. Call step_runner.run(task, step).
  3. Based on outcome:
    • succeeded → advance current_step_id to next step, or mark task succeeded if no more steps.
    • retry → if attempt + 1 < max_attempts: increment attempt, compute backoff delay, set next_wake_time. Otherwise mark the task failed (retries exhausted).
    • failed → mark task failed.
    • waiting → set task.next_wake_time to the wait-until timestamp.
    • paused → mark step paused.
  4. Each step execution and outcome is persisted in a single DB transaction.
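Step 3's outcome handling can be sketched as a pure function. The sketch assumes the outcome names listed above and the 0-indexed `attempt` / `max_attempts` pair from the Step schema. It makes the "retries exhausted" edge of the task diagram explicit: a retry is scheduled only while `attempt + 1 < max_attempts`. `Update`, `apply_outcome` and the fixed `retry_delay` parameter are hypothetical; in the real engine the delay would come from the step's RetryPolicy.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Update:
    task_status: str
    step_status: str
    attempt: int
    next_wake_time: datetime | None = None
    advance_step: bool = False


def apply_outcome(kind: str, attempt: int, max_attempts: int, *, now: datetime,
                  retry_delay: timedelta, wait_until: datetime | None = None,
                  has_next_step: bool = False) -> Update:
    if kind == "succeeded":
        if has_next_step:
            return Update("running", "succeeded", attempt, advance_step=True)
        return Update("succeeded", "succeeded", attempt)
    if kind == "retry":
        if attempt + 1 < max_attempts:
            # Step goes back to pending under a new attempt (and thus a new key).
            return Update("running", "pending", attempt + 1, now + retry_delay)
        return Update("failed", "failed", attempt)  # retries exhausted
    if kind == "failed":
        return Update("failed", "failed", attempt)
    if kind == "waiting":
        return Update("running", "running", attempt, wait_until)
    if kind == "paused":
        # The text only specifies the step; the task status is left as running.
        return Update("running", "paused", attempt)
    raise ValueError(f"unknown outcome: {kind}")
```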

Approval-wait loop

A separate loop runs every 5 seconds to check tasks waiting for approval:

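SELECT t.*
FROM tasks AS t
JOIN steps AS s ON s.step_id = t.current_step_id   -- assumes a steps table keyed by step_id
WHERE t.status = 'running'
  AND s.checkpoint ->> 'approval_id' IS NOT NULL   -- PostgreSQL json/jsonb operator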

For each, load the Approval:

  • approved → resume step execution.
  • denied → StepOutcome("failed", error=ErrorDetail(code="gate.denied", message="approval denied", retryable=False)).
  • expired → per config: fail step, or create new Approval and re-notify.
  • pending → wait for next loop iteration.

Idempotency key generation

idempotency_key = SHA-256(task_id || step_id || attempt || action || stable_request_hash)

Including attempt ensures that each retry produces a new key. This prevents a stored unknown outcome from blocking a legitimate retry. The stable_request_hash is the SHA-256 of the canonicalized (sorted keys), redacted (no secrets) request body.

Keys are stored in idempotency_outcomes:

idempotency_outcomes:
  idempotency_key:  str     PK
  tool_call_id:     str
  status:           str     — succeeded | failed | unknown
  response_hash:    str | None
  resolved_at:      datetime | None
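The key derivation can be sketched with `hashlib` and canonical JSON. Several details go beyond the text and are assumptions:

- `||` is realised as a `\x1f` separator, so adjacent fields cannot run together.
- Redaction is modelled as dropping a hypothetical `SECRET_FIELDS` set of top-level keys.
- `json.dumps(..., sort_keys=True, separators=(",", ":"))` stands in for whatever canonical form the project settles on.

```python
from __future__ import annotations

import hashlib
import json
import uuid

SECRET_FIELDS = frozenset({"password", "token", "api_key"})  # hypothetical list
SEP = "\x1f"


def stable_request_hash(request: dict) -> str:
    # Redact top-level secret fields, then hash a canonical (sorted-key) form.
    redacted = {k: v for k, v in request.items() if k not in SECRET_FIELDS}
    canonical = json.dumps(redacted, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def idempotency_key(task_id: uuid.UUID, step_id: uuid.UUID, attempt: int,
                    action: str, request: dict) -> str:
    parts = (str(task_id), str(step_id), str(attempt), action,
             stable_request_hash(request))
    return hashlib.sha256(SEP.join(parts).encode("utf-8")).hexdigest()
```

Because `attempt` is part of the input, each retry gets a fresh key. Reordering request keys or changing a redacted secret leaves the key unchanged.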

Crash recovery

tasks/recovery.py runs at startup, before any loops start:

  1. Reconcile running tasks. Load all tasks with status = running. For each, find the current step. If step.status = running:

    • Check idempotency_outcomes for the step's key.
    • Outcome found → apply it (mark step succeeded/failed).
    • No outcome → tool call either never happened or outcome was lost. Store unknown, reset step to pending, increment attempt, emit tool_call.unknown.
    • Never assume success from absence of failure.
  2. Restore pending approvals. Load all Approval records with status = pending. No action needed — the approval-wait loop will pick them up.

  3. Recover scheduler. For each enabled schedule, recompute next_run_at from last_run_at + spec. Apply catch_up_policy for missed firings.

  4. Recover watchers. Load all watcher_state rows. Reset consecutive_errors only if healthy before crash.

  5. Start all loops. Pipeline, task engine, scheduler, watcher, approval-wait.
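Step 1's decision table can be written as a pure function. The sketch assumes the outcome store is readable as a mapping from idempotency key to status (`succeeded`, `failed` or `unknown`). `Reconciliation` and `reconcile_running_step` are hypothetical names. The important branch is the last one: a missing outcome is recorded as unknown and retried under a new attempt, never treated as success.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Reconciliation:
    step_status: str
    attempt: int
    audit_type: str | None = None
    store_unknown: bool = False


def reconcile_running_step(key: str, attempt: int,
                           outcomes: Mapping[str, str]) -> Reconciliation:
    status = outcomes.get(key)
    if status in ("succeeded", "failed"):
        # The tool finished before the crash: apply the recorded result.
        return Reconciliation(step_status=status, attempt=attempt)
    # No outcome (or an already-stored unknown): the call may or may not have
    # happened. Retry under attempt + 1, which yields a new idempotency key.
    return Reconciliation(step_status="pending", attempt=attempt + 1,
                          audit_type="tool_call.unknown",
                          store_unknown=status is None)
```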


5. Tool Runtime + MCP Adapter Design

Tool Registry

ToolRegistry is an in-memory dictionary: tool_name → (BaseTool, RegisteredTool). Populated at startup from DB records and updated when MCP providers sync. The DB is the source of truth; the registry is a read cache.

ToolRegistry:
  register(tool: BaseTool, config: RegisteredTool) → None
  get(tool_name: str) → tuple[BaseTool, RegisteredTool] | None
  list_healthy() → list[RegisteredTool]
  update_health(tool_name: str, health: ToolHealth) → None
  unregister(tool_name: str) → None
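Below is a minimal in-memory registry matching the interface above. The stand-ins are deliberately small:

- `RegisteredTool` and `ToolHealth` keep only a name and a status field.
- Tools are typed as `object` so the sketch has no dependency on BaseTool.
- `list_healthy()` is assumed to return only tools whose status is exactly `healthy`; whether `degraded` tools should count is left open.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class ToolHealth:
    status: str = "healthy"  # healthy | degraded | unavailable | down


@dataclass(frozen=True)
class RegisteredTool:
    tool_name: str
    health: ToolHealth = field(default_factory=ToolHealth)


class ToolRegistry:
    """Read cache over the DB; the DB row remains the source of truth."""

    def __init__(self) -> None:
        self._tools: dict[str, tuple[object, RegisteredTool]] = {}

    def register(self, tool: object, config: RegisteredTool) -> None:
        self._tools[config.tool_name] = (tool, config)

    def get(self, tool_name: str) -> tuple[object, RegisteredTool] | None:
        return self._tools.get(tool_name)

    def list_healthy(self) -> list[RegisteredTool]:
        return [cfg for _, cfg in self._tools.values()
                if cfg.health.status == "healthy"]

    def update_health(self, tool_name: str, health: ToolHealth) -> None:
        tool, cfg = self._tools[tool_name]  # KeyError for unknown tools
        self._tools[tool_name] = (tool, replace(cfg, health=health))

    def unregister(self, tool_name: str) -> None:
        self._tools.pop(tool_name, None)
```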

BaseTool interface

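from __future__ import annotations

from abc import ABC, abstractmethod

from syris.schemas.tools import ToolCallContext, ToolResult


class BaseTool(ABC):
    @property
    @abstractmethod
    def tool_name(self) -> str: ...

    @property
    @abstractmethod
    def capabilities(self) -> list[str]: ...

    @property
    @abstractmethod
    def scopes_required(self) -> list[str]: ...

    @abstractmethod
    def execute(self, action: str, request: dict, context: ToolCallContext) -> ToolResult: ...

    def preview(self, action: str, request: dict, context: ToolCallContext) -> PreviewResult | None:
        # Default: no dry-run support. PreviewResult belongs to the preview protocol.
        return None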

ToolCallContext carries: trace_id, task_id, step_id, idempotency_key, autonomy_level, granted_scopes. The tool implementation reads nothing from system state directly — everything arrives via context.

Tool executor flow

tools/executor.py → call(tool_name, action, request, context):

  1. Registry lookup. Fail fast if tool not found or health.status = unavailable.

  2. Scope check. Assert tool.scopes_required ⊆ context.granted_scopes. Failure raises ScopeViolation (non-retryable). Emit audit.

  3. Risk classification. Start from tool.risk_map.get(action, tool.risk_default). Apply the adjusters; each one that applies raises risk by exactly one level:

    • +1 if the target is a broadcast/group channel
    • +1 if the action is destructive (delete, wipe, reset)
    • +1 if blast_radius > threshold
    • +1 if the current time is within quiet hours

    Final risk = min(base + sum(adjusters), CRITICAL).

  4. Hard safety overrides. Evaluated before the gate matrix; can only increase gate strictness:

    • Secrets scope required → at least CONFIRM, always
    • Quiet hours + medium/high risk → at least CONFIRM
    • Anti-flap (same tool+action+target within cooldown) → BLOCK
    • Notification storm (outbound count/hour > max) → BLOCK
  5. Gate matrix lookup. (autonomy_level, final_risk) → ALLOW | CONFIRM | PREVIEW | HARD_BLOCK:

    Autonomy   LOW        MEDIUM     HIGH       CRITICAL
    A0         PREVIEW    PREVIEW    PREVIEW    PREVIEW
    A1         CONFIRM    CONFIRM    CONFIRM    HARD_BLOCK
    A2         ALLOW      CONFIRM    CONFIRM    HARD_BLOCK
    A3         ALLOW      ALLOW      CONFIRM    HARD_BLOCK
    A4         ALLOW      ALLOW      ALLOW      CONFIRM

    If CONFIRM: create Approval, emit gate.required, raise ApprovalRequired. If PREVIEW: call tool.preview(), raise PreviewRequired. If HARD_BLOCK: emit audit, raise HardBlocked.

  6. Idempotency check. Query idempotency_outcomes for context.idempotency_key. If found with succeeded or failed: emit tool_call.deduped, return cached result. If found with unknown: caller handles (typically retry with new attempt/key).

  7. Execute. Emit tool_call.attempted. Call tool.execute():

    • Success: write outcome to idempotency_outcomes, emit tool_call.succeeded.
    • Retryable error: emit tool_call.failed.
    • Transport failure (outcome unknown): write unknown to idempotency_outcomes, emit tool_call.unknown.
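Steps 3 and 5 reduce to ordered-enum arithmetic and a table lookup. The sketch below makes these assumptions: risk levels are an IntEnum so adjusters can be summed, autonomy levels are plain strings, and the caller has already computed each adjuster as a boolean. The hard safety overrides from step 4 are left out.

```python
from __future__ import annotations

from enum import Enum, IntEnum


class Risk(IntEnum):
    LOW = 0
    MEDIUM = 1
    HIGH = 2
    CRITICAL = 3


class Gate(str, Enum):
    ALLOW = "ALLOW"
    CONFIRM = "CONFIRM"
    PREVIEW = "PREVIEW"
    HARD_BLOCK = "HARD_BLOCK"


A, C, P, H = Gate.ALLOW, Gate.CONFIRM, Gate.PREVIEW, Gate.HARD_BLOCK

# Rows: autonomy level; columns indexed by Risk (LOW, MEDIUM, HIGH, CRITICAL).
GATE_MATRIX: dict[str, tuple[Gate, Gate, Gate, Gate]] = {
    "A0": (P, P, P, P),
    "A1": (C, C, C, H),
    "A2": (A, C, C, H),
    "A3": (A, A, C, H),
    "A4": (A, A, A, C),
}


def classify_risk(base: Risk, *, broadcast: bool, destructive: bool,
                  blast_radius: int, blast_threshold: int, quiet_hours: bool) -> Risk:
    # Each applicable adjuster adds exactly one level; the total is capped at CRITICAL.
    bumps = sum([broadcast, destructive, blast_radius > blast_threshold, quiet_hours])
    return Risk(min(base + bumps, Risk.CRITICAL))


def gate_for(autonomy: str, risk: Risk) -> Gate:
    return GATE_MATRIX[autonomy][risk]  # IntEnum members index the tuple directly
```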

MCPProvider + MCPToolAdapter

MCPConnectionManager (mcp/connection.py):

  • Persistent connection to one MCP server.
  • Reconnects with exponential backoff on disconnect.
  • Caches tool list; refreshes on reconnect or explicit sync.
  • Emits an audit event on every lifecycle transition (mcp.connected, mcp.disconnected, mcp.tools_synced).
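The reconnect policy is not specified beyond "exponential backoff". One common shape is sketched below. All of the defaults (1 s base, doubling, 60 s cap, ±10% jitter) are placeholders, not spec values.

```python
from __future__ import annotations

import random
from collections.abc import Iterator


def backoff_delays(base: float = 1.0, factor: float = 2.0, cap: float = 60.0,
                   jitter: float = 0.1, rng: random.Random | None = None) -> Iterator[float]:
    """Yield reconnect delays in seconds: exponential growth, capped, with +/- jitter."""
    rng = rng or random.Random()
    delay = base
    while True:
        spread = delay * jitter
        # Jitter keeps repeated reconnect attempts from landing on an exact rhythm.
        yield min(cap, delay + rng.uniform(-spread, spread))
        delay = min(cap, delay * factor)
```

The connection manager would sleep for each yielded delay between attempts. After a successful connect it would start a fresh generator, so the next outage begins again at the base delay.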

MCPProvider (mcp/provider.py):

  • Connects via MCPConnectionManager, discovers tools.
  • For each discovered tool, creates an MCPToolAdapter and registers it in ToolRegistry with provider_type = mcp.
  • Applies TrustPolicy to assign risk levels, scopes, and idempotency contracts.
  • Handles tool list drift: adds new tools, removes stale entries on reconnect.
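Drift handling amounts to a set difference between the tools registered for a provider and the tools the server now lists. Below is a minimal sketch; SyncPlan and plan_tool_sync are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SyncPlan:
    to_add: list[str] = field(default_factory=list)
    to_remove: list[str] = field(default_factory=list)
    unchanged: list[str] = field(default_factory=list)


def plan_tool_sync(registered: set[str], discovered: set[str]) -> SyncPlan:
    """Diff the MCP tool names SYRIS has registered against the server's current list."""
    return SyncPlan(
        to_add=sorted(discovered - registered),    # new on the server
        to_remove=sorted(registered - discovered), # stale entries to unregister
        unchanged=sorted(registered & discovered),
    )
```

The provider would then wrap each to_add name in an MCPToolAdapter, applying TrustPolicy before registering it, and call ToolRegistry.unregister for each to_remove name.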

MCPToolAdapter (mcp/adapter.py):

  • Implements BaseTool for a single MCP tool.
  • execute(): calls MCPConnectionManager.call(tool_name, args), translates response to ToolResult.
  • On transport failure mid-call: stores unknown in idempotency_outcomes rather than assuming success or failure.

TrustPolicy (mcp/trust.py):

TrustPolicy:
  risk_override:          RiskLevel | None
  scopes_mapping:         dict[str, list[str]]    — MCP tool name → SYRIS scopes
  idempotency_contract:   IdempotencyContract     — none | provider_managed | syris_managed
  supports_preview:       bool

MCP annotations are untrusted by default. risk_override is authoritative. All MCP tools default to high risk until explicitly allowlisted.

Secrets boundary

SecretsStore is a protocol:

from typing import Protocol

class SecretsStore(Protocol):
    def get_secret(self, connector_id: str, key: str) -> str: ...

Injected into tool adapters at construction. Adapters call it inside execute() and use the value within function scope. The value must not outlive the function call — it must never be stored on self, in checkpoints, in logs, or in audit payloads.

config_ref on RegisteredTool points to a DB reference (connector_id + key name), not the credential value. The value is fetched at execution time only.

Initial implementation: a Fernet-encrypted local file, with the encryption key taken from an env var or the OS keyring. It can later be swapped for Vault or for storing the secrets themselves in the OS keyring; callers depend only on the protocol.
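Below is a sketch of the boundary in use. It assumes a simplified execute(body) signature instead of the full BaseTool one. WebhookTool, ConfigRef, InMemorySecretsStore and the injected transport are all hypothetical. The point is where the secret lives: it is fetched inside execute(), held only in a local variable (as Issue 1 allows), and never reaches self or the return value.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import Protocol


class SecretsStore(Protocol):
    def get_secret(self, connector_id: str, key: str) -> str: ...


@dataclass(frozen=True)
class ConfigRef:
    """What RegisteredTool.config_ref points at: a reference, never the value."""
    connector_id: str
    key: str


class InMemorySecretsStore:
    """Test double standing in for the Fernet-backed file store."""

    def __init__(self, secrets: dict[tuple[str, str], str]) -> None:
        self._secrets = secrets

    def get_secret(self, connector_id: str, key: str) -> str:
        return self._secrets[(connector_id, key)]


# (headers, body) -> HTTP status; injected so the sketch needs no network.
Transport = Callable[[dict, dict], int]


class WebhookTool:
    """Hypothetical adapter: keeps the store and the reference on self, never the secret."""

    def __init__(self, secrets: SecretsStore, ref: ConfigRef, send: Transport) -> None:
        self._secrets = secrets
        self._ref = ref
        self._send = send

    def execute(self, body: dict) -> dict:
        # Fetched per call; `token` is a local that goes away when execute() returns.
        token = self._secrets.get_secret(self._ref.connector_id, self._ref.key)
        status = self._send({"Authorization": f"Bearer {token}"}, body)
        # Results flow into audit payloads, so return nothing derived from the secret.
        return {"http_status": status}
```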


6. Scheduler + Watchers + Rules Engine

Fundamental principle

All three are event sources. They emit MessageEvents that enter the pipeline at the normalisation stage, exactly like any inbound adapter. They do not bypass the pipeline.

Scheduler tick  → MessageEvent(channel="scheduler")    → normalise → route → execute
Watcher tick    → MessageEvent(channel="watcher")      → normalise → route → execute
Rule trigger    → MessageEvent(channel="rule_engine")   → normalise → route → execute

This means all proactive actions get full pipeline treatment: dedup, routing, safety gating, audit.

Scheduler

The scheduler loop runs every 5 seconds:

SELECT * FROM schedules
WHERE enabled = true AND next_run_at <= now()
FOR UPDATE SKIP LOCKED

For each due schedule:

  1. Check quiet hours policy. If suppressed: emit schedule.skipped, advance next_run_at, commit.
  2. Apply catch_up_policy if last_run_at is stale:
    • skip: advance to next future slot, emit schedule.missed per skipped slot.
    • run_once: fire once for the entire missed window, note slot count in audit.
    • run_all_capped: fire up to N times, skip remainder.
  3. Emit MessageEvent from the schedule's payload template.
  4. Update last_run_at, compute new next_run_at.
  5. Commit atomically.

Schedule types: cron (cron expression), interval (every N seconds), one_shot (ISO-8601 datetime, auto-disables after firing).
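Catch-up is easiest to get right for interval schedules, where missed slots are plain arithmetic; cron schedules would need a cron parser, which is not shown. The minimal sketch below makes two assumptions. First, run_all_capped uses a hypothetical default cap of 3. Second, a schedule with exactly one due slot counts as on time and fires normally under every policy.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def plan_interval_catch_up(last_run_at: datetime, interval: timedelta, now: datetime,
                           policy: str, cap: int = 3) -> tuple[int, int, datetime]:
    """Return (fires, missed, next_run_at) for an interval schedule.

    fires:  MessageEvents to emit on this tick.
    missed: due slots dropped (one schedule.missed each under `skip`).
    """
    due = (now - last_run_at) // interval  # slots that came due since the last run
    if due <= 0:
        return 0, 0, last_run_at + interval  # not due yet
    next_run_at = last_run_at + (due + 1) * interval  # first slot strictly after now

    if due == 1:
        fires = 1  # on time: nothing to catch up
    elif policy == "skip":
        fires = 0
    elif policy == "run_once":
        fires = 1  # one firing covers the whole missed window
    elif policy == "run_all_capped":
        fires = min(due, cap)
    else:
        raise ValueError(f"unknown catch_up_policy: {policy!r}")
    return fires, due - fires, next_run_at
```

For a 30-second schedule that was down for just over 10 minutes, 20 slots are due. skip fires none and records 20 missed, run_once fires 1 and records 19 missed, and run_all_capped (cap 3) fires 3 and records 17 missed.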

Watchers

The watcher loop iterates registered watchers based on their tick_interval_seconds:

  1. Load WatcherState from DB.
  2. Check if due: now - last_tick_at >= tick_interval_seconds.
  3. Apply global throttle: if total watcher ticks in the last minute exceed system-wide max, skip and emit watcher.suppressed.
  4. Call tick(now, state).
  5. Persist updated WatcherState and emit any returned MessageEvents — in a single transaction.
  6. On exception: increment consecutive_errors, emit watcher.error, raise alarm if threshold exceeded.

The watcher's dedupe_window: dict is opaque to the loop — each watcher manages its own dedup strategy within it (e.g. last-seen device states, last inbox message ID).
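To make the opaque dedupe_window concrete, below is a sketch of a hypothetical watcher that remembers last-seen device states and emits only on change. Plain dicts stand in for MessageEvents, and watcher_id is an assumed WatcherState field.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field, replace
from datetime import datetime


@dataclass
class WatcherState:
    watcher_id: str
    last_tick_at: datetime | None = None
    consecutive_errors: int = 0
    dedupe_window: dict = field(default_factory=dict)  # opaque to the loop


class DeviceStateWatcher:
    """Hypothetical watcher that emits only when a device's state actually changes."""

    tick_interval_seconds = 60

    def __init__(self, read_states: Callable[[], dict[str, str]]) -> None:
        self._read_states = read_states  # e.g. a polled Home Assistant snapshot

    def tick(self, now: datetime, state: WatcherState) -> tuple[WatcherState, list[dict]]:
        last_seen = state.dedupe_window.get("last_seen", {})
        current = self._read_states()
        events = [
            {"channel": "watcher", "device": device, "from": last_seen.get(device), "to": value}
            for device, value in sorted(current.items())
            if last_seen.get(device) != value
        ]
        # The loop persists this state and the events in one transaction (step 5);
        # last_tick_at and consecutive_errors are left for the loop to manage.
        return replace(state, dedupe_window={"last_seen": dict(current)}), events
```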

HeartbeatWatcher is the reference implementation. On each tick it writes a SystemHealth record to the DB with status, uptime, version, and next_expected_at. It does not emit MessageEvents.

Rules engine

Rules are evaluated by the router as part of the routing stage — not as a separate loop. When a MessageEvent arrives:

  1. routing/rules_eval.py loads the rule cache (in-memory, invalidated on CRUD or every 5 minutes).
  2. For each enabled rule, evaluate conditions against a flattened view of the MessageEvent.
  3. If matched, check suppression policies before dispatching:
    • Debounce: if now - last_fired_at < debounce_ms, suppress (rule.suppressed, reason: debounce).
    • Dedupe: if dedupe_key matches stored last_dedupe_key within dedupe_window_ms, suppress.
    • Quiet hours: if configured quiet hours policy is active, suppress.
  4. If not suppressed, dispatch actions: EmitEventAction, StartTaskAction, or NotifyAction.
  5. Emit rule.triggered or rule.suppressed audit event.
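The suppression checks in step 3 are pure comparisons against state stored on the rule record. In the sketch below, an in-memory RuleRuntime stands in for those columns; its field names are assumed, and times are epoch milliseconds. The function returns the suppression reason so the caller can emit rule.suppressed with it.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RuleRuntime:  # hypothetical stand-in for the tracking columns on a rule record
    debounce_ms: int = 0
    dedupe_window_ms: int = 0
    last_fired_at_ms: int | None = None
    last_dedupe_key: str | None = None
    last_dedupe_at_ms: int | None = None


def check_suppression(rule: RuleRuntime, now_ms: int, dedupe_key: str | None,
                      quiet_hours_active: bool) -> str | None:
    """Return a suppression reason, or None if the rule may fire (recording the firing)."""
    if rule.last_fired_at_ms is not None and now_ms - rule.last_fired_at_ms < rule.debounce_ms:
        return "debounce"
    if (dedupe_key is not None and dedupe_key == rule.last_dedupe_key
            and rule.last_dedupe_at_ms is not None
            and now_ms - rule.last_dedupe_at_ms < rule.dedupe_window_ms):
        return "dedupe"
    if quiet_hours_active:
        return "quiet_hours"
    # Not suppressed: record the firing so later events are measured against it.
    rule.last_fired_at_ms = now_ms
    if dedupe_key is not None:
        rule.last_dedupe_key, rule.last_dedupe_at_ms = dedupe_key, now_ms
    return None
```

Suppose the function receives five events 200 ms apart with a 10 s debounce. It returns one None and four "debounce" results, which is the 1 triggered + 4 suppressed split that Milestone 5 expects.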

Condition DSL: JSON structures with logical combinators (all, any, none), field comparison operators (eq, neq, in, contains, matches using glob only — no regex), and system-state conditions (time windows, autonomy level checks). Safety limits: max nesting depth 5, max conditions 20, 10ms evaluation timeout.
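Below is a sketch of an evaluator for this DSL. It assumes a JSON shape that the section does not pin down: combinators are single-key objects ({"all": [...]}), and leaves are {"field", "op", "value"} objects over dotted field names. System-state conditions are omitted, and depth counts the leaf as a level. The 10 ms budget is checked cooperatively at each node rather than enforced preemptively.

```python
from __future__ import annotations

import fnmatch
import time
from typing import Any

MAX_DEPTH, MAX_CONDITIONS, TIMEOUT_S = 5, 20, 0.010

# Logical combinators; each receives a lazy iterable of child results.
COMBINATORS = {"all": all, "any": any, "none": lambda results: not any(results)}

OPS = {
    "eq": lambda actual, expected: actual == expected,
    "neq": lambda actual, expected: actual != expected,
    "in": lambda actual, expected: actual in expected,
    "contains": lambda actual, expected: actual is not None and expected in actual,
    # Glob only: fnmatchcase is case-sensitive and never interprets regex syntax.
    "matches": lambda actual, expected: isinstance(actual, str) and fnmatch.fnmatchcase(actual, expected),
}


class RuleLimitError(ValueError):
    """Raised when a condition tree breaks the DSL's safety limits."""


def _combinator(node: dict) -> str | None:
    return next((name for name in COMBINATORS if name in node), None)


def validate(node: dict, depth: int = 1) -> int:
    """Check depth/size limits before evaluating; returns the number of leaf conditions."""
    if depth > MAX_DEPTH:
        raise RuleLimitError(f"nesting deeper than {MAX_DEPTH}")
    name = _combinator(node)
    count = 1 if name is None else sum(validate(child, depth + 1) for child in node[name])
    if count > MAX_CONDITIONS:
        raise RuleLimitError(f"more than {MAX_CONDITIONS} conditions")
    return count


def _eval(node: dict, event: dict[str, Any], deadline: float) -> bool:
    if time.monotonic() > deadline:
        raise TimeoutError("rule evaluation exceeded its time budget")
    name = _combinator(node)
    if name is not None:
        return COMBINATORS[name](_eval(child, event, deadline) for child in node[name])
    return OPS[node["op"]](event.get(node["field"]), node["value"])


def evaluate(condition: dict, flat_event: dict[str, Any]) -> bool:
    """Evaluate a condition tree against a flattened event ({"source.channel": ...})."""
    validate(condition)
    return _eval(condition, flat_event, time.monotonic() + TIMEOUT_S)
```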

Quiet hours storage

The spec references quiet hours policies but doesn't define storage. Add a quiet_hours_policies table:

quiet_hours_policies:
  policy_id:     str     PK
  name:          str
  start_time:    str     — HH:MM
  end_time:      str     — HH:MM
  timezone:      str     — IANA
  days:          str     — JSON list: ["Mon","Tue","Wed","Thu","Fri"]
  created_at:    datetime
  updated_at:    datetime

Referenced by schedules.quiet_hours_policy_id and rules.quiet_hours_policy_id.
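Evaluating a policy has one trap: windows that cross midnight. The sketch below uses the standard-library zoneinfo module. It assumes, since the table does not say, that days names the day on which a window starts, so Friday's 22:00-07:00 window still covers early Saturday morning.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo

DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]  # datetime.weekday() order


@dataclass(frozen=True)
class QuietHoursPolicy:
    start_time: str  # "HH:MM"
    end_time: str  # "HH:MM"
    timezone: str  # IANA name
    days: tuple[str, ...]  # parsed from the JSON list


def is_quiet(policy: QuietHoursPolicy, now: datetime) -> bool:
    local = now.astimezone(ZoneInfo(policy.timezone))  # `now` must be timezone-aware
    start = time.fromisoformat(policy.start_time)
    end = time.fromisoformat(policy.end_time)
    today = DAY_NAMES[local.weekday()]
    yesterday = DAY_NAMES[(local - timedelta(days=1)).weekday()]
    t = local.time()
    if start <= end:  # same-day window, e.g. 13:00-14:00
        return today in policy.days and start <= t < end
    # Overnight window, e.g. 22:00-07:00: the early-morning part belongs to
    # the day the window started on.
    return (today in policy.days and t >= start) or (yesterday in policy.days and t < end)
```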


7. Observability + Projections

Synchronous projection model

Projections are updated in the same DB transaction as the primary record write. This guarantees consistency with zero lag.

# Called inside the transaction that writes a Task status update:
projections.on_task_status_changed(session, task)

# Called inside the transaction that writes an AuditEvent:
projections.on_audit_event(session, event)

# Called inside the transaction that writes an Approval:
projections.on_approval_changed(session, approval)

If the projection update fails, the entire transaction rolls back — including the primary record. The projection is always consistent with the source data.
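Below is a runnable sketch of the all-or-nothing behaviour. The standard-library sqlite3 module stands in for PostgreSQL and the SQLAlchemy session. The table layouts, and the fail switch used to simulate a broken projection, are hypothetical. The point is that the projection writer shares the caller's transaction and never commits on its own.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT NOT NULL);
CREATE TABLE proj_active_tasks (task_id TEXT PRIMARY KEY, status TEXT NOT NULL);
""")


def on_task_status_changed(cur, task_id, status, *, fail=False):
    """Projection writer: runs on the caller's transaction and never commits itself."""
    if fail:
        raise RuntimeError("simulated projection failure")
    if status in ("running", "paused"):
        cur.execute("INSERT OR REPLACE INTO proj_active_tasks VALUES (?, ?)", (task_id, status))
    else:
        cur.execute("DELETE FROM proj_active_tasks WHERE task_id = ?", (task_id,))


def set_task_status(conn, task_id, status, *, fail_projection=False):
    with conn:  # one transaction: commit on success, roll back and re-raise on error
        cur = conn.cursor()
        cur.execute("INSERT OR REPLACE INTO tasks VALUES (?, ?)", (task_id, status))
        on_task_status_changed(cur, task_id, status, fail=fail_projection)
```

A failed projection leaves both tables exactly as they were before the call, which is the consistency property described above.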

Projection tables

Table                    Contains
proj_active_tasks        Running and paused tasks with current step, status, wake time
proj_schedules           Enabled schedules with next_run, last_run, missed-run count
proj_watchers            Watchers with enabled, last_tick, outcome, suppression count
proj_rules               Rules with enabled, hit count, last suppression reason
proj_integrations        Integration health, auth status, last success, last error
proj_approvals           Pending approvals with context and expiry
proj_autonomy_history    Current autonomy level and change history
proj_alarms              Open, acked, and recently resolved alarms

Deferred: proj_queue_depth (requires a "tool queue" concept that doesn't exist yet).

The API layer reads from projection tables, not from audit_events or raw entity tables, for all dashboard queries.

Extraction seam

The synchronous model is the correct starting point. The seam for async is explicit:

  • projections.py functions take a session argument and have no side effects outside that session.
  • Callers don't know if projection updates are sync or async.
  • Migration path: replace function bodies with a queue publish; a projector process consumes and writes the same tables.
  • Callers don't change. Tables don't change. Only the write path changes.

Minimum viable dashboard API

Endpoint                        Reads from           Answers
GET /health                     system_health        Is SYRIS alive?
GET /state                      Multiple proj_*      What is SYRIS doing right now?
GET /audit?trace_id=X           audit_events         What happened for this event?
GET /tasks?status=running       proj_active_tasks    What's in flight?
GET /approvals?status=pending   proj_approvals       What needs my attention?
GET /alarms?status=open         proj_alarms          What's broken?
GET /integrations               proj_integrations    Are my tools healthy?

Build these first. Add the rest as the dashboard demands them.

Health model

HeartbeatWatcher writes to system_health every 30 seconds:

SystemHealth:
  status:               HealthStatus — healthy | degraded | down
  uptime_seconds:       int
  version:              str
  restart_reason:       str | None
  last_heartbeat_at:    datetime
  next_expected_at:     datetime     — if now > this + grace, heartbeat missed
  degraded_subsystems:  list[str]    — names of unhealthy subsystems

Alarm system: Alarms are persisted entities with lifecycle open → acked → resolved. They use dedupe_key to prevent storms: if an open alarm exists for the same key, no duplicate is created.
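The open-alarm dedupe maps directly onto the partial unique index listed in the key-indexes appendix (uq_alarms_dedupe_open). The sketch below uses sqlite3 as a stand-in, and its column set is assumed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE alarms (
    alarm_id   INTEGER PRIMARY KEY,
    dedupe_key TEXT NOT NULL,
    severity   TEXT NOT NULL,
    status     TEXT NOT NULL DEFAULT 'open'   -- open | acked | resolved
);
-- At most one open alarm per key; acked and resolved rows do not count.
CREATE UNIQUE INDEX uq_alarms_dedupe_open ON alarms(dedupe_key) WHERE status = 'open';
""")


def raise_alarm(conn, dedupe_key, severity):
    """Insert an open alarm unless one already exists; True if a new alarm was created."""
    with conn:
        cur = conn.execute(
            "INSERT INTO alarms (dedupe_key, severity) VALUES (?, ?) "
            "ON CONFLICT (dedupe_key) WHERE status = 'open' DO NOTHING",
            (dedupe_key, severity),
        )
        return cur.rowcount == 1
```

PostgreSQL accepts the same ON CONFLICT (dedupe_key) WHERE status = 'open' DO NOTHING clause; the WHERE lets it infer the partial index as the arbiter.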

Trigger conditions (from the spec):

Trigger                     Condition                                          Severity
Missed heartbeat            now > next_expected_at + grace_period              critical
Integration auth failed     Consecutive auth errors ≥ threshold                error
Integration auth expiring   Credential expiry within warning window            warning
Repeated tool errors        tool.consecutive_errors ≥ threshold                error
Stuck task                  status=running AND updated_at < now - threshold    warning
Rule storm                  triggered_count_per_minute > threshold             warning
Schedule backlog            next_run_at < now - threshold AND enabled          warning
Notification storm          Outbound notifications/hour > max                  warning
Secret in audit payload     Redaction miss detected                            critical

8. Implementation Sequencing

Milestone 0: Skeleton

Timeline: Days 1–2

Deliverables:

  • Repo structure, pyproject.toml with dependencies (fastapi, uvicorn, sqlalchemy, alembic, pydantic)
  • config.py with typed settings
  • PostgreSQL database + Alembic migration 0001: audit_events, system_health tables
  • AuditWriter implemented and unit-tested
  • GET /health returns hardcoded heartbeat JSON
  • GET /audit returns audit events (an empty list until events are written)

Done when: uv run python -m syris.main starts. Hit /health and get a JSON response. Write an audit event via test helper. See it at GET /audit. All unit-tested.

Why first: AuditWriter is the most load-bearing infrastructure. Every subsequent milestone depends on it.

Milestone 1: Pipeline + Fast Lane

Timeline: Week 1

Deliverables:

  • schemas/ package: MessageEvent, RoutingDecision, AuditEvent (Pydantic v2)
  • storage/models.py + repos for events, audit, routing_decisions
  • normaliser.py: accepts raw dict + channel, returns MessageEvent, persists + audits
  • router.py: hard filter + one fast path intent (e.g. timer.set), returns RoutingDecision
  • pipeline/executor.py: fast lane only, calls tool executor
  • tools/executor.py: scope check + idempotency + NoopTool + audit
  • GET /events, GET /audit, GET /audit?trace_id=X

Done when: POST a raw event → full trace at /audit?trace_id=X shows exactly 4 audit events: event.ingested, routing.decided, tool_call.attempted, tool_call.succeeded. POST same event again → event.deduped. Zero log-spelunking required.

Milestone 2: Task Engine

Timeline: Week 2

Deliverables:

  • Task/Step schemas + repos
  • tasks/engine.py: claim → execute → checkpoint loop with FOR UPDATE SKIP LOCKED
  • tasks/step_runner.py: run step, handle retries, write checkpoint
  • tasks/state.py: state machine enforcement with validate_transition()
  • tasks/recovery.py: startup reconciliation
  • GET /tasks, GET /tasks/{id}, POST /tasks/{id}/cancel|pause|resume

Done when: Create a 3-step task with NoopTool at step 2. Kill process mid-step-2. Restart. Task resumes from step 2 (not step 1). Audit shows the interruption and recovery. Run 10 times — no duplicated side effects.

Milestone 3: Safety Layer + Approvals

Timeline: Week 3

Deliverables:

  • safety/autonomy.py: read/write current level, persist history
  • safety/risk.py: classify tool action → risk level with adjusters
  • safety/gates.py: gate matrix + hard safety overrides
  • safety/dryrun.py: preview protocol
  • Approval schema + repo
  • GET /approvals, POST /approvals/{id}/approve|deny
  • POST /controls/autonomy

Done when: Set autonomy to A1. Trigger medium-risk tool call. Approval created at /approvals, tool not executed, gate.required in audit. Approve via API. Tool executes. gate.approved + tool_call.succeeded in audit. Full trace queryable.

Milestone 4: Scheduler + Watchers

Timeline: Week 4

Deliverables:

  • scheduler/loop.py: cron + interval + one-shot
  • Schedule repo + CRUD API
  • watchers/loop.py + watchers/base.py + HeartbeatWatcher
  • Watcher state repo + API
  • GET /health now uses real heartbeat data
  • GET /schedules, POST /schedules, PATCH /schedules/{id}
  • GET /watchers, PATCH /watchers/{id}

Done when: Create 30-second interval schedule. Observe schedule.fired in audit every ~30s. Heartbeat appears at /health with real uptime. Disable watcher via API → confirm it stops ticking. Kill process, restart → schedule catches up per policy.

Milestone 5: Rules Engine

Timeline: Week 5

Deliverables:

  • Condition evaluator: field comparison + logical combinators + system-state
  • Rule CRUD + in-memory caching with TTL invalidation
  • Debounce + dedupe tracking on rule records
  • Quiet hours enforcement (add quiet_hours_policies table)
  • Action dispatch: EmitEventAction, StartTaskAction, NotifyAction
  • GET /rules, PATCH /rules/{id}

Done when: Rule matching source.channel == "ha_event" fires → emits child event with parent_event_id set → both in audit with same trace_id. Same event fired 5× in 1 second with 10s debounce → 1 rule.triggered + 4 rule.suppressed in audit.

Milestone 6: MCP Integration

Timeline: Weeks 6–7

Deliverables:

  • mcp/connection.py: MCPConnectionManager with reconnect + backoff
  • mcp/provider.py: tool discovery → registry sync
  • mcp/adapter.py: MCPToolAdapter implementing BaseTool
  • mcp/trust.py: TrustPolicy schema + loader
  • GET /integrations showing MCP server health
  • Connection lifecycle audit events

Done when: Connect a real MCP server. Tools appear in GET /integrations. Execute one tool → full audit trail (scope check, risk, idempotency, gate, result). Disconnect server → health degrades within 30 seconds.

Milestone 7: Worker Skeleton

Timeline: Week 8

Deliverables:

  • Job table in DB
  • workers/manager.py: spawn/progress/cancel via OS process isolation
  • GET /state shows job count

Done when: Submit stub long-running job via API. See it in /state. Cancel it. See cancellation in audit.

Milestone 8: First Real Integration

Timeline: Week 9+

Deliverables:

  • First inbound adapter (e.g. HA webhook receiver or email poller)
  • First outbound tool (e.g. HA service call or email send)
  • Secrets store wired to real credentials
  • Approval flow exercised with a real risky action (e.g. device control)

Done when: A real-world event (HA state change or incoming email) flows through the full pipeline, routes correctly, executes a real tool, and requires/passes approval if risk demands it. SYRIS does something useful in the real world.


Appendix: Key Indexes

For the schema to perform well, these indexes are critical:

events:
  idx_events_trace_id       ON events(trace_id)
  idx_events_ingested_at    ON events(ingested_at)
  idx_events_channel        ON events(channel)
  uq_events_dedupe_key      ON events(dedupe_key) WHERE dedupe_key IS NOT NULL  -- UNIQUE partial

audit_events:
  idx_audit_trace_id        ON audit_events(trace_id)
  idx_audit_timestamp       ON audit_events(timestamp)
  idx_audit_type            ON audit_events(type)
  idx_audit_outcome         ON audit_events(outcome)
  idx_audit_ref_task_id     ON audit_events(ref_task_id) WHERE ref_task_id IS NOT NULL

tasks:
  idx_tasks_status          ON tasks(status)
  idx_tasks_trace_id        ON tasks(trace_id)
  idx_tasks_wake            ON tasks(next_wake_time) WHERE status = 'running'

steps:
  idx_steps_task_id         ON steps(task_id)
  idx_steps_idemp           ON steps(idempotency_key) WHERE idempotency_key IS NOT NULL

tool_calls:
  idx_toolcalls_trace_id    ON tool_calls(trace_id, tool_call_id)
  idx_toolcalls_idemp       ON tool_calls(idempotency_key)
  idx_toolcalls_created     ON tool_calls(created_at)

approvals:
  idx_approvals_status      ON approvals(status)
  idx_approvals_trace_id    ON approvals(trace_id)
  idx_approvals_expires     ON approvals(expires_at)

schedules:
  idx_schedules_next_run    ON schedules(next_run_at) WHERE enabled = true  -- partial

alarms:
  uq_alarms_dedupe_open     ON alarms(dedupe_key) WHERE status = 'open'  -- UNIQUE partial
  idx_alarms_status         ON alarms(status)

Appendix: Dependency Graph

schemas/          ← depends on nothing
storage/          ← depends on schemas/
observability/    ← depends on schemas/, storage/
secrets/          ← depends on nothing
safety/           ← depends on schemas/, storage/, observability/
tools/            ← depends on schemas/, storage/, observability/, safety/, secrets/
routing/          ← depends on schemas/, rules/
pipeline/         ← depends on schemas/, routing/, tools/, tasks/, safety/, observability/
tasks/            ← depends on schemas/, storage/, tools/, observability/
scheduler/        ← depends on schemas/, storage/, pipeline/, observability/
watchers/         ← depends on schemas/, storage/, pipeline/, observability/
rules/            ← depends on schemas/, storage/
mcp/              ← depends on schemas/, tools/, storage/, observability/
adapters/         ← depends on schemas/, secrets/
api/              ← depends on everything (thin layer)

No circular dependencies. Each module has a clear dependency direction pointing downward toward schemas/ and storage/.
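The "no circular dependencies" claim is cheap to keep honest in CI. The sketch below uses the standard-library graphlib module, with the edges copied from the graph above. DEPENDS_ON and build_order are hypothetical names, and a real check would derive the edges from the package's actual imports.

```python
from graphlib import CycleError, TopologicalSorter

# Each module maps to the modules it depends on (copied from the graph above).
DEPENDS_ON = {
    "schemas": set(),
    "storage": {"schemas"},
    "observability": {"schemas", "storage"},
    "secrets": set(),
    "safety": {"schemas", "storage", "observability"},
    "tools": {"schemas", "storage", "observability", "safety", "secrets"},
    "routing": {"schemas", "rules"},
    "pipeline": {"schemas", "routing", "tools", "tasks", "safety", "observability"},
    "tasks": {"schemas", "storage", "tools", "observability"},
    "scheduler": {"schemas", "storage", "pipeline", "observability"},
    "watchers": {"schemas", "storage", "pipeline", "observability"},
    "rules": {"schemas", "storage"},
    "mcp": {"schemas", "tools", "storage", "observability"},
    "adapters": {"schemas", "secrets"},
}
DEPENDS_ON["api"] = set(DEPENDS_ON)  # api depends on every other module


def build_order(graph):
    """Return a dependencies-first module order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())
```

A CI step could call build_order(DEPENDS_ON) and fail the build on CycleError.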