Watchers

BaseWatcher interface, WatcherState fields, the watcher loop, and HeartbeatWatcher as the reference implementation.

Watchers are proactive components that tick on a configurable interval, check some external or internal state, and emit MessageEvents when something changes. Watcher state is persisted between ticks.

BaseWatcher interface

All watchers implement BaseWatcher in scheduler/watchers/base.py:

from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import datetime

# WatcherState and MessageEvent are defined elsewhere.

class BaseWatcher(ABC):
    watcher_id:            str
    enabled:               bool
    tick_interval_seconds: int

    @abstractmethod
    def tick(
        self,
        now: datetime,
        state: WatcherState,
    ) -> list[MessageEvent]:
        """Return the MessageEvents to inject into the pipeline (may be empty).

        MUST be idempotent: safe to call repeatedly with the same inputs.
        """

After each call to tick(), the watcher loop persists the updated WatcherState and hands any emitted events to the normaliser, both in a single transaction.
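A minimal sketch of the single-transaction write, assuming a SQLite store. The column subset, the outbox table standing in for the hand-off to the normaliser, and the commit_tick helper are all hypothetical. The point is only that the state row and the events commit or roll back together.

```python
import json
import sqlite3
from datetime import datetime

# Hypothetical schema: a subset of the watcher_state columns, plus an
# "outbox" table standing in for the hand-off to the normaliser.
SCHEMA = """
CREATE TABLE watcher_state (
    watcher_id         TEXT PRIMARY KEY,
    last_tick_at       TEXT,
    last_outcome       TEXT NOT NULL,
    dedupe_window      TEXT NOT NULL,
    consecutive_errors INTEGER NOT NULL,
    updated_at         TEXT NOT NULL
);
CREATE TABLE outbox (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    watcher_id TEXT NOT NULL,
    payload    TEXT NOT NULL
);
"""


def commit_tick(conn: sqlite3.Connection, watcher_id: str, now: datetime,
                dedupe_window: dict, events: list[dict]) -> None:
    """Write the tick's state row and its events atomically: both or neither."""
    stamp = now.isoformat()
    # Using the connection as a context manager commits on success and
    # rolls back if anything inside the block raises.
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO watcher_state VALUES (?, ?, 'ok', ?, 0, ?)",
            (watcher_id, stamp, json.dumps(dedupe_window), stamp),
        )
        conn.executemany(
            "INSERT INTO outbox (watcher_id, payload) VALUES (?, ?)",
            [(watcher_id, json.dumps(event)) for event in events],
        )
```

If serialising an event fails, the rollback also discards the state update, so the next tick starts from the last committed dedupe_window instead of skipping events that were never emitted.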

WatcherState fields

WatcherState is persisted in the watcher_state table — one row per watcher.

watcher_state
  watcher_id:          str               PK
  enabled:             bool
  last_tick_at:        datetime | None   UTC timestamp of the last completed tick
  last_outcome:        str               ok | error | suppressed
  dedupe_window:       dict              watcher-specific dedup state (JSON)
  suppression_count:   int               cumulative count of suppression events
  consecutive_errors:  int               reset to 0 on a successful tick
  updated_at:          datetime
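One way to mirror a row in application code is a dataclass. This is a sketch, not the actual model: the defaults, the Optional types before the first tick, and the record_* helpers are assumptions. In particular, record_error leaves last_tick_at untouched, on the reading that an errored tick is not a completed one.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Literal, Optional

Outcome = Literal["ok", "error", "suppressed"]


@dataclass
class WatcherState:
    """In-memory mirror of one watcher_state row (illustrative only)."""
    watcher_id: str                                    # PK: one row per watcher
    enabled: bool = True
    last_tick_at: Optional[datetime] = None            # UTC; None before first tick
    last_outcome: Optional[Outcome] = None
    dedupe_window: dict = field(default_factory=dict)  # opaque, watcher-owned
    suppression_count: int = 0
    consecutive_errors: int = 0
    updated_at: Optional[datetime] = None

    def record_success(self, now: datetime) -> None:
        self.last_tick_at = now
        self.last_outcome = "ok"
        self.consecutive_errors = 0   # resets on a successful tick
        self.updated_at = now

    def record_error(self, now: datetime) -> None:
        # An errored tick is not "completed", so last_tick_at is not advanced.
        self.last_outcome = "error"
        self.consecutive_errors += 1
        self.updated_at = now

    def record_suppressed(self, now: datetime) -> None:
        self.last_outcome = "suppressed"
        self.suppression_count += 1
        self.updated_at = now
```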

dedupe_window is opaque to the watcher loop; each watcher implementation manages its own structure within it. This allows per-watcher dedup strategies (e.g., last-seen device states, last inbox message ID).
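As an illustration of a watcher-owned dedup strategy, here is a hypothetical InboxWatcher that keeps a high-water mark of the last inbox message ID in dedupe_window. The fetch_message_ids hook, the MessageEvent shape, and the assumption that message IDs increase monotonically are all invented for the example. The class follows the BaseWatcher shape without importing it, so the snippet runs on its own.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable


@dataclass
class WatcherState:
    # Minimal stand-in: only the field this sketch touches.
    dedupe_window: dict = field(default_factory=dict)


@dataclass
class MessageEvent:
    # Hypothetical minimal event shape.
    source: str
    payload: dict


class InboxWatcher:
    """Hypothetical watcher: one event per inbox message not yet seen.

    dedupe_window layout (owned entirely by this watcher):
        {"last_message_id": int}
    """
    watcher_id = "inbox"
    enabled = True
    tick_interval_seconds = 60

    def __init__(self, fetch_message_ids: Callable[[], list[int]]):
        # Assumed hook returning the IDs currently in the inbox.
        self._fetch = fetch_message_ids

    def tick(self, now: datetime, state: WatcherState) -> list[MessageEvent]:
        last_seen = state.dedupe_window.get("last_message_id", 0)
        new_ids = sorted(i for i in self._fetch() if i > last_seen)
        if new_ids:
            # Advance the high-water mark so later ticks skip these IDs.
            state.dedupe_window["last_message_id"] = new_ids[-1]
        return [MessageEvent("inbox", {"message_id": i, "seen_at": now.isoformat()})
                for i in new_ids]
```

Because the high-water mark advances in the same tick that emits the events, a repeated call with the updated state emits nothing, which is one way to satisfy the idempotency requirement on tick().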

Watcher loop

The watcher loop runs each watcher on its tick_interval_seconds schedule:

  1. Load WatcherState from DB.
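  2. Check whether the watcher is due: it is due if last_tick_at is None (it has never ticked) or now - last_tick_at >= tick_interval_seconds; otherwise skip it.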
  3. Apply global throttle: if total watcher ticks in the last minute exceed the system-wide maximum, skip and emit AuditEvent("watcher.suppressed").
  4. Check for dedup: if the watcher's own dedupe state indicates no change since last tick, skip without emitting.
  5. Call tick(now, state).
  6. Persist the updated WatcherState and emit any returned MessageEvents in a single transaction.
  7. On exception: increment consecutive_errors; emit AuditEvent("watcher.error"); raise alarm if threshold exceeded.
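The steps above can be sketched as a single-pass runner. Several details are assumptions rather than documented behaviour: the audit, alarm and commit callbacks stand in for AuditEvent emission, alarming and the single DB transaction; the throttle is a sliding one-minute window; dedup (steps 4 and 5) is left inside tick(), since dedupe_window is opaque to the loop; and a failed tick does not advance last_tick_at, so it is retried on the next pass. Step 1 (loading WatcherState) is left to the caller.

```python
import copy
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Optional


@dataclass
class WatcherState:
    # Minimal stand-in for the persisted row.
    watcher_id: str
    last_tick_at: Optional[datetime] = None
    last_outcome: str = "ok"
    dedupe_window: dict = field(default_factory=dict)
    suppression_count: int = 0
    consecutive_errors: int = 0


class WatcherLoop:
    """Sketch of steps 2-7; audit, alarm and commit are hypothetical hooks."""

    def __init__(self, max_ticks_per_minute: int, error_alarm_threshold: int,
                 audit: Callable[[str, str], None],
                 alarm: Callable[[str], None],
                 commit: Callable[[WatcherState, list], None]):
        self.max_ticks_per_minute = max_ticks_per_minute
        self.error_alarm_threshold = error_alarm_threshold
        self.audit, self.alarm, self.commit = audit, alarm, commit
        self._recent: deque = deque()  # start times of ticks in the last minute

    def _throttled(self, now: datetime) -> bool:
        # Slide the one-minute window, then compare against the global cap.
        while self._recent and now - self._recent[0] >= timedelta(minutes=1):
            self._recent.popleft()
        return len(self._recent) >= self.max_ticks_per_minute

    def run_one(self, watcher, state: WatcherState, now: datetime) -> None:
        # Step 2: due if it has never ticked or the interval has elapsed.
        if (state.last_tick_at is not None and
                (now - state.last_tick_at).total_seconds() < watcher.tick_interval_seconds):
            return
        # Step 3: global throttle shared by every watcher.
        if self._throttled(now):
            state.last_outcome = "suppressed"
            state.suppression_count += 1
            self.audit("watcher.suppressed", watcher.watcher_id)
            self.commit(state, [])
            return
        self._recent.append(now)
        snapshot = copy.deepcopy(state.dedupe_window)
        try:
            # Steps 4-5: tick() applies its own dedup and returns [] on no change.
            events = watcher.tick(now, state)
        except Exception:
            # Step 7: discard partial dedup updates, count, audit, maybe alarm.
            state.dedupe_window = snapshot
            state.last_outcome = "error"
            state.consecutive_errors += 1
            self.audit("watcher.error", watcher.watcher_id)
            if state.consecutive_errors > self.error_alarm_threshold:
                self.alarm(watcher.watcher_id)
            self.commit(state, [])
            return
        # Step 6: state and events are committed together.
        state.last_tick_at = now
        state.last_outcome = "ok"
        state.consecutive_errors = 0
        self.commit(state, events)
```

Restoring dedupe_window after a failure keeps a half-updated dedup state from being persisted without the events it was meant to accompany.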

HeartbeatWatcher

HeartbeatWatcher is the concrete reference implementation in scheduler/watchers/heartbeat.py.

On each tick it writes a SystemHealth record to the DB with the current status, uptime, version, and next_expected_at. The health and alarms subsystem uses next_expected_at to detect missed heartbeats.

HeartbeatWatcher does not emit MessageEvents: its tick() writes directly to the system_health table and returns an empty list. It is the simplest watcher and serves as the implementation template for all others.
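A sketch of the heartbeat tick, under stated assumptions: the SystemHealth field types, the hard-coded "ok" status, the interval value, and the write_health callable (standing in for the system_health write) are illustrative, and next_expected_at is assumed to be one interval after the current tick.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class SystemHealth:
    # Fields named in the text; the types are assumptions.
    status: str
    uptime_seconds: float
    version: str
    next_expected_at: datetime


class HeartbeatWatcher:
    watcher_id = "heartbeat"
    enabled = True
    tick_interval_seconds = 30   # illustrative value

    def __init__(self, write_health, version: str, started_at: datetime):
        self._write_health = write_health  # hypothetical system_health writer
        self._version = version
        self._started_at = started_at

    def tick(self, now: datetime, state) -> list:
        record = SystemHealth(
            status="ok",
            uptime_seconds=(now - self._started_at).total_seconds(),
            version=self._version,
            # Health checks can flag a missed heartbeat once this time passes.
            next_expected_at=now + timedelta(seconds=self.tick_interval_seconds),
        )
        self._write_health(record)
        return []   # no MessageEvents: the heartbeat only writes system_health
```

If write_health upserts a single row, repeated calls with the same now leave the same record behind, which keeps the tick idempotent.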

See observability/health-and-alarms for the health record schema and alarm trigger conditions.