Author: Architecture review session - Claude Opus 4.6
Date: 2026-03-23
Spec source: https://docs.syris.uk
Status: Ready to build from
SYRIS is a single-user, always-on automation control plane that ingests stimuli from the world, normalises them into versioned MessageEvents, routes them deterministically (rules-first, LLM fallback), and executes actions through a safety-gated, idempotent tool runtime. Everything is traceable end-to-end via trace_id, queryable via API, and designed to survive crashes without duplicating side effects. It is a modular monolith — not a chatbot, not a microservice platform.
Before designing, these are the problems I found in the spec and the decisions I made.
The spec says you should call get_secret() inline and never assign the return value to a local variable. This means you can't reuse a credential across two calls in the same execute() without fetching it twice. Any adapter needing to construct a client (e.g. OAuth headers + refresh) will fight this.
Design decision: Relax to "the value must not outlive execute(); local variables within the function body are acceptable." The invariant that matters is "never persisted, never logged, never on self" — not "never in a local binding."
ToolResult is joined to traces via tool_calls.trace_id. Querying "all results for a trace" requires a join. This is fine architecturally but needs a composite index on tool_calls(trace_id, tool_call_id) to make it fast.
The component map shows pipeline/ as thin stage orchestrators and routing/ as a separate concern, but routing/ appears nested under pipeline/ in the module tree. These should be explicit peers.
Design decision: routing/ is a top-level package, peer to pipeline/. pipeline/router.py delegates to routing/ functions.
The spec nests watchers/ and rules/ under scheduler/. These are three distinct subsystems that happen to be "proactive."
Design decision: Promote watchers/, rules/, and scheduler/ to top-level peers.
The pipeline mentions a sandbox lane routing to workers/manager.py, but no spec page defines what distinguishes sandbox from task. Milestone 7 gives a skeleton, but the routing decision isn't specified.
Design decision: The routing decision supports the sandbox enum value, but no code is needed until Milestone 7.
Task.error and Step.error reference ErrorDetail | None but the data contracts page never defines the schema.
Design decision: Added. See section 2.
The proj_queue_depth dashboard projection requires knowing "tool queue depth", but no tool queue exists as a first-class concept.
Design decision: Defer until the dashboard needs it.
syris/
├── pyproject.toml
├── alembic.ini
├── src/
│ └── syris/
│ ├── __init__.py
│ ├── main.py # Boot: wire all components, start loops
│ ├── config.py # Typed settings (Pydantic BaseSettings)
│ │
│ ├── schemas/ # Pure Pydantic v2 — no DB, no logic
│ │ ├── __init__.py
│ │ ├── common.py # Enums: ChannelEnum, ActorType, RiskLevel,
│ │ │ # AutonomyLevel, Sensitivity, etc.
│ │ │ # Value types: ErrorDetail
│ │ ├── events.py # MessageEvent, RoutingDecision, GateSpec
│ │ ├── tasks.py # Task, Step, RetryPolicy, StepOutcome
│ │ ├── tools.py # ToolCall, ToolResult, ToolCallContext,
│ │ │ # RegisteredTool, ToolHealth, TrustPolicy
│ │ ├── audit.py # AuditEvent
│ │ ├── approvals.py # Approval
│ │ ├── schedules.py # Schedule
│ │ ├── rules.py # Rule, Condition, Action types
│ │ └── health.py # SystemHealth, Alarm
│ │
│ ├── storage/ # DB layer
│ │ ├── __init__.py
│ │ ├── db.py # Engine, session factory, connection config
│ │ ├── models.py # SQLAlchemy ORM models (all tables)
│ │ ├── repos/ # One repo per aggregate root
│ │ │ ├── __init__.py
│ │ │ ├── events.py
│ │ │ ├── audit.py
│ │ │ ├── tasks.py
│ │ │ ├── tools.py # tool_calls, tool_results,
│ │ │ │ # idempotency_outcomes
│ │ │ ├── approvals.py
│ │ │ ├── schedules.py
│ │ │ ├── watchers.py
│ │ │ ├── rules.py
│ │ │ ├── health.py # system_health, alarms
│ │ │ └── projections.py # proj_* tables read/write
│ │ └── migrations/ # Alembic
│ │ ├── env.py
│ │ └── versions/
│ │
│ ├── pipeline/ # The three-stage pipeline
│ │ ├── __init__.py
│ │ ├── runner.py # Main loop: normalise → route → execute
│ │ ├── normaliser.py # Raw payload → MessageEvent + audit
│ │ ├── router.py # Orchestrates routing layers in order
│ │ └── executor.py # Dispatches to fast/task/gated/sandbox
│ │
│ ├── routing/ # Router internals (peer to pipeline/)
│ │ ├── __init__.py
│ │ ├── filters.py # Hard filters: dedup, spam, quiet hours
│ │ ├── fastpath.py # Deterministic intent matching
│ │ ├── rules_eval.py # Condition evaluator for Rule objects
│ │ └── llm_fallback.py # LLM-based routing (last resort)
│ │
│ ├── tasks/ # Task engine
│ │ ├── __init__.py
│ │ ├── engine.py # Claim → execute → checkpoint loop
│ │ ├── step_runner.py # Execute one step, handle outcomes
│ │ ├── state.py # State machine enforcement
│ │ └── recovery.py # Startup reconciliation
│ │
│ ├── tools/ # Tool runtime
│ │ ├── __init__.py
│ │ ├── registry.py # ToolRegistry: register, lookup, health
│ │ ├── executor.py # Scope → risk → gate → idempotency → call
│ │ ├── idempotency.py # Outcome store operations
│ │ ├── base.py # BaseTool ABC
│ │ └── builtin/
│ │ └── noop.py # NoopTool for testing
│ │
│ ├── safety/ # Safety layer
│ │ ├── __init__.py
│ │ ├── autonomy.py # Read/write current level + history
│ │ ├── risk.py # Risk classifier + adjusters
│ │ ├── gates.py # Gate matrix + override evaluation
│ │ └── dryrun.py # Preview protocol
│ │
│ ├── scheduler/ # Schedules only (promoted to peer)
│ │ ├── __init__.py
│ │ └── loop.py # Scheduler tick loop
│ │
│ ├── watchers/ # Promoted to peer
│ │ ├── __init__.py
│ │ ├── loop.py # Watcher tick loop
│ │ ├── base.py # BaseWatcher ABC
│ │ └── heartbeat.py # HeartbeatWatcher
│ │
│ ├── rules/ # Promoted to peer
│ │ ├── __init__.py
│ │ ├── engine.py # Load rules, evaluate, dispatch actions
│ │ └── cache.py # In-memory cache with TTL + invalidation
│ │
│ ├── mcp/ # MCP integration (promoted to peer)
│ │ ├── __init__.py
│ │ ├── connection.py # MCPConnectionManager
│ │ ├── provider.py # MCPProvider: discovery → registry sync
│ │ ├── adapter.py # MCPToolAdapter(BaseTool)
│ │ └── trust.py # TrustPolicy schema + loader
│ │
│ ├── adapters/ # Inbound/outbound adapters
│ │ ├── __init__.py
│ │ ├── inbound/
│ │ │ ├── __init__.py
│ │ │ └── base.py # InboundAdapter ABC
│ │ └── outbound/
│ │ ├── __init__.py
│ │ └── base.py # OutboundAdapter ABC
│ │
│ ├── observability/ # Audit, projections, health
│ │ ├── __init__.py
│ │ ├── audit.py # AuditWriter — sole emit point
│ │ ├── projections.py # Sync projection updaters
│ │ ├── health.py # SystemHealth writer
│ │ └── alarms.py # Alarm creation + dedup + lifecycle
│ │
│ ├── secrets/
│ │ ├── __init__.py
│ │ └── store.py # SecretsStore protocol + Fernet impl
│ │
│ ├── api/ # FastAPI
│ │ ├── __init__.py
│ │ ├── app.py # FastAPI app factory
│ │ ├── deps.py # Dependency injection (session, services)
│ │ └── routes/
│ │ ├── __init__.py
│ │ ├── status.py # /health, /state
│ │ ├── events.py # /events
│ │ ├── audit.py # /audit, /artifacts/{id}
│ │ ├── tasks.py # /tasks CRUD + cancel/pause/resume
│ │ ├── approvals.py # /approvals + approve/deny
│ │ ├── schedules.py # /schedules CRUD
│ │ ├── watchers.py # /watchers
│ │ ├── rules.py # /rules
│ │ ├── integrations.py # /integrations
│ │ ├── controls.py # /controls/pause, resume, autonomy
│ │ └── alarms.py # /alarms + ack/resolve
│ │
│ └── workers/ # Milestone 7 — skeleton only
│ ├── __init__.py
│ └── manager.py
│
└── tests/
├── unit/
├── integration/
└── conftest.py
Flat src/syris/ instead of core/syris_core/src/syris_core/. The nested monorepo structure adds complexity for a single-developer project with no current extraction need. A single pyproject.toml at the root is sufficient. The extraction seam is the module boundary, not the package nesting.
routing/ promoted to top-level peer. pipeline/router.py is a thin orchestrator that calls into routing/ functions in priority order. The routing internals (filters, fastpath, rules eval, LLM fallback) are a distinct concern from pipeline stage orchestration. Making them a peer package makes the dependency direction explicit: pipeline depends on routing, not the other way around.
watchers/, rules/, scheduler/ as top-level peers. Each has its own loop, its own state model, and its own lifecycle. Nesting them under scheduler/ suggests a containment relationship that doesn't exist — the scheduler doesn't manage watchers or rules.
mcp/ promoted from integrations/mcp/. MCP is a specific, well-defined integration mechanism with four components. The integrations/ package in the spec also contained inbound/ and outbound/ base classes, which are thin ABCs. I've moved the ABCs to adapters/ (clearer name) and given MCP its own top-level package, since it will grow.
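tools/adapter.py renamed to tools/base.py. This avoids confusion with mcp/adapter.py: tools/base.py holds the BaseTool ABC, while mcp/adapter.py holds the concrete MCPToolAdapter(BaseTool). base.py is the conventional name for a module that defines an abstract base class.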
Added config.py. The spec implies configuration exists (poll intervals, thresholds, quiet hours, etc.) but never shows where it lives. A single config.py with Pydantic BaseSettings is the right place — it reads from env vars and .env files, validates at startup, and is injectable everywhere.
All schemas are Pydantic v2. Changes from the spec are marked [ADDED], [CHANGED], or [DROPPED].
Referenced by Task.error, Step.error, and ToolResult.error but never defined in the spec.
ErrorDetail:
code: str — Machine-readable error code (e.g. "tool.timeout",
"scope.violation", "gate.denied")
message: str — Human-readable description
retryable: bool — Whether the caller should retry
details: dict | None — Optional structured context; MUST NOT contain secrets
Preserved exactly as specified. No changes needed.
MessageEvent:
event_id: UUID — Generated at ingestion; stable identity
schema_version: str — "1.0"; bump for breaking changes
occurred_at: datetime — Source timestamp (UTC)
ingested_at: datetime — Set by normaliser; used for latency metrics
source:
channel: ChannelEnum — email | sms | webhook | ha_event |
scheduler | rule_engine | watcher | vision
connector_id: str — Which integration instance
thread_id: str | None — Native thread/conversation ID
message_id: str | None — Native message ID for dedup
actor:
actor_type: ActorType — user | system | integration
actor_id: str — Stable identity string
content:
text: str | None
structured: dict — Parsed payload or intent hints
attachment_refs: list[str] — IDs of stored attachments; never inline bytes
links: list[str]
context:
timezone: str — IANA timezone string
locale: str — BCP-47
device_id: str | None
correlation:
trace_id: UUID — Created at ingestion; NEVER changes
parent_event_id: UUID | None — Set when triggered by another event
dedupe_key: str | None — Derived by normaliser from stable hash
SHA-256(channel + connector_id + message_id)
security:
sensitivity: Sensitivity — low | med | high
redaction_policy_id: str
Preserved exactly as specified.
RoutingDecision:
trace_id: UUID
event_id: UUID
decided_at: datetime
execution_mode: ExecutionMode — fast | task | gated | sandbox
match:
matched_fastpath: str | None — e.g. "timer.set"
matched_rule_ids: list[str]
used_llm: bool
intent: str | None — Normalised intent label
confidence: float | None — None for deterministic matches
required_scopes: list[str]
risk_level: RiskLevel — low | medium | high | critical
gates: list[GateSpec]
notes: str | None — Human-readable routing explanation
GateSpec:
gate_type: GateType — approval | dryrun | scope_check
reason: str
expires_in_seconds: int
Preserved as specified.
Task:
task_id: UUID
created_at: datetime
updated_at: datetime
status: TaskStatus — pending | running | paused |
succeeded | failed | canceled
trigger_event_id: UUID
trace_id: UUID
current_step_id: UUID | None
autonomy_level_at_start: AutonomyLevel — Snapshot; never changes mid-task
labels: list[str]
next_wake_time: datetime | None
cancel_reason: str | None
error: ErrorDetail | None
Preserved as specified.
Step:
step_id: UUID
task_id: UUID
name: str
status: StepStatus — pending | running | succeeded | failed | paused
attempt: int — 0-indexed
max_attempts: int
retry_policy: RetryPolicy
input: dict — Immutable at creation
checkpoint: dict — Mutable; written mid-step for resumability
output: dict | None — Written on success
idempotency_key: str | None — Required for effectful steps;
enforced by tools/executor.py, not schema
started_at: datetime | None
ended_at: datetime | None
error: ErrorDetail | None
RetryPolicy:
strategy: RetryStrategy — exponential | fixed | none
base_delay_ms: int
max_delay_ms: int
jitter: bool — Always true for exponential
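The task engine's retry branch turns a RetryPolicy into a delay before setting next_wake_time. The sketch below shows one way to do that, under several assumptions the spec does not fix:

- Exponential delays grow as base_delay_ms * 2**attempt, capped at max_delay_ms.
- Fixed delays stay at base_delay_ms.
- Jitter is "full jitter", a uniform draw between 0 and the capped delay.

The dataclass is a stdlib stand-in for the Pydantic model.

```python
from __future__ import annotations

import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum


class RetryStrategy(str, Enum):
    EXPONENTIAL = "exponential"
    FIXED = "fixed"
    NONE = "none"


@dataclass(frozen=True)
class RetryPolicy:
    # Stand-in for the Pydantic schema of the same name
    strategy: RetryStrategy
    base_delay_ms: int
    max_delay_ms: int
    jitter: bool


def backoff_delay_ms(policy: RetryPolicy, attempt: int,
                     rng: random.Random | None = None) -> int | None:
    """Delay before retrying after 0-indexed `attempt` failed; None means do not retry."""
    if policy.strategy is RetryStrategy.NONE:
        return None
    if policy.strategy is RetryStrategy.FIXED:
        delay = policy.base_delay_ms
    else:
        delay = policy.base_delay_ms * (2 ** attempt)
    delay = min(delay, policy.max_delay_ms)
    # The schema says jitter is always true for exponential, so enforce it here
    if policy.jitter or policy.strategy is RetryStrategy.EXPONENTIAL:
        rng = rng or random.Random()
        delay = rng.randint(0, delay)  # full jitter: uniform in [0, delay]
    return delay


def next_wake_time(now: datetime, delay_ms: int) -> datetime:
    return now + timedelta(milliseconds=delay_ms)
```

Full jitter spreads retries from many steps that failed together, so they do not all wake on the same tick.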
Preserved as specified.
ToolCall:
tool_call_id: UUID
created_at: datetime
trace_id: UUID
task_id: UUID | None — None for fast-lane calls
step_id: UUID | None
event_id: UUID | None
connector_id: str
tool_name: str
action: str
request_hash: str — SHA-256 of REDACTED request
idempotency_key: str — REQUIRED; no exceptions
status: ToolCallStatus — attempted | succeeded | failed | unknown
latency_ms: int | None
risk_level: RiskLevel
autonomy_level: AutonomyLevel
ToolResult:
tool_call_id: UUID
status: ResultStatus — succeeded | failed
response_hash: str — SHA-256; payload stored in artifact store
error: ErrorDetail | None
resolved_at: datetime
Preserved as specified.
AuditEvent:
audit_id: UUID
timestamp: datetime — UTC; set by AuditWriter, never by caller
trace_id: UUID — Required; positional arg to AuditWriter
stage: PipelineStage — normalize | route | execute | tool_call |
gate | operator | scheduler | watcher | rule
refs:
event_id: UUID | None
task_id: UUID | None
step_id: UUID | None
tool_call_id: UUID | None
approval_id: UUID | None
type: str — e.g. "tool_call.succeeded"
summary: str — Human-readable; indexed for search
outcome: AuditOutcome — success | failure | suppressed | info
latency_ms: int | None
tool_name: str | None
connector_id: str | None
risk_level: RiskLevel | None
autonomy_level: AutonomyLevel | None
payload_ref: str | None — Artifact store ID; never inline payloads
Preserved as specified.
Approval:
approval_id: UUID
created_at: datetime
expires_at: datetime
status: ApprovalStatus — pending | approved | denied | expired
trace_id: UUID
refs:
event_id: UUID | None
task_id: UUID | None
step_id: UUID | None
risk_level: RiskLevel
autonomy_level: AutonomyLevel — Snapshot at gate decision time
what: dict — Exact serialised action payload
why: str — Human-readable gate reason
how_to_approve: str — e.g. "POST /approvals/{id}/approve"
decision:
by: str | None — "operator"
at: datetime | None
reason: str | None
ChannelEnum: email | sms | webhook | ha_event | scheduler | rule_engine | watcher | vision
ActorType: user | system | integration
Sensitivity: low | med | high
RiskLevel: low | medium | high | critical
AutonomyLevel: A0 | A1 | A2 | A3 | A4
TaskStatus: pending | running | paused | succeeded | failed | canceled
StepStatus: pending | running | succeeded | failed | paused
ToolCallStatus: attempted | succeeded | failed | unknown
ResultStatus: succeeded | failed
ApprovalStatus: pending | approved | denied | expired
AuditOutcome: success | failure | suppressed | info
PipelineStage: normalize | route | execute | tool_call | gate | operator |
scheduler | watcher | rule
ExecutionMode: fast | task | gated | sandbox
GateType: approval | dryrun | scope_check
GateAction: ALLOW | CONFIRM | PREVIEW | HARD_BLOCK
HealthStatus: healthy | degraded | unavailable | down
RetryStrategy: exponential | fixed | none
ProviderType: native | mcp
Each stage is a function that receives typed input and returns typed output. No shared mutable state between stages.
normalise(raw_payload, channel, connector_id, session) → MessageEvent | None   (None when the event is a duplicate)
route(event: MessageEvent, session) → RoutingDecision
execute(event: MessageEvent, decision: RoutingDecision, session) → None
pipeline/runner.py is a thin loop:

```python
async def process_one(raw_payload, channel, connector_id):
    async with session_scope() as session:
        event = normalise(raw_payload, channel, connector_id, session)
        if event is None:
            # Duplicate: the normaliser has already emitted event.deduped
            return
        decision = route(event, session)
        execute(event, decision, session)
```

Each stage flushes its writes to the shared session, and session_scope() commits once when the block exits. If any stage raises, the whole transaction rolls back.
For the fast lane, the entire pipeline runs in a single request-response cycle. For the task lane, execute() only creates the Task and its Steps — the task engine loop picks it up separately.
Normalise produces a MessageEvent with a fresh trace_id (UUID4). It computes dedupe_key from SHA-256(channel + connector_id + message_id) and attempts an INSERT ... ON CONFLICT (dedupe_key) DO NOTHING. If the insert is a no-op, the event is a duplicate: emit event.deduped and return early. Otherwise, persist the event and emit event.ingested. The MessageEvent is returned as a Python object — no serialization boundary between stages.
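The sketch below shows the dedupe step. It uses the stdlib sqlite3 module as a stand-in for the real database; SQLite has supported INSERT ... ON CONFLICT DO NOTHING since 3.24.

The spec writes the key as SHA-256(channel + connector_id + message_id). Plain concatenation is ambiguous ("ab" + "c" equals "a" + "bc"), so this sketch joins the parts with a NUL separator. That separator is an assumption. The table layout and the ingest() helper are also hypothetical.

```python
from __future__ import annotations

import hashlib
import sqlite3
import uuid


def dedupe_key(channel: str, connector_id: str, message_id: str | None) -> str | None:
    # Without a native message ID there is nothing stable to hash, so no dedup
    if message_id is None:
        return None
    raw = "\x00".join([channel, connector_id, message_id])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def ingest(conn: sqlite3.Connection, channel: str, connector_id: str,
           message_id: str | None) -> str | None:
    """Insert the event and return its event_id, or None if it is a duplicate."""
    event_id = str(uuid.uuid4())
    cur = conn.execute(
        "INSERT INTO message_events (event_id, dedupe_key) VALUES (?, ?) "
        "ON CONFLICT (dedupe_key) DO NOTHING",
        (event_id, dedupe_key(channel, connector_id, message_id)),
    )
    if cur.rowcount == 0:
        return None  # no row inserted: caller emits event.deduped
    return event_id  # caller emits event.ingested


conn = sqlite3.connect(":memory:")
# NULL keys never conflict under a UNIQUE constraint, so ID-less events always insert
conn.execute("CREATE TABLE message_events (event_id TEXT PRIMARY KEY, dedupe_key TEXT UNIQUE)")
```

Letting the unique constraint decide makes the duplicate check atomic. Two concurrent deliveries of the same message cannot both pass a separate "SELECT, then INSERT" check.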
Route receives the MessageEvent in-memory and calls into routing/ layers in strict priority order:
1. filters.py: hard filters (quiet-hours check, spam detection). If filtered, return a RoutingDecision with execution_mode reflecting suppression.
2. fastpath.py: deterministic intent matching via a DSL of patterns. If matched, return immediately with confidence = None, used_llm = False.
3. rules_eval.py: evaluate all enabled rules against the event. If matched, dispatch rule actions. Return with matched_rule_ids populated.
4. llm_fallback.py: LLM-based routing. Only reached if nothing above matched. Return with used_llm = True, confidence set.

First match wins. Route persists the RoutingDecision and emits routing.decided.
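The first-match-wins rule reduces to a loop over layer functions, each of which returns a decision or None. This sketch assumes each routing/ module exposes one such function. The layer signatures, the trimmed RoutingDecision dataclass and the toy layers are all hypothetical stand-ins. Persistence and the routing.decided audit event are left out.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class RoutingDecision:
    # Trimmed stand-in for the full schema
    execution_mode: str
    matched_fastpath: Optional[str] = None
    matched_rule_ids: list[str] = field(default_factory=list)
    used_llm: bool = False
    confidence: Optional[float] = None


Layer = Callable[[dict], Optional[RoutingDecision]]


def route(event: dict, layers: list[Layer]) -> RoutingDecision:
    """Try each layer in priority order; the first non-None decision wins."""
    for layer in layers:
        decision = layer(event)
        if decision is not None:
            return decision
    # Assumption: the LLM fallback always produces a decision
    raise RuntimeError("no routing layer produced a decision")


def filters(event: dict) -> Optional[RoutingDecision]:
    return None  # nothing filtered in this toy example


def fastpath(event: dict) -> Optional[RoutingDecision]:
    if event.get("text", "").startswith("set a timer"):
        return RoutingDecision("fast", matched_fastpath="timer.set")
    return None


def rules_eval(event: dict) -> Optional[RoutingDecision]:
    return None


def llm_fallback(event: dict) -> RoutingDecision:
    return RoutingDecision("task", used_llm=True, confidence=0.7)


LAYERS: list[Layer] = [filters, fastpath, rules_eval, llm_fallback]
```

Passing the layer list in explicitly keeps pipeline/router.py a thin orchestrator. It also lets tests swap in stub layers.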
Execute receives both MessageEvent and RoutingDecision and dispatches based on execution_mode:
- fast → call tools/executor.py directly in the same request cycle.
- task → call tasks/engine.py to create a Task with Steps. The engine loop picks it up.
- gated → call safety/gates.py to create an Approval. Emit gate.required.
- sandbox → (Milestone 7) submit to workers/manager.py.

Audit events emitted by each module:

normaliser.py → event.ingested | event.deduped
router.py → routing.decided
tools/executor.py → tool_call.attempted | tool_call.succeeded |
tool_call.failed | tool_call.deduped | tool_call.unknown
tasks/engine.py → task.step_started | task.step_completed |
task.step_failed | task.step_unexpected_error
safety/gates.py → gate.required | gate.approved | gate.denied |
gate.antiflap_block | gate.storm_block
scheduler/loop.py → schedule.fired | schedule.skipped | schedule.missed
watchers/loop.py → watcher.tick | watcher.error
rules/engine.py → rule.triggered | rule.suppressed
api/routes/controls.py → operator.action.*
trace_id is generated once in normaliser.py as uuid4(). It is set on the MessageEvent and then passed explicitly as a parameter to every downstream function. AuditWriter.emit() takes trace_id as a required positional argument — omission is a type error, not a silent runtime gap.
For child events (emitted by rules or scheduler), trace_id is inherited from the parent event, and parent_event_id is set to the parent's event_id. This creates a tree of events sharing one trace.
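Making trace_id required and positional-only means a call that omits it fails with a TypeError at the call site, and a type checker flags it even earlier. This sketch assumes an in-memory list in place of the audit_events table. The keyword parameters are a subset of AuditEvent, and child_correlation() is a hypothetical helper for the inheritance rule.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


class AuditWriter:
    def __init__(self) -> None:
        self.events: list[dict] = []  # stand-in for the audit_events table

    def emit(self, trace_id: uuid.UUID, /, *, stage: str, type: str, summary: str,
             outcome: str = "info", event_id: Optional[uuid.UUID] = None) -> uuid.UUID:
        audit_id = uuid.uuid4()
        self.events.append({
            "audit_id": audit_id,
            "timestamp": datetime.now(timezone.utc),  # set here, never by the caller
            "trace_id": trace_id,
            "stage": stage, "type": type, "summary": summary,
            "outcome": outcome, "event_id": event_id,
        })
        return audit_id


@dataclass(frozen=True)
class Correlation:
    trace_id: uuid.UUID
    parent_event_id: Optional[uuid.UUID]


def child_correlation(parent_event_id: uuid.UUID, parent: Correlation) -> Correlation:
    # Child events share the parent's trace and point back at the parent event
    return Correlation(trace_id=parent.trace_id, parent_event_id=parent_event_id)
```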
A Task is a sequence of Steps. Each Step has:
- input (dict): immutable, set at creation.
- checkpoint (dict): mutable, written mid-step for resumability.
- output (dict | None): written on success.

The checkpoint is the resumability mechanism. A step commits its checkpoint before any effectful call, so that on restart it can determine what has already happened.
Step runner pattern (pseudocode):

```python
def run_step(task, step, session) -> StepOutcome:
    # 1. Load checkpoint (empty on the first attempt)
    checkpoint = step.checkpoint or {}
    tool_name, action = step.input["tool_name"], step.input["action"]  # assumed input layout

    # 2. Crash recovery: the checkpoint says tool X may already have run with key K
    if checkpoint.get("phase") in ("calling_tool", "post_tool"):
        outcome = idempotency_store.get(checkpoint["idempotency_key"])
        if outcome is not None and outcome.status in ("succeeded", "failed"):
            # Tool already ran: skip straight to post-processing
            return process_outcome(outcome)
        # No record, or recorded as unknown: we cannot tell whether the side effect
        # happened. Retry; the engine increments attempt, which yields a new key.
        return StepOutcome("retry")

    # 3. Pre-work: compute parameters, validate inputs
    params = compute_params(step.input, checkpoint)

    # 4. Commit the checkpoint BEFORE the effectful call. flush() alone is not
    #    enough: an uncommitted write is lost on a crash, so recovery could not see it.
    key = generate_key(task, step, action, params)
    step.checkpoint = {"phase": "calling_tool", "idempotency_key": key}
    session.commit()

    # 5. Call the tool via tools/executor.py
    context = build_context(task, step, key)
    result = tool_executor.call(tool_name, action, params, context)

    # 6. Commit the post-tool checkpoint
    step.checkpoint = {"phase": "post_tool", "idempotency_key": key,
                       "result_hash": result.response_hash}
    session.commit()

    # 7. Post-work, then return the outcome
    return StepOutcome("succeeded", output=result)
```
CREATE
│
▼
[pending] ──► [running] ──────────────► [succeeded]
│
├── operator pause ──► [paused]
│ └── resume ──────────────► [running]
│
├── operator cancel ──────────────► [canceled]
│
└── retries exhausted ────────────► [failed]
Terminal states: succeeded, failed, canceled. No transitions out of terminal states.
[pending] ─► [running] ─► [succeeded]
│
├── retryable error ─► [pending] (attempt++)
├── non-retryable ─► [failed]
└── operator pause ─► [paused] ─► [running]
Steps have no canceled state. Task cancellation abandons the current step in place.
Enforcement: A pure function validate_transition(current: Status, target: Status) -> bool in tasks/state.py is called by the repo layer before every status update. Invalid transitions raise IllegalStateTransition and are never persisted. This means the state machine can be tested without a database.
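Because validate_transition() is a pure function over two status values, the state machine can be a lookup table. This sketch encodes exactly the edges drawn in the two diagrams above. Any edge not drawn there, such as canceling a paused task, is rejected until the spec adds it.

Plain Enum, rather than str-mixin Enum, keeps TaskStatus.PENDING and StepStatus.PENDING distinct as dictionary keys. IllegalStateTransition follows the text above. Its exact shape, and the hypothetical assert_transition() helper, are assumptions.

```python
from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELED = "canceled"


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    PAUSED = "paused"


class IllegalStateTransition(Exception):
    pass


T, S = TaskStatus, StepStatus
ALLOWED = {
    # Task: terminal states (succeeded, failed, canceled) have no outgoing edges
    T.PENDING: {T.RUNNING},
    T.RUNNING: {T.SUCCEEDED, T.PAUSED, T.CANCELED, T.FAILED},
    T.PAUSED: {T.RUNNING},
    # Step: a retryable error sends a running step back to pending (attempt++)
    S.PENDING: {S.RUNNING},
    S.RUNNING: {S.SUCCEEDED, S.PENDING, S.FAILED, S.PAUSED},
    S.PAUSED: {S.RUNNING},
}


def validate_transition(current, target) -> bool:
    """Pure check: True if the diagrams allow current -> target."""
    if type(current) is not type(target):
        return False  # never mix task and step statuses
    return target in ALLOWED.get(current, set())


def assert_transition(current, target) -> None:
    # What the repo layer would call before persisting a status change
    if not validate_transition(current, target):
        raise IllegalStateTransition(f"{current.value} -> {target.value}")
```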
The task engine runs on a configurable poll interval (default: 1 second):
SELECT * FROM tasks
WHERE status IN ('pending', 'running')  -- newly created tasks are claimed while still pending
AND (next_wake_time IS NULL OR next_wake_time <= now())
LIMIT :batch_size
FOR UPDATE SKIP LOCKED

For each claimed task, call step_runner.run(task, step) for its current step and act on the outcome:

- succeeded → advance current_step_id to the next step, or mark the task succeeded if no steps remain.
- retry → increment attempt, compute the backoff delay, set next_wake_time.
- failed → mark the task failed.
- waiting → set task.next_wake_time to the wait-until timestamp.
- paused → mark the step paused.

A separate loop runs every 5 seconds to check tasks waiting for approval:
SELECT tasks WHERE status = 'running'
AND current step checkpoint contains approval_id

For each, load the Approval:
- approved → resume step execution.
- denied → StepOutcome("failed", error=ErrorDetail(code="gate.denied", message="approval denied", retryable=False)).
- expired → per config: fail the step, or create a new Approval and re-notify.
- pending → wait for the next loop iteration.

Idempotency keys are derived as:

idempotency_key = SHA-256(task_id || step_id || attempt || action || stable_request_hash)
Including attempt ensures that each retry produces a new key. This prevents a stored unknown outcome from blocking a legitimate retry. The stable_request_hash is the SHA-256 of the canonicalized (sorted keys), redacted (no secrets) request body.
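The sketch below shows the key derivation. The spec does not say how the || concatenation is encoded, so this sketch hashes a JSON array of the parts, which is unambiguous. Redaction is modeled as replacing the values of a caller-supplied set of field names with a fixed marker. That secret_fields parameter is hypothetical, since the real redaction policy is not shown.

```python
from __future__ import annotations

import hashlib
import json


def _redact(value, secret_fields: frozenset[str]):
    # Recursively replace secret values with a fixed marker so they never reach the hash
    if isinstance(value, dict):
        return {k: "[REDACTED]" if k in secret_fields else _redact(v, secret_fields)
                for k, v in value.items()}
    if isinstance(value, list):
        return [_redact(v, secret_fields) for v in value]
    return value


def stable_request_hash(request: dict, secret_fields: frozenset[str] = frozenset()) -> str:
    # sort_keys + fixed separators give one canonical byte string per logical request
    canonical = json.dumps(_redact(request, secret_fields), sort_keys=True,
                           separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def idempotency_key(task_id: str, step_id: str, attempt: int, action: str,
                    request: dict, secret_fields: frozenset[str] = frozenset()) -> str:
    parts = [task_id, step_id, attempt, action, stable_request_hash(request, secret_fields)]
    return hashlib.sha256(json.dumps(parts).encode("utf-8")).hexdigest()
```

Replacing secrets with a marker, rather than hashing them, also means that rotating a token does not change the key for an otherwise identical request.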
Keys are stored in idempotency_outcomes:
idempotency_outcomes:
idempotency_key: str PK
tool_call_id: str
status: str — succeeded | failed | unknown
response_hash: str | None
resolved_at: datetime | None
tasks/recovery.py runs at startup, before any loops start:
1. Reconcile running tasks. Load all tasks with status = running. For each, find the current step. If step.status = running:
   - Look up idempotency_outcomes for the step's key.
   - If the outcome is succeeded or failed, apply it to the step.
   - If the outcome is missing or unknown, reset the step to pending, increment attempt, and emit tool_call.unknown.
2. Restore pending approvals. Load all Approval records with status = pending. No action needed: the approval-wait loop will pick them up.
3. Recover scheduler. For each enabled schedule, recompute next_run_at from last_run_at + spec. Apply catch_up_policy for missed firings.
4. Recover watchers. Load all watcher_state rows. Reset consecutive_errors only if the watcher was healthy before the crash.
5. Start all loops: pipeline, task engine, scheduler, watcher, approval-wait.
ToolRegistry is an in-memory dictionary: tool_name → (BaseTool, RegisteredTool). Populated at startup from DB records and updated when MCP providers sync. The DB is the source of truth; the registry is a read cache.
ToolRegistry:
register(tool: BaseTool, config: RegisteredTool) → None
get(tool_name: str) → tuple[BaseTool, RegisteredTool] | None
list_healthy() → list[RegisteredTool]
update_health(tool_name: str, health: ToolHealth) → None
unregister(tool_name: str) → None
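The sketch below treats the registry as a read cache. RegisteredTool and ToolHealth are trimmed to the fields it needs, and tools are typed loosely as Any. Treating "healthy" and "degraded" as usable in list_healthy() is an assumption. The spec only says the executor fails fast on "unavailable".

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class ToolHealth:
    status: str  # healthy | degraded | unavailable | down


@dataclass(frozen=True)
class RegisteredTool:
    tool_name: str
    provider_type: str  # native | mcp
    health: ToolHealth


USABLE = {"healthy", "degraded"}  # assumption: degraded tools stay callable


class ToolRegistry:
    """In-memory read cache; the DB remains the source of truth."""

    def __init__(self) -> None:
        self._tools: dict[str, tuple[Any, RegisteredTool]] = {}

    def register(self, tool: Any, config: RegisteredTool) -> None:
        self._tools[config.tool_name] = (tool, config)

    def get(self, tool_name: str) -> tuple[Any, RegisteredTool] | None:
        return self._tools.get(tool_name)

    def list_healthy(self) -> list[RegisteredTool]:
        return [cfg for _, cfg in self._tools.values() if cfg.health.status in USABLE]

    def update_health(self, tool_name: str, health: ToolHealth) -> None:
        entry = self._tools.get(tool_name)
        if entry is not None:
            tool, cfg = entry
            # Frozen configs are replaced, not mutated, so readers never see half-updates
            self._tools[tool_name] = (tool, replace(cfg, health=health))

    def unregister(self, tool_name: str) -> None:
        self._tools.pop(tool_name, None)
```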
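```python
from abc import ABC, abstractmethod

class BaseTool(ABC):
    @property
    @abstractmethod
    def tool_name(self) -> str: ...
    @property
    @abstractmethod
    def capabilities(self) -> list[str]: ...
    @property
    @abstractmethod
    def scopes_required(self) -> list[str]: ...
    @abstractmethod
    def execute(self, action: str, request: dict, context: "ToolCallContext") -> "ToolResult": ...
    def preview(self, action: str, request: dict, context: "ToolCallContext") -> "PreviewResult | None":
        # Tools without dry-run support keep this default (assumed)
        return None
```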
ToolCallContext carries: trace_id, task_id, step_id, idempotency_key, autonomy_level, granted_scopes. The tool implementation reads nothing from system state directly — everything arrives via context.
tools/executor.py → call(tool_name, action, request, context):
Registry lookup. Fail fast if tool not found or health.status = unavailable.
Scope check. Assert tool.scopes_required ⊆ context.granted_scopes. Failure raises ScopeViolation (non-retryable). Emit audit.
Risk classification. Start from tool.risk_map.get(action, tool.risk_default). Apply adjusters (each can only increase risk by one level):
Hard safety overrides. Evaluated before the gate matrix; can only increase gate strictness:
Gate matrix lookup. (autonomy_level, final_risk) → ALLOW | CONFIRM | PREVIEW | HARD_BLOCK:
| Autonomy | LOW | MEDIUM | HIGH | CRITICAL |
|---|---|---|---|---|
| A0 | PREVIEW | PREVIEW | PREVIEW | PREVIEW |
| A1 | CONFIRM | CONFIRM | CONFIRM | HARD_BLOCK |
| A2 | ALLOW | CONFIRM | CONFIRM | HARD_BLOCK |
| A3 | ALLOW | ALLOW | CONFIRM | HARD_BLOCK |
| A4 | ALLOW | ALLOW | ALLOW | CONFIRM |
If CONFIRM: create Approval, emit gate.required, raise ApprovalRequired.
If PREVIEW: call tool.preview(), raise PreviewRequired.
If HARD_BLOCK: emit audit, raise HardBlocked.
Idempotency check. Query idempotency_outcomes for context.idempotency_key. If found with succeeded or failed: emit tool_call.deduped, return cached result. If found with unknown: caller handles (typically retry with new attempt/key).
Execute. Emit tool_call.attempted. Call tool.execute():
- Success: write succeeded to idempotency_outcomes, emit tool_call.succeeded.
- Failure: write failed to idempotency_outcomes, emit tool_call.failed.
- Outcome cannot be determined (e.g. timeout): write unknown to idempotency_outcomes, emit tool_call.unknown.

MCPConnectionManager (mcp/connection.py):

- Emits audit events (mcp.connected, mcp.disconnected, mcp.tools_synced).

MCPProvider (mcp/provider.py):

- Connects through MCPConnectionManager and discovers tools.
- Wraps each discovered tool in an MCPToolAdapter and registers it in ToolRegistry with provider_type = mcp.
- Applies TrustPolicy to assign risk levels, scopes, and idempotency contracts.

MCPToolAdapter (mcp/adapter.py):

- Implements BaseTool for a single MCP tool.
- execute(): calls MCPConnectionManager.call(tool_name, args) and translates the response to a ToolResult.
- When a call's outcome is ambiguous, records unknown in idempotency_outcomes rather than assuming success or failure.

TrustPolicy (mcp/trust.py):
TrustPolicy:
risk_override: RiskLevel | None
scopes_mapping: dict[str, list[str]] — MCP tool name → SYRIS scopes
idempotency_contract: IdempotencyContract — none | provider_managed | syris_managed
supports_preview: bool
MCP annotations are untrusted by default. risk_override is authoritative. All MCP tools default to high risk until explicitly allowlisted.
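Risk classification and the gate matrix from the tool executor section can be sketched as an ordered enum plus a lookup table. The sketch also covers the MCP default-risk rule just stated. It assumes RiskLevel orders as low < medium < high < critical.

The real adjusters and hard overrides are not shown. bump() only illustrates the one-level rule. The bump() and mcp_base_risk() names are hypothetical.

```python
from __future__ import annotations

from enum import Enum, IntEnum
from typing import Optional


class RiskLevel(IntEnum):
    LOW = 0
    MEDIUM = 1
    HIGH = 2
    CRITICAL = 3


class GateAction(Enum):
    ALLOW = "ALLOW"
    CONFIRM = "CONFIRM"
    PREVIEW = "PREVIEW"
    HARD_BLOCK = "HARD_BLOCK"


A, C, P, H = GateAction.ALLOW, GateAction.CONFIRM, GateAction.PREVIEW, GateAction.HARD_BLOCK
# Rows: autonomy level; columns indexed by RiskLevel (LOW, MEDIUM, HIGH, CRITICAL)
GATE_MATRIX = {
    "A0": (P, P, P, P),
    "A1": (C, C, C, H),
    "A2": (A, C, C, H),
    "A3": (A, A, C, H),
    "A4": (A, A, A, C),
}


def bump(risk: RiskLevel) -> RiskLevel:
    """An adjuster may raise risk by at most one level, capped at CRITICAL."""
    return RiskLevel(min(risk + 1, RiskLevel.CRITICAL))


def mcp_base_risk(risk_override: Optional[RiskLevel]) -> RiskLevel:
    # MCP annotations are ignored; only TrustPolicy.risk_override counts, default HIGH
    return risk_override if risk_override is not None else RiskLevel.HIGH


def gate(autonomy_level: str, risk: RiskLevel) -> GateAction:
    return GATE_MATRIX[autonomy_level][risk]  # IntEnum indexes the tuple directly
```

Because adjusters can only call bump(), no combination of them can lower a tool's risk below its base level.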
SecretsStore is a protocol:
```python
from typing import Protocol

class SecretsStore(Protocol):
    def get_secret(self, connector_id: str, key: str) -> str: ...
```
Injected into tool adapters at construction. Adapters call it inside execute() and use the value within function scope. The value must not outlive the function call — it must never be stored on self, in checkpoints, in logs, or in audit payloads.
config_ref on RegisteredTool points to a DB reference (connector_id + key name), not the credential value. The value is fetched at execution time only.
Initial implementation: Fernet-encrypted local file, encryption key from env var or OS keyring. Swappable to Vault or OS keyring later — callers depend only on the protocol.
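The sketch below shows an adapter that follows the relaxed rule from the design decisions at the top. The secret is fetched once into a local variable inside execute() and used twice there. It is never stored on self, returned, or logged.

InMemorySecretsStore, WebhookTool, the injected transport callable and the returned dict are hypothetical stand-ins. A real store would be the Fernet-backed implementation.

```python
from __future__ import annotations

import hashlib
import hmac
import json
from typing import Callable, Protocol


class SecretsStore(Protocol):
    def get_secret(self, connector_id: str, key: str) -> str: ...


class InMemorySecretsStore:
    """Test double; the real store decrypts a Fernet file."""

    def __init__(self, values: dict[tuple[str, str], str]) -> None:
        self._values = values

    def get_secret(self, connector_id: str, key: str) -> str:
        return self._values[(connector_id, key)]


class WebhookTool:
    def __init__(self, secrets: SecretsStore, connector_id: str,
                 transport: Callable[[dict, bytes], int]) -> None:
        self._secrets = secrets            # the store is injected, never a secret value
        self._connector_id = connector_id
        self._transport = transport        # hypothetical sender: (headers, body) -> status

    def execute(self, action: str, request: dict) -> dict:
        body = json.dumps(request, sort_keys=True).encode("utf-8")
        # One fetch, bound locally; the binding dies when execute() returns
        token = self._secrets.get_secret(self._connector_id, "api_token")
        headers = {
            "Authorization": f"Bearer {token}",  # first use
            "X-Signature": hmac.new(token.encode("utf-8"), body, hashlib.sha256).hexdigest(),  # second use
        }
        status = self._transport(headers, body)
        # Return only non-secret data: never the token or the headers
        return {"action": action, "status": status}
```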
The scheduler, watchers, and rules are all event sources. They emit MessageEvents that enter the pipeline at the normalisation stage, exactly like any inbound adapter. They do not bypass the pipeline.
Scheduler tick → MessageEvent(channel="scheduler") → normalise → route → execute
Watcher tick → MessageEvent(channel="watcher") → normalise → route → execute
Rule trigger → MessageEvent(channel="rule_engine") → normalise → route → execute
This means all proactive actions get full pipeline treatment: dedup, routing, safety gating, audit.
The scheduler loop runs every 5 seconds:
SELECT * FROM schedules
WHERE enabled = 1 AND next_run_at <= now()
FOR UPDATE SKIP LOCKED

For each due schedule:

- If the schedule must be skipped on this tick, emit schedule.skipped, advance next_run_at, and commit.
- Apply catch_up_policy if last_run_at is stale:
  - skip: advance to the next future slot, emit schedule.missed per skipped slot.
  - run_once: fire once for the entire missed window, note the slot count in audit.
  - run_all_capped: fire up to N times, skip the remainder.
- Build a MessageEvent from the schedule's payload template.
- Update last_run_at and compute the new next_run_at.

Schedule types: cron (cron expression), interval (every N seconds), one_shot (ISO-8601 datetime, auto-disables after firing).
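For interval schedules, the catch-up policies reduce to counting due slots. The sketch assumes:

- A run is "stale" when more than one slot is due.
- The N in run_all_capped is a cap parameter.
- Skipped remainder slots count as missed.

Cron schedules would need a cron iterator to enumerate slots, which is out of scope here. CatchUpPlan and plan_catch_up are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class CatchUpPlan:
    fire_count: int          # MessageEvents to emit now
    missed_count: int        # schedule.missed audit events to emit
    slots_due: int           # noted in audit, e.g. for run_once
    next_run_at: datetime    # first slot strictly in the future


def plan_catch_up(next_run_at: datetime, interval: timedelta, now: datetime,
                  policy: str, cap: int = 3) -> CatchUpPlan:
    if next_run_at > now:
        return CatchUpPlan(0, 0, 0, next_run_at)  # not due yet
    # Slots at next_run_at, next_run_at + interval, ... that are <= now
    slots_due = (now - next_run_at) // interval + 1
    new_next = next_run_at + slots_due * interval
    if slots_due == 1:
        return CatchUpPlan(1, 0, 1, new_next)  # on time: fire normally
    if policy == "skip":
        return CatchUpPlan(0, slots_due, slots_due, new_next)
    if policy == "run_once":
        return CatchUpPlan(1, 0, slots_due, new_next)
    if policy == "run_all_capped":
        fired = min(slots_due, cap)
        return CatchUpPlan(fired, slots_due - fired, slots_due, new_next)
    raise ValueError(f"unknown catch_up_policy: {policy}")
```

For example, a one-minute interval schedule found 5.5 minutes late has six due slots and a new next_run_at six minutes after the old one, whichever policy applies.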
The watcher loop iterates registered watchers based on their tick_interval_seconds:
1. Load the watcher's WatcherState from the DB.
2. Skip the watcher unless now - last_tick_at >= tick_interval_seconds.
3. If the watcher is suppressed, emit watcher.suppressed.
4. Call tick(now, state).
5. Persist the updated WatcherState and emit any returned MessageEvents, in a single transaction.
6. On error, increment consecutive_errors, emit watcher.error, and raise an alarm if the threshold is exceeded.

The watcher's dedupe_window: dict is opaque to the loop; each watcher manages its own dedup strategy within it (e.g. last-seen device states, last inbox message ID).
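One iteration of the watcher loop for a single watcher can be sketched as follows. Assumptions in this sketch:

- WatcherState is trimmed to the fields the loop touches.
- Persistence and audit emission are reduced to a TickResult return value.
- Suppression is omitted.
- ALARM_THRESHOLD is a hypothetical config value.
- A successful tick resets consecutive_errors.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Protocol

ALARM_THRESHOLD = 3  # hypothetical config value: consecutive errors before an alarm


@dataclass
class WatcherState:
    # Trimmed stand-in for the watcher_state row
    last_tick_at: Optional[datetime] = None
    consecutive_errors: int = 0
    dedupe_window: dict = field(default_factory=dict)  # opaque to the loop


@dataclass
class TickResult:
    events: list = field(default_factory=list)       # MessageEvents to emit
    audit_types: list = field(default_factory=list)  # audit event types to emit
    raise_alarm: bool = False


class Watcher(Protocol):
    tick_interval_seconds: int

    def tick(self, now: datetime, state: WatcherState) -> list: ...


def run_watcher(watcher: Watcher, state: WatcherState, now: datetime) -> TickResult:
    interval = timedelta(seconds=watcher.tick_interval_seconds)
    if state.last_tick_at is not None and now - state.last_tick_at < interval:
        return TickResult()  # not due yet
    state.last_tick_at = now
    try:
        events = watcher.tick(now, state)
    except Exception:
        state.consecutive_errors += 1
        return TickResult(audit_types=["watcher.error"],
                          raise_alarm=state.consecutive_errors >= ALARM_THRESHOLD)
    state.consecutive_errors = 0  # assumption: a clean tick resets the streak
    # In SYRIS, saving state and emitting events would share one transaction
    return TickResult(events=events, audit_types=["watcher.tick"])
```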
HeartbeatWatcher is the reference implementation. On each tick it writes a SystemHealth record to the DB with status, uptime, version, and next_expected_at. It does not emit MessageEvents.
Rules are evaluated by the router as part of the routing stage — not as a separate loop. When a MessageEvent arrives:
1. routing/rules_eval.py loads the rule cache (in-memory, invalidated on CRUD or every 5 minutes).
2. Each enabled rule's conditions are evaluated against the MessageEvent.
3. If now - last_fired_at < debounce_ms, suppress (rule.suppressed, reason: debounce).
4. If the event's dedupe_key matches the stored last_dedupe_key within dedupe_window_ms, suppress.
5. Otherwise, dispatch the rule's actions: EmitEventAction, StartTaskAction, or NotifyAction.
6. Emit a rule.triggered or rule.suppressed audit event.

Condition DSL: JSON structures with logical combinators (all, any, none), field comparison operators (eq, neq, in, contains, and matches, which accepts glob patterns only, never regex), and system-state conditions (time windows, autonomy level checks). Safety limits: max nesting depth 5, max conditions 20, 10ms evaluation timeout.
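The spec fixes the combinators, operators and limits but not the JSON shape. The evaluator below therefore assumes a hypothetical shape:

- Combinators: {"all": [...]}, {"any": [...]} and {"none": [...]}.
- Comparisons: {"field": "a.b", "op": "eq", "value": ...}, with dotted paths into the event.

matches uses fnmatch.fnmatchcase, so rule authors can only write glob patterns. System-state conditions and the 10ms timeout are left out.

```python
from __future__ import annotations

from fnmatch import fnmatchcase
from typing import Any

MAX_DEPTH = 5
MAX_CONDITIONS = 20
_MISSING = object()


class RuleLimitExceeded(ValueError):
    pass


def _get(event: dict, path: str) -> Any:
    value: Any = event
    for part in path.split("."):
        if not isinstance(value, dict) or part not in value:
            return _MISSING
        value = value[part]
    return value


def _compare(op: str, actual: Any, expected: Any) -> bool:
    if actual is _MISSING:
        return False  # absent fields never match, even for neq (assumption)
    if op == "eq":
        return actual == expected
    if op == "neq":
        return actual != expected
    if op == "in":
        return actual in expected
    if op == "contains":
        return expected in actual
    if op == "matches":
        return isinstance(actual, str) and fnmatchcase(actual, expected)
    raise ValueError(f"unknown operator: {op}")


def count_conditions(cond: dict, depth: int = 1) -> int:
    """Enforce the depth limit; return the number of leaf comparisons."""
    if depth > MAX_DEPTH:
        raise RuleLimitExceeded("nesting deeper than 5")
    for key in ("all", "any", "none"):
        if key in cond:
            return sum(count_conditions(c, depth + 1) for c in cond[key])
    return 1


def validate(cond: dict) -> None:
    # Run at rule save/load time so evaluation never sees an oversized rule
    if count_conditions(cond) > MAX_CONDITIONS:
        raise RuleLimitExceeded("more than 20 conditions")


def evaluate(cond: dict, event: dict) -> bool:
    if "all" in cond:
        return all(evaluate(c, event) for c in cond["all"])
    if "any" in cond:
        return any(evaluate(c, event) for c in cond["any"])
    if "none" in cond:
        return not any(evaluate(c, event) for c in cond["none"])
    return _compare(cond["op"], _get(event, cond["field"]), cond["value"])
```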
The spec references quiet hours policies but doesn't define storage. Add a quiet_hours_policies table:
quiet_hours_policies:
policy_id: str PK
name: str
start_time: str — HH:MM
end_time: str — HH:MM
timezone: str — IANA
days: str — JSON list: ["Mon","Tue","Wed","Thu","Fri"]
created_at: datetime
updated_at: datetime
Referenced by schedules.quiet_hours_policy_id and rules.quiet_hours_policy_id.
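Checking a quiet-hours policy needs care for windows that cross midnight. The sketch assumes that days lists the weekday on which a window starts, so a Fri 22:00 to 07:00 window covers early Saturday. It also assumes end_time is exclusive. A caller would resolve the timezone column with zoneinfo.ZoneInfo(policy.timezone) and pass the result as tz.

```python
from __future__ import annotations

import json
from datetime import datetime, time, timedelta, timezone, tzinfo

DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]  # datetime.weekday() order


def in_quiet_hours(now: datetime, start_time: str, end_time: str,
                   days_json: str, tz: tzinfo) -> bool:
    local = now.astimezone(tz)               # `now` must be timezone-aware
    start = time.fromisoformat(start_time)   # "HH:MM"
    end = time.fromisoformat(end_time)
    days = set(json.loads(days_json))
    t = local.time()
    today = DAY_NAMES[local.weekday()]
    if start <= end:
        # Same-day window, e.g. 13:00 to 14:00
        return today in days and start <= t < end
    # Overnight window, e.g. 22:00 to 07:00
    if t >= start:
        return today in days
    if t < end:
        yesterday = DAY_NAMES[(local - timedelta(days=1)).weekday()]
        return yesterday in days
    return False
```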
Projections are updated in the same DB transaction as the primary record write. This guarantees consistency with zero lag.
```python
# Called inside the transaction that writes a Task status update:
projections.on_task_status_changed(session, task)
# Called inside the transaction that writes an AuditEvent:
projections.on_audit_event(session, event)
# Called inside the transaction that writes an Approval:
projections.on_approval_changed(session, approval)
```
If the projection update fails, the entire transaction rolls back — including the primary record. The projection is always consistent with the source data.
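The same-transaction guarantee can be demonstrated with the stdlib sqlite3 module. There, a connection used as a context manager commits on success and rolls back on an exception. The two-table schema and the update_task_status() helper are hypothetical. The CHECK constraint exists only to force a projection failure.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT NOT NULL);
CREATE TABLE proj_active_tasks (
    task_id TEXT PRIMARY KEY,
    status TEXT NOT NULL CHECK (status IN ('running', 'paused'))
);
INSERT INTO tasks VALUES ('t1', 'pending');
""")

TERMINAL = {"succeeded", "failed", "canceled"}


def on_task_status_changed(conn: sqlite3.Connection, task_id: str, status: str) -> None:
    # Projection updater: runs on the caller's connection, inside its transaction
    if status in TERMINAL:
        conn.execute("DELETE FROM proj_active_tasks WHERE task_id = ?", (task_id,))
    else:
        conn.execute(
            "INSERT INTO proj_active_tasks (task_id, status) VALUES (?, ?) "
            "ON CONFLICT (task_id) DO UPDATE SET status = excluded.status",
            (task_id, status),
        )


def update_task_status(conn: sqlite3.Connection, task_id: str, status: str) -> None:
    with conn:  # one transaction: commit on success, rollback on any exception
        conn.execute("UPDATE tasks SET status = ? WHERE task_id = ?", (status, task_id))
        on_task_status_changed(conn, task_id, status)
```

Writing "pending" trips the projection's CHECK constraint. The tasks update in the same transaction is then rolled back as well, so the two tables never disagree.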
| Table | Contains |
|---|---|
| proj_active_tasks | Running and paused tasks with current step, status, wake time |
| proj_schedules | Enabled schedules with next_run, last_run, missed-run count |
| proj_watchers | Watchers with enabled, last_tick, outcome, suppression count |
| proj_rules | Rules with enabled, hit count, last suppression reason |
| proj_integrations | Integration health, auth status, last success, last error |
| proj_approvals | Pending approvals with context and expiry |
| proj_autonomy_history | Current autonomy level and change history |
| proj_alarms | Open, acked, and recently resolved alarms |
Deferred: proj_queue_depth (requires a "tool queue" concept that doesn't exist yet).
The API layer reads from projection tables, not from audit_events or raw entity tables, for all dashboard queries.
The synchronous model is the correct starting point. The seam for async is explicit:
projections.py functions take a session argument and touch nothing except the database through that session. Moving them to an asynchronous consumer later therefore changes their call sites, not the functions themselves.
Minimum viable dashboard API:
| Endpoint | Reads from | Answers |
|---|---|---|
| GET /health | system_health | Is SYRIS alive? |
| GET /state | Multiple proj_* | What is SYRIS doing right now? |
| GET /audit?trace_id=X | audit_events | What happened for this event? |
| GET /tasks?status=running | proj_active_tasks | What's in flight? |
| GET /approvals?status=pending | proj_approvals | What needs my attention? |
| GET /alarms?status=open | proj_alarms | What's broken? |
| GET /integrations | proj_integrations | Are my tools healthy? |
Build these first. Add the rest as the dashboard demands them.
HeartbeatWatcher writes to system_health every 30 seconds:
SystemHealth:
status: HealthStatus — healthy | degraded | down
uptime_seconds: int
version: str
restart_reason: str | None
last_heartbeat_at: datetime
next_expected_at: datetime — if now > this + grace, heartbeat missed
degraded_subsystems: list[str] — names of unhealthy subsystems
Alarm system: Alarms are persisted entities with lifecycle open → acked → resolved. They use dedupe_key to prevent storms: if an open alarm exists for the same key, no duplicate is created.
Trigger conditions (from the spec):
| Trigger | Condition | Severity |
|---|---|---|
| Missed heartbeat | now > next_expected_at + grace_period | critical |
| Integration auth failed | Consecutive auth errors ≥ threshold | error |
| Integration auth expiring | Credential expiry within warning window | warning |
| Repeated tool errors | tool.consecutive_errors ≥ threshold | error |
| Stuck task | status=running AND updated_at < now - threshold | warning |
| Rule storm | triggered_count_per_minute > threshold | warning |
| Schedule backlog | next_run_at < now - threshold AND enabled | warning |
| Notification storm | Outbound notifications/hour > max | warning |
| Secret in audit payload | Redaction miss detected | critical |
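The missed-heartbeat trigger and the dedupe_key storm guard can be sketched together with sqlite3, reusing the uq_alarms_dedupe_open partial unique index from the key-indexes appendix. The 10-second grace period, the "heartbeat.missed" key, the trimmed alarms columns, and the helper names are all assumptions.

```python
import sqlite3
from datetime import datetime, timedelta

GRACE_PERIOD = timedelta(seconds=10)  # hypothetical value

SCHEMA = """
CREATE TABLE alarms (
    alarm_id INTEGER PRIMARY KEY,
    dedupe_key TEXT NOT NULL,
    severity TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'open'  -- open -> acked -> resolved
);
CREATE UNIQUE INDEX uq_alarms_dedupe_open ON alarms(dedupe_key) WHERE status = 'open';
"""


def raise_alarm(conn: sqlite3.Connection, dedupe_key: str, severity: str) -> bool:
    """Open an alarm unless one is already open for this key; True if created."""
    with conn:
        # The partial unique index turns a duplicate open alarm into a
        # constraint conflict, which OR IGNORE silently skips.
        cur = conn.execute(
            "INSERT OR IGNORE INTO alarms (dedupe_key, severity) VALUES (?, ?)",
            (dedupe_key, severity),
        )
    return cur.rowcount == 1


def check_heartbeat(conn: sqlite3.Connection, next_expected_at: datetime, now: datetime) -> bool:
    """Missed heartbeat: now > next_expected_at + grace_period raises a critical alarm."""
    if now > next_expected_at + GRACE_PERIOD:
        return raise_alarm(conn, "heartbeat.missed", "critical")
    return False
```

The index covers only open alarms, so a storm of missed heartbeats produces exactly one open alarm. Once that alarm is acked or resolved, a fresh one can open for the same key.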
8. Implementation Sequencing
Milestone 0: Skeleton
Timeline: Days 1–2
Deliverables:
pyproject.toml with dependencies (fastapi, uvicorn, sqlalchemy, alembic, pydantic)
config.py with typed settings
audit_events, system_health tables
AuditWriter implemented and unit-tested
GET /health returns hardcoded heartbeat JSON
GET /audit returns audit events (initially an empty list)
Done when: uv run python -m syris.main starts. Hit /health and get a JSON response. Write an audit event via a test helper. See it at GET /audit. All unit-tested.
Why first: AuditWriter is the most load-bearing infrastructure. Every subsequent milestone depends on it.
Milestone 1: Pipeline + Fast Lane
Timeline: Week 1
Deliverables:
schemas/ package: MessageEvent, RoutingDecision, AuditEvent (Pydantic v2)
storage/models.py + repos for events, audit, routing_decisions
normaliser.py: accepts raw dict + channel, returns MessageEvent, persists + audits
router.py: hard filter + one fast-path intent (e.g. timer.set), returns RoutingDecision
pipeline/executor.py: fast lane only, calls tool executor
tools/executor.py: scope check + idempotency + NoopTool + audit
GET /events, GET /audit, GET /audit?trace_id=X
Done when: POST a raw event → full trace at /audit?trace_id=X shows exactly 4 audit events: event.ingested, routing.decided, tool_call.attempted, tool_call.succeeded. POST same event again → event.deduped. Zero log-spelunking required.
Milestone 2: Task Engine
Timeline: Week 2
Deliverables:
tasks/engine.py: claim → execute → checkpoint loop with FOR UPDATE SKIP LOCKED
tasks/step_runner.py: run step, handle retries, write checkpoint
tasks/state.py: state machine enforcement with validate_transition()
tasks/recovery.py: startup reconciliation
GET /tasks, GET /tasks/{id}, POST /tasks/{id}/cancel|pause|resume
Done when: Create a 3-step task with NoopTool at step 2. Kill the process mid-step-2. Restart. The task resumes from step 2 (not step 1). Audit shows the interruption and recovery. Run 10 times: no duplicated side effects.
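The Milestone 2 acceptance test depends on one behaviour: after a restart, execution resumes at the first step that was not checkpointed. The single-process sqlite3 sketch below illustrates it under three assumptions. The steps table is simplified, the task_id:step_index idempotency key is hypothetical (the design's own key derivation is not reproduced here), and FOR UPDATE SKIP LOCKED claiming is omitted.

```python
import sqlite3
from typing import Callable, Sequence

SCHEMA = """
CREATE TABLE steps (
    task_id TEXT NOT NULL,
    step_index INTEGER NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',  -- pending | succeeded
    idempotency_key TEXT NOT NULL,
    PRIMARY KEY (task_id, step_index)
);
"""


def create_task(conn: sqlite3.Connection, task_id: str, n_steps: int) -> None:
    with conn:
        conn.executemany(
            "INSERT INTO steps (task_id, step_index, idempotency_key) VALUES (?, ?, ?)",
            [(task_id, i, f"{task_id}:{i}") for i in range(n_steps)],  # hypothetical key scheme
        )


def run_task(conn: sqlite3.Connection, task_id: str, steps: Sequence[Callable[[str], None]]) -> None:
    """Run every step not yet checkpointed as succeeded, in order."""
    pending = conn.execute(
        "SELECT step_index, idempotency_key FROM steps "
        "WHERE task_id = ? AND status != 'succeeded' ORDER BY step_index",
        (task_id,),
    ).fetchall()
    for index, key in pending:
        steps[index](key)  # the tool call receives a stable idempotency key
        with conn:  # checkpoint immediately after the step succeeds
            conn.execute(
                "UPDATE steps SET status = 'succeeded' WHERE task_id = ? AND step_index = ?",
                (task_id, index),
            )
```

A crash between a tool call and its checkpoint causes that step to run again on restart. This is why the call carries the same idempotency key both times, letting the tool runtime recognise the repeat.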
Milestone 3: Safety Layer + Approvals
Timeline: Week 3
Deliverables:
safety/autonomy.py: read/write current level, persist history
safety/risk.py: classify tool action → risk level with adjusters
safety/gates.py: gate matrix + hard safety overrides
safety/dryrun.py: preview protocol
GET /approvals, POST /approvals/{id}/approve|deny
POST /controls/autonomy
Done when: Set autonomy to A1. Trigger a medium-risk tool call. Approval created at /approvals, tool not executed, gate.required in audit. Approve via API. Tool executes. gate.approved + tool_call.succeeded in audit. Full trace queryable.
Milestone 4: Scheduler + Watchers
Timeline: Week 4
Deliverables:
scheduler/loop.py: cron + interval + one-shot
watchers/loop.py + watchers/base.py + HeartbeatWatcher
GET /health now uses real heartbeat data
GET /schedules, POST /schedules, PATCH /schedules/{id}
GET /watchers, PATCH /watchers/{id}
Done when: Create a 30-second interval schedule. Observe schedule.fired in audit every ~30s. Heartbeat appears at /health with real uptime. Disable watcher via API → confirm it stops ticking. Kill process, restart → schedule catches up per policy.
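For interval schedules, the catch-up step reduces to two operations: count the fire times missed while the process was down, then advance next_run_at past now. The sketch below uses a hypothetical due_runs helper. Whether SYRIS then fires once or once per missed run is the catch-up policy's decision, and the sketch does not model it.

```python
from datetime import datetime, timedelta
from typing import Tuple


def due_runs(next_run_at: datetime, interval: timedelta, now: datetime) -> Tuple[int, datetime]:
    """Return (runs due at or before now, next future fire time) for an interval schedule."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if now < next_run_at:
        return 0, next_run_at
    # Fire times are next_run_at + k * interval for k = 0, 1, ...; count those <= now.
    due = (now - next_run_at) // interval + 1
    return due, next_run_at + due * interval
```

For example, a 30-second schedule due at 12:00:00 whose process restarts at 12:01:05 has three due runs (12:00:00, 12:00:30, 12:01:00), and its next run moves to 12:01:30.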
Milestone 5: Rules Engine
Timeline: Week 5
Deliverables:
… quiet_hours_policies table)
GET /rules, PATCH /rules/{id}
Done when: Rule matching source.channel == "ha_event" fires → emits child event with parent_event_id set → both in audit with same trace_id. Same event fired 5× in 1 second with 10s debounce → 1 rule.triggered + 4 rule.suppressed in audit.
Milestone 6: MCP Integration
Timeline: Weeks 6–7
Deliverables:
mcp/connection.py: MCPConnectionManager with reconnect + backoff
mcp/provider.py: tool discovery → registry sync
mcp/adapter.py: MCPToolAdapter implementing BaseTool
mcp/trust.py: TrustPolicy schema + loader
GET /integrations showing MCP server health
Done when: Connect a real MCP server. Tools appear in GET /integrations. Execute one tool → full audit trail (scope check, risk, idempotency, gate, result). Disconnect server → health degrades within 30 seconds.
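For the "reconnect + backoff" deliverable, a capped exponential schedule with jitter is a common choice. The base, factor, 30-second cap, and 10% jitter below are assumptions, not values from the spec, and backoff_delays is a hypothetical helper.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 30.0,
    jitter: float = 0.1,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield reconnect delays in seconds: base, base*factor, ... capped at `cap`.

    Each delay is perturbed by up to +/- `jitter` (a fraction of the delay) so that
    several connections retrying at once do not reconnect in lockstep.
    """
    rng = rng or random.Random()
    delay = base
    while True:
        spread = delay * jitter
        yield delay + rng.uniform(-spread, spread)
        delay = min(delay * factor, cap)
```

A connection manager would sleep for next(delays) after each failed attempt and start a fresh generator once a connection succeeds.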
Milestone 7: Worker Skeleton
Timeline: Week 8
Deliverables:
workers/manager.py: spawn/progress/cancel via OS process isolation
GET /state shows job count
Done when: Submit stub long-running job via API. See it in /state. Cancel it. See cancellation in audit.
Milestone 8: First Real Integration
Timeline: Week 9+
Deliverables:
Done when: A real-world event (HA state change or incoming email) flows through the full pipeline, routes correctly, executes a real tool, and requires/passes approval if risk demands it. SYRIS does something useful in the real world.
Appendix: Key Indexes
For the schema to perform well, these indexes are critical:
events:
idx_events_trace_id ON events(trace_id)
idx_events_ingested_at ON events(ingested_at)
idx_events_channel ON events(channel)
uq_events_dedupe_key ON events(dedupe_key) WHERE dedupe_key IS NOT NULL -- UNIQUE partial
audit_events:
idx_audit_trace_id ON audit_events(trace_id)
idx_audit_timestamp ON audit_events(timestamp)
idx_audit_type ON audit_events(type)
idx_audit_outcome ON audit_events(outcome)
idx_audit_ref_task_id ON audit_events(ref_task_id) WHERE ref_task_id IS NOT NULL
tasks:
idx_tasks_status ON tasks(status)
idx_tasks_trace_id ON tasks(trace_id)
idx_tasks_wake ON tasks(next_wake_time) WHERE status = 'running'
steps:
idx_steps_task_id ON steps(task_id)
idx_steps_idemp ON steps(idempotency_key) WHERE idempotency_key IS NOT NULL
tool_calls:
idx_toolcalls_trace_id ON tool_calls(trace_id, tool_call_id)
idx_toolcalls_idemp ON tool_calls(idempotency_key)
idx_toolcalls_created ON tool_calls(created_at)
approvals:
idx_approvals_status ON approvals(status)
idx_approvals_trace_id ON approvals(trace_id)
idx_approvals_expires ON approvals(expires_at)
schedules:
idx_schedules_next_run ON schedules(next_run_at) WHERE enabled = 1 -- partial
alarms:
uq_alarms_dedupe_open ON alarms(dedupe_key) WHERE status = 'open' -- UNIQUE partial
idx_alarms_status ON alarms(status)
Appendix: Dependency Graph
schemas/ ← depends on nothing
storage/ ← depends on schemas/
observability/ ← depends on schemas/, storage/
secrets/ ← depends on nothing
safety/ ← depends on schemas/, storage/, observability/
tools/ ← depends on schemas/, storage/, observability/, safety/, secrets/
routing/ ← depends on schemas/, rules/
pipeline/ ← depends on schemas/, routing/, tools/, tasks/, safety/, observability/
tasks/ ← depends on schemas/, storage/, tools/, observability/
scheduler/ ← depends on schemas/, storage/, pipeline/, observability/
watchers/ ← depends on schemas/, storage/, pipeline/, observability/
rules/ ← depends on schemas/, storage/
mcp/ ← depends on schemas/, tools/, storage/, observability/
adapters/ ← depends on schemas/, secrets/
api/ ← depends on everything (thin layer)
No circular dependencies. Each module has a clear dependency direction pointing downward toward schemas/ and storage/.
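The "no circular dependencies" claim can be checked mechanically, for example in CI, with the standard library's graphlib. The mapping below transcribes the graph above. Expanding api's "depends on everything" to every other package is an interpretation.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set

# Each package maps to the packages it depends on (transcribed from the list above).
DEPENDS_ON: Dict[str, Set[str]] = {
    "schemas": set(),
    "storage": {"schemas"},
    "observability": {"schemas", "storage"},
    "secrets": set(),
    "safety": {"schemas", "storage", "observability"},
    "tools": {"schemas", "storage", "observability", "safety", "secrets"},
    "routing": {"schemas", "rules"},
    "pipeline": {"schemas", "routing", "tools", "tasks", "safety", "observability"},
    "tasks": {"schemas", "storage", "tools", "observability"},
    "scheduler": {"schemas", "storage", "pipeline", "observability"},
    "watchers": {"schemas", "storage", "pipeline", "observability"},
    "rules": {"schemas", "storage"},
    "mcp": {"schemas", "tools", "storage", "observability"},
    "adapters": {"schemas", "secrets"},
}
# "api/ depends on everything": interpreted as every package listed above.
DEPENDS_ON["api"] = set(DEPENDS_ON)


def build_order(graph: Dict[str, Set[str]]) -> List[str]:
    """Return packages dependencies-first; raises graphlib.CycleError on any cycle."""
    return list(TopologicalSorter(graph).static_order())
```

If someone adds an upward import, such as storage/ importing from api/, build_order raises CycleError instead of returning an order.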