The task engine manages multi-step, checkpointed workflows. It is the execution path for events routed to the task lane. A task moves through the following states:
CREATE
│
▼
[pending] ──► [running] ──────────────► [succeeded]
                 │
                 ├── operator pause ──► [paused]
                 │                         └── resume ──► [running]
                 │
                 ├── operator cancel ──────────────► [canceled]
                 │
                 └── retries exhausted ────────────► [failed]

Terminal states: succeeded, failed, canceled. No transitions leave a terminal state.
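The task diagram can be encoded as an explicit transition table so that illegal moves, including any move out of a terminal state, are rejected in one place. This is a minimal sketch: the `TaskStatus` enum, the `ALLOWED` table and the `transition` helper are hypothetical names, and the edges are only those drawn in the diagram.

```python
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELED = "canceled"


# Edges taken from the task diagram. Terminal states map to an empty set.
ALLOWED: dict[TaskStatus, frozenset[TaskStatus]] = {
    TaskStatus.PENDING: frozenset({TaskStatus.RUNNING}),
    TaskStatus.RUNNING: frozenset({
        TaskStatus.SUCCEEDED,   # all steps done
        TaskStatus.PAUSED,      # operator pause
        TaskStatus.CANCELED,    # operator cancel
        TaskStatus.FAILED,      # retries exhausted
    }),
    TaskStatus.PAUSED: frozenset({TaskStatus.RUNNING}),  # resume
    TaskStatus.SUCCEEDED: frozenset(),
    TaskStatus.FAILED: frozenset(),
    TaskStatus.CANCELED: frozenset(),
}

# A state is terminal exactly when nothing leaves it.
TERMINAL = frozenset(s for s, nxt in ALLOWED.items() if not nxt)


def transition(current: TaskStatus, new: TaskStatus) -> TaskStatus:
    """Return the new status, or raise if the diagram has no such edge."""
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal task transition {current.value} -> {new.value}")
    return new
```

Deriving `TERMINAL` from the table, rather than listing it separately, keeps the two from drifting apart.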
Each step within a task follows its own state machine:

[pending] ─► [running] ─► [succeeded]
                │
                ├── retryable error ─► [pending] (attempt++)
                ├── non-retryable ───► [failed]
                └── operator pause ──► [paused] ─► [running]

Steps have no canceled state. Task cancellation abandons the current step in place.
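The error branches of the step diagram are the part with the most logic: a retryable error sends the step back to pending with its attempt counter bumped, while a non-retryable error fails it. A minimal sketch, assuming a hypothetical `Step` record and `on_step_error` helper:

```python
from dataclasses import dataclass

# Step states from the diagram; there is deliberately no "canceled".
STEP_STATES = frozenset({"pending", "running", "paused", "succeeded", "failed"})


@dataclass
class Step:
    status: str = "pending"
    attempt: int = 0


def on_step_error(step: Step, retryable: bool) -> Step:
    """Apply an error raised while the step was running."""
    if step.status != "running":
        raise ValueError(f"step errors are only expected while running, got {step.status!r}")
    if retryable:
        step.status = "pending"   # back to pending for another attempt
        step.attempt += 1         # attempt++ (the attempt also feeds the idempotency key)
    else:
        step.status = "failed"    # non-retryable error
    return step
```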
The task engine polls on a configurable interval. On each tick it claims runnable tasks:

-- Claim runnable tasks (SKIP LOCKED keeps this safe for future multi-worker use)
SELECT * FROM tasks
WHERE status = 'running'
  AND (next_wake_time IS NULL OR next_wake_time <= now())
LIMIT batch_size
FOR UPDATE SKIP LOCKED;

For each claimed task, the engine calls step_runner.run(task, step) on the current step. step_runner returns one of five outcomes:

- succeeded → advance current_step_id, or mark the task succeeded if this was its last step
- retry → increment attempt and set next_wake_time using the computed backoff
- failed → mark the task failed
- waiting → set task.next_wake_time to wait_until
- paused → mark the step paused

Each step execution and its outcome are persisted in a single DB transaction. See architecture/data-contracts for the full Step schema.
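The outcome handling above amounts to a dispatch on the returned outcome. The sketch below shows it on in-memory objects, under several assumptions: the `Task`, `Step` and `StepOutcome` fields and the `apply_outcome` helper are hypothetical, the capped exponential backoff stands in for the unspecified backoff formula, and the single DB transaction that should wrap this is omitted.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical backoff parameters; the document does not specify the formula.
BASE_DELAY = timedelta(seconds=5)
MAX_DELAY = timedelta(minutes=10)


@dataclass
class Step:
    step_id: str
    status: str = "running"
    attempt: int = 0


@dataclass
class Task:
    status: str
    step_ids: list[str]            # ordered step ids (hypothetical field)
    current_step_id: str
    next_wake_time: Optional[datetime] = None


@dataclass
class StepOutcome:
    kind: str                      # succeeded | retry | failed | waiting | paused
    wait_until: Optional[datetime] = None
    error: Optional[str] = None


def backoff(attempt: int) -> timedelta:
    """Capped exponential backoff (an assumed policy)."""
    return min(BASE_DELAY * (2 ** attempt), MAX_DELAY)


def apply_outcome(task: Task, step: Step, outcome: StepOutcome, now: datetime) -> None:
    if outcome.kind == "succeeded":
        step.status = "succeeded"
        idx = task.step_ids.index(step.step_id)
        if idx + 1 < len(task.step_ids):
            task.current_step_id = task.step_ids[idx + 1]   # advance to next step
        else:
            task.status = "succeeded"                        # last step finished
    elif outcome.kind == "retry":
        step.status = "pending"
        step.attempt += 1
        task.next_wake_time = now + backoff(step.attempt)    # claim query skips it until then
    elif outcome.kind == "failed":
        step.status = "failed"
        task.status = "failed"
    elif outcome.kind == "waiting":
        task.next_wake_time = outcome.wait_until
    elif outcome.kind == "paused":
        step.status = "paused"
    else:
        raise ValueError(f"unknown outcome {outcome.kind!r}")
```

Setting task.next_wake_time on retry and waiting is what makes the claim query's `next_wake_time <= now()` filter hold the task back until it is due.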
Every effectful step must have an idempotency_key. Generation strategy:
idempotency_key = SHA-256(task_id + step_id + attempt + action + stable_request_hash)

The key includes attempt so that each retry attempt gets a distinct key. Otherwise an unknown outcome stored for an earlier attempt would block a legitimate retry.
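The key derivation can be sketched with the standard `hashlib` module. Two details here are assumptions rather than part of the stated scheme: the fields are joined with a separator (plain `+` concatenation would let `("ab", "c")` and `("a", "bc")` collide), and `stable_request_hash` is computed over canonical, key-sorted JSON. Both function names are hypothetical.

```python
import hashlib
import json
from typing import Any

# Assumed field separator (ASCII unit separator); the scheme above writes plain "+".
SEP = "\x1f"


def stable_request_hash(request: dict[str, Any]) -> str:
    """Hash a request independently of dict key order (canonical JSON is an assumption)."""
    canonical = json.dumps(request, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def idempotency_key(task_id: str, step_id: str, attempt: int,
                    action: str, request_hash: str) -> str:
    """SHA-256 over the delimited key fields, hex-encoded."""
    material = SEP.join([task_id, step_id, str(attempt), action, request_hash])
    return hashlib.sha256(material.encode("utf-8")).hexdigest()
```

The same inputs always yield the same key, and bumping `attempt` yields a new one, which is the property the retry logic relies on.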
Outcomes are stored in the idempotency_outcomes table:
idempotency_outcomes
  idempotency_key: str, primary key
  tool_call_id:    str, the associated tool call
  status:          str, one of succeeded | failed | unknown
  response_hash:   str | None
  resolved_at:     datetime | None

On every startup, tasks/recovery.py runs before the pipeline loop starts:
- Approval records with status = pending: restore the approval-wait state.
- Tasks with status = running: for each step with step.status = running, look up idempotency_outcomes for the step's idempotency_key. If the outcome is unknown, reset the step to pending, increment attempt, and emit AuditEvent("tool_call.unknown").
- Schedules: check next_run_at and apply catch_up_policy to any missed schedules.

An approval-wait loop runs alongside the main engine loop every 5 seconds and selects the tasks that are waiting on an approval:
SELECT * FROM tasks
WHERE status = 'running'
  AND <current step checkpoint contains approval_id>

For each waiting task, the engine loads the Approval identified by the approval_id in the step checkpoint and acts on its status:
- approved → resume the step (mark it running and re-invoke it with the approval in context)
- denied → StepOutcome("failed", error="approval denied")
- expired → depending on configuration, fail the step, or create a new Approval and notify the operator
- pending → wait for the next loop iteration

This loop is crash-safe: on restart, the engine reloads all waiting tasks, re-reads their checkpoints, and re-enters the loop automatically.
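Each iteration of the approval-wait loop reduces to mapping an Approval's status to an action. A minimal sketch, assuming hypothetical `Approval` and `StepOutcome` records, a `handle_approval` helper, and an `on_expired` setting standing in for the configurable expiry behaviour:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepOutcome:
    kind: str
    error: Optional[str] = None


@dataclass
class Approval:
    approval_id: str
    status: str  # pending | approved | denied | expired


def handle_approval(approval: Approval,
                    on_expired: str = "fail") -> tuple[str, Optional[StepOutcome]]:
    """Map an approval's status to the action the wait loop should take.

    on_expired is a hypothetical setting: "fail" fails the step,
    "reissue" asks the caller to create a new Approval.
    """
    if approval.status == "approved":
        # Caller marks the step running and re-invokes it with the approval in context.
        return "resume", None
    if approval.status == "denied":
        return "fail", StepOutcome("failed", error="approval denied")
    if approval.status == "expired":
        if on_expired == "fail":
            # This error text is an assumption; the document does not specify it.
            return "fail", StepOutcome("failed", error="approval expired")
        # Caller creates a new Approval and notifies the operator.
        return "reissue", None
    if approval.status == "pending":
        return "wait", None  # check again on the next loop iteration
    raise ValueError(f"unknown approval status {approval.status!r}")
```

Because the function reads only the persisted Approval, rerunning it after a crash produces the same decision, which fits the crash-safety property described above.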