Watchers are proactive components that tick on a configurable interval, check some external or internal state, and emit MessageEvents when something changes. Watcher state is persisted between ticks.
All watchers subclass the abstract base class BaseWatcher, defined in scheduler/watchers/base.py:
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import datetime

# WatcherState and MessageEvent are project types (imports omitted).

class BaseWatcher(ABC):
    watcher_id: str
    enabled: bool
    tick_interval_seconds: int

    @abstractmethod
    def tick(
        self,
        now: datetime,
        state: WatcherState,
    ) -> list[MessageEvent]:
        # tick() MUST be idempotent: safe to call repeatedly with the same inputs.
        # Returns the list of MessageEvents to inject into the pipeline (may be empty).
        ...
```

The watcher loop calls tick() and persists the updated WatcherState in a single transaction. Emitted events are handed to the normaliser.
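The tick() contract is easier to see with a concrete watcher. The following is a minimal sketch of one. It keeps a cursor in the dedupe_window field described below.

Several names here are hypothetical: InboxWatcher, its fetch_messages callable, and the dataclass stand-ins for MessageEvent and WatcherState. Only the BaseWatcher shape mirrors the interface above. The sketch also assumes inbox message IDs increase monotonically.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable


# Hypothetical stand-ins for the project's MessageEvent and WatcherState types.
@dataclass(frozen=True)
class MessageEvent:
    source: str
    body: str


@dataclass
class WatcherState:
    dedupe_window: dict = field(default_factory=dict)


class BaseWatcher(ABC):
    watcher_id: str
    enabled: bool
    tick_interval_seconds: int

    @abstractmethod
    def tick(self, now: datetime, state: WatcherState) -> list[MessageEvent]: ...


class InboxWatcher(BaseWatcher):
    """Hypothetical watcher: one MessageEvent per inbox message not seen before."""

    watcher_id = "inbox"
    enabled = True
    tick_interval_seconds = 60

    def __init__(self, fetch_messages: Callable[[], list[tuple[int, str]]]):
        # fetch_messages() returns (message_id, subject) pairs; an assumed API.
        self._fetch = fetch_messages

    def tick(self, now: datetime, state: WatcherState) -> list[MessageEvent]:
        # Assumes message IDs increase monotonically, so one integer cursor suffices.
        last_seen = state.dedupe_window.get("last_message_id", 0)
        new = sorted(m for m in self._fetch() if m[0] > last_seen)
        if new:
            # Advance the cursor so a repeated tick does not re-emit these messages.
            state.dedupe_window["last_message_id"] = new[-1][0]
        return [MessageEvent(source=self.watcher_id, body=subject) for _, subject in new]
```

In this sketch, tick() advances the cursor in dedupe_window. As a result:

- Calling tick() again with the updated state emits nothing new.
- Calling it with the pre-tick state reproduces exactly the same events.

Combined with the single-transaction commit, this is what makes a failed commit safe to retry on the next tick.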
WatcherState is persisted in the watcher_state table, one row per watcher:

| Column | Type | Description |
|---|---|---|
| watcher_id | str | Primary key |
| enabled | bool | Whether the watcher is enabled |
| last_tick_at | datetime \| None | UTC timestamp of the last completed tick |
| last_outcome | str | One of ok, error, suppressed |
| dedupe_window | dict | Watcher-specific dedup state (JSON) |
| suppression_count | int | Cumulative number of suppression events |
| consecutive_errors | int | Reset to 0 on a successful tick |
| updated_at | datetime | Time the row was last updated |

dedupe_window is opaque to the watcher loop: each watcher implementation manages its own structure within it. This allows per-watcher dedup strategies, for example last-seen device states or the last inbox message ID.
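The following is a minimal sketch of persisting WatcherState to the watcher_state table. It assumes SQLite through Python's standard sqlite3 module.

The following are assumptions made for illustration:

- the event_outbox table, which the normaliser would drain;
- the load_state and save_tick helpers;
- storing datetimes as ISO-8601 text.

The loop serialises dedupe_window with json.dumps and never inspects its contents.

```python
from __future__ import annotations

import json
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS watcher_state (
    watcher_id         TEXT PRIMARY KEY,
    enabled            INTEGER NOT NULL,
    last_tick_at       TEXT,               -- ISO-8601 UTC, NULL before the first tick
    last_outcome       TEXT NOT NULL CHECK (last_outcome IN ('ok', 'error', 'suppressed')),
    dedupe_window      TEXT NOT NULL,      -- JSON, opaque to the loop
    suppression_count  INTEGER NOT NULL DEFAULT 0,
    consecutive_errors INTEGER NOT NULL DEFAULT 0,
    updated_at         TEXT NOT NULL
);
-- Hypothetical outbox that the normaliser would drain.
CREATE TABLE IF NOT EXISTS event_outbox (
    watcher_id TEXT NOT NULL,
    body       TEXT NOT NULL
);
"""


@dataclass
class WatcherState:
    watcher_id: str
    enabled: bool = True
    last_tick_at: datetime | None = None
    last_outcome: str = "ok"
    dedupe_window: dict = field(default_factory=dict)
    suppression_count: int = 0
    consecutive_errors: int = 0


def load_state(conn: sqlite3.Connection, watcher_id: str) -> WatcherState:
    row = conn.execute(
        "SELECT enabled, last_tick_at, last_outcome, dedupe_window, "
        "suppression_count, consecutive_errors FROM watcher_state WHERE watcher_id = ?",
        (watcher_id,),
    ).fetchone()
    if row is None:
        return WatcherState(watcher_id)  # first run: default state
    enabled, last_tick, outcome, dedupe, suppressed, errors = row
    return WatcherState(
        watcher_id,
        bool(enabled),
        datetime.fromisoformat(last_tick) if last_tick else None,
        outcome,
        json.loads(dedupe),
        suppressed,
        errors,
    )


def save_tick(conn: sqlite3.Connection, state: WatcherState, event_bodies: list[str]) -> None:
    """Upsert the state row and enqueue events in one transaction."""
    with conn:  # commits on success, rolls back both statements if either raises
        conn.execute(
            "INSERT OR REPLACE INTO watcher_state VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (
                state.watcher_id,
                int(state.enabled),
                state.last_tick_at.isoformat() if state.last_tick_at else None,
                state.last_outcome,
                json.dumps(state.dedupe_window),
                state.suppression_count,
                state.consecutive_errors,
                datetime.now(timezone.utc).isoformat(),
            ),
        )
        conn.executemany(
            "INSERT INTO event_outbox (watcher_id, body) VALUES (?, ?)",
            [(state.watcher_id, body) for body in event_bodies],
        )
```

Using the connection as a context manager commits the state upsert and the event inserts together, or rolls both back. That is one way to get the single-transaction behaviour the loop requires. A real deployment may use a different database or event hand-off.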
The watcher loop runs each watcher on its tick_interval_seconds schedule:
1. Load WatcherState from the DB.
2. Skip the watcher unless it is due: now - last_tick_at >= tick_interval_seconds.
3. If the tick is suppressed, emit AuditEvent("watcher.suppressed").
4. Call tick(now, state).
5. On success, persist the updated WatcherState and emit any returned MessageEvents in a single transaction.
6. On error, increment consecutive_errors, emit AuditEvent("watcher.error"), and raise an alarm if the threshold is exceeded.

HeartbeatWatcher is the concrete reference implementation, in scheduler/watchers/heartbeat.py.
On each tick it writes a SystemHealth record to the DB with the current status, uptime, version, and next_expected_at. The health and alarms subsystem uses next_expected_at to detect missed heartbeats.
HeartbeatWatcher does not emit MessageEvents — it writes directly to system_health. It is the simplest possible watcher and serves as the implementation template for all others.
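The following is a minimal sketch of the heartbeat pattern. Each tick writes a health record whose next_expected_at is one tick interval ahead. The health side flags a heartbeat as missed once that time has passed.

The following are hypothetical:

- SystemHealth's field types, including uptime_seconds;
- the write_health callable;
- the 30-second interval and the "ok" status string;
- heartbeat_missed() and its grace_seconds parameter.

To stay self-contained, the class does not subclass BaseWatcher. The real record schema and alarm conditions live in observability/health-and-alarms.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable


@dataclass(frozen=True)
class SystemHealth:
    # Field names follow the prose; types and the status value are assumptions.
    status: str
    uptime_seconds: float
    version: str
    next_expected_at: datetime


class HeartbeatWatcher:
    """Sketch only: the real class subclasses BaseWatcher and writes to system_health."""

    watcher_id = "heartbeat"
    enabled = True
    tick_interval_seconds = 30  # assumed interval

    def __init__(self, write_health: Callable[[SystemHealth], None],
                 version: str, started_at: datetime):
        self._write_health = write_health  # hypothetical sink for system_health rows
        self._version = version
        self._started_at = started_at

    def tick(self, now: datetime, state: object) -> list:
        self._write_health(SystemHealth(
            status="ok",
            uptime_seconds=(now - self._started_at).total_seconds(),
            version=self._version,
            next_expected_at=now + timedelta(seconds=self.tick_interval_seconds),
        ))
        return []  # HeartbeatWatcher emits no MessageEvents


def heartbeat_missed(latest: SystemHealth, now: datetime, grace_seconds: int = 0) -> bool:
    """Hypothetical health-side check: True once next_expected_at (+ grace) has passed."""
    return now > latest.next_expected_at + timedelta(seconds=grace_seconds)
```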
See observability/health-and-alarms for the health record schema and alarm trigger conditions.
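The watcher loop steps listed earlier can be sketched as a single pass over one watcher.

The following are hypothetical stand-ins:

- the load_state, commit, audit and raise_alarm callables;
- ERROR_ALARM_THRESHOLD;
- the suppressed flag.

The section does not say what triggers suppression, so here the caller decides. The sketch also makes some assumptions:

- commit() persists the state and any events in one transaction.
- A watcher that has never ticked counts as due.
- Neither the suppressed nor the error path updates last_tick_at, so the watcher stays due on the next pass.

```python
from __future__ import annotations

import copy
from dataclasses import dataclass, field
from datetime import datetime, timedelta

ERROR_ALARM_THRESHOLD = 3  # hypothetical; the real threshold is not given here


@dataclass
class WatcherState:
    # Hypothetical in-memory mirror of the watcher_state row.
    last_tick_at: datetime | None = None
    last_outcome: str = "ok"
    dedupe_window: dict = field(default_factory=dict)
    suppression_count: int = 0
    consecutive_errors: int = 0


def is_due(state: WatcherState, now: datetime, interval_seconds: int) -> bool:
    # A watcher that has never ticked is treated as due (assumption).
    if state.last_tick_at is None:
        return True
    return now - state.last_tick_at >= timedelta(seconds=interval_seconds)


def run_watcher_once(watcher, now, load_state, commit, audit, raise_alarm,
                     suppressed: bool = False) -> None:
    """One loop pass for one watcher; every callable argument is a hypothetical hook."""
    state = load_state(watcher.watcher_id)
    if not is_due(state, now, watcher.tick_interval_seconds):
        return

    if suppressed:
        state.suppression_count += 1
        state.last_outcome = "suppressed"
        commit(watcher.watcher_id, state, [])
        audit("watcher.suppressed", watcher.watcher_id)
        return

    # Tick against a copy so a failure cannot leak half-updated dedupe state.
    working = copy.deepcopy(state)
    try:
        events = watcher.tick(now, working)
    except Exception:
        state.consecutive_errors += 1
        state.last_outcome = "error"
        commit(watcher.watcher_id, state, [])
        audit("watcher.error", watcher.watcher_id)
        if state.consecutive_errors > ERROR_ALARM_THRESHOLD:
            raise_alarm(watcher.watcher_id, state.consecutive_errors)
        return

    working.last_tick_at = now
    working.last_outcome = "ok"
    working.consecutive_errors = 0
    # commit() is assumed to write the state and the events in one transaction.
    commit(watcher.watcher_id, working, events)
```

Ticking against a deep copy means an exception leaves the previously persisted dedupe_window untouched. This matches the idempotency requirement on tick().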