Recovers orphaned episodes after server restarts.
After a deploy or crash, in-flight episodes are left as :running in the DB.
sweep/1 finds these stale episodes and either restarts or fails them based
on the expectation's recovery_policy.
Multi-node coordination
All nodes in the cluster run the same sweep after boot. Coordination is
DB-based via optimistic claims: Episodes.claim_for_recovery/1 sets the
episode's phase to "recovering" only if it is still :running. The first node
to claim an episode wins; the others silently skip it.
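Below is a minimal sketch of the claim semantics that makes no assumptions about Cyclium's schema. ClaimSketch and its in-memory Agent are hypothetical stand-ins for Episodes.claim_for_recovery/1 and the episodes table, and a single status value models both the :running check and the "recovering" phase. A real database would typically give the same guarantee through a conditional update that only touches rows still :running, followed by a check of the affected row count.

```elixir
defmodule ClaimSketch do
  # Hypothetical stand-in for the episodes table: an Agent holding
  # %{episode_id => status}. Agent.get_and_update/2 runs each update inside the
  # Agent process one at a time, so the check-and-set below is atomic, much like
  # a single guarded UPDATE in the database.
  def start_link(episodes), do: Agent.start_link(fn -> episodes end)

  # Moves an episode from :running to :recovering. Only the first caller that
  # still sees :running gets :ok; every later caller gets :skip.
  def claim_for_recovery(agent, episode_id) do
    Agent.get_and_update(agent, fn state ->
      case Map.get(state, episode_id) do
        :running -> {:ok, Map.put(state, episode_id, :recovering)}
        _other -> {:skip, state}
      end
    end)
  end
end

# Three "nodes" race to claim the same orphaned episode.
{:ok, agent} = ClaimSketch.start_link(%{"ep-1" => :running})

results =
  1..3
  |> Enum.map(fn _node -> Task.async(fn -> ClaimSketch.claim_for_recovery(agent, "ep-1") end) end)
  |> Enum.map(&Task.await/1)

IO.inspect(Enum.frequencies(results))
# => %{ok: 1, skip: 2}
```

The condition and the write happen in one atomic step, so no lock or leader election is needed. The losing nodes just see that the episode is no longer :running.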
Policy resolution order
- Compiled actor registry: looks up recovery_policy from the actor module's compiled expectations.
- DB agent definitions: falls back to the cyclium_agent_definitions table for dynamic actors not in the compiled registry.
- Default: :fail if the actor is unknown in both.
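The resolution order works as a chain of lookups. PolicySketch is hypothetical and simplifies the compiled registry to a map of actor_id => policy. The real lookup, as described above, reads recovery_policy from each actor module's compiled expectations. The second map stands in for rows of cyclium_agent_definitions.

```elixir
defmodule PolicySketch do
  # Hypothetical resolver: `compiled` maps actor_id => policy (standing in for
  # the compiled actor registry) and `db_defs` maps actor_id => policy
  # (standing in for cyclium_agent_definitions rows).
  @spec resolve(String.t(), map(), map()) :: :fail | :restart
  def resolve(actor_id, compiled, db_defs) do
    # Each step only runs while the previous source had no entry.
    with :error <- Map.fetch(compiled, actor_id),
         :error <- Map.fetch(db_defs, actor_id) do
      # Unknown in both sources: fall back to the default.
      :fail
    else
      {:ok, policy} -> policy
    end
  end
end

compiled = %{"project_health_actor" => :restart}
db_defs = %{"project_health_actor" => :fail, "dynamic_actor" => :restart}

PolicySketch.resolve("project_health_actor", compiled, db_defs)
# => :restart (the compiled registry wins)
PolicySketch.resolve("dynamic_actor", compiled, db_defs)
# => :restart (found only in the DB definitions)
PolicySketch.resolve("unknown_actor", compiled, db_defs)
# => :fail (default)
```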
Workflow reconciliation
reconcile_workflows/0 handles the inverse problem: workflow instances
stuck in :running (or :blocked) because the WorkflowEngine missed a Bus
event during a restart. It finds steps still marked "running" in step_states
whose episodes have already reached a terminal state, and re-broadcasts the
Bus event so the engine can advance the workflow.
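Finding stale steps is essentially a filter over step_states. The sketch below assumes a hypothetical shape for step_states (step name => a map with "status" and "episode_id") and passes episode statuses in as a plain map. ReconcileSketch.stale_steps/2 only identifies the stale steps. The real function would re-broadcast a Bus event for each one.

```elixir
defmodule ReconcileSketch do
  # Terminal episode statuses, as listed for reconcile_workflows.
  @terminal [:done, :failed, :canceled]

  # Hypothetical shapes:
  #   step_states      :: %{step_name => %{"status" => String.t(), "episode_id" => id}}
  #   episode_statuses :: %{id => atom}
  # The generator pattern skips steps that are not "running"; the filter keeps
  # only those whose episode has already finished.
  def stale_steps(step_states, episode_statuses) do
    for {step, %{"status" => "running", "episode_id" => ep_id}} <- step_states,
        Map.get(episode_statuses, ep_id) in @terminal do
      {step, ep_id, Map.fetch!(episode_statuses, ep_id)}
    end
  end
end

step_states = %{
  "fetch" => %{"status" => "done", "episode_id" => "ep-1"},
  "score" => %{"status" => "running", "episode_id" => "ep-2"},
  "notify" => %{"status" => "running", "episode_id" => "ep-3"}
}

statuses = %{"ep-1" => :done, "ep-2" => :done, "ep-3" => :running}

IO.inspect(ReconcileSketch.stale_steps(step_states, statuses))
# => [{"score", "ep-2", :done}]
```

Only "score" qualifies. "notify" is also marked running, but its episode is still live, so nothing was missed and there is no event to replay.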
Call reconcile_workflows/0 after sweep/1 and after workflow configs
are registered.
Usage
Typically called from the host app's supervisor with a startup delay:
```elixir
{Task, fn ->
  Process.sleep(:timer.minutes(2))
  Cyclium.Recovery.sweep()
  Cyclium.Recovery.reconcile_workflows()
end}
```
Options
- :stale_after_ms: consider an episode stale if it has had no step journal activity for this long (default: 2 minutes).
- :actor_registry: map of %{"actor_id" => ActorModule} for compiled actors. Keys must match the identifier() declared in each actor's DSL block. Dynamic actors not in this map are resolved from the DB automatically.
- :resolve_policy: an (episode -> :fail | :restart) callback for custom policy resolution. Overrides :actor_registry if both are provided.
- :source_stack: restrict the sweep to episodes originated by this stack. Defaults to Application.get_env(:cyclium, :stack_slug). Pass nil explicitly to sweep globally (single-stack mode).
- :source_env: restrict the sweep to episodes originated by this env, matched by strict equality. Defaults to Cyclium.Env.current().
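The :stale_after_ms rule amounts to a timestamp comparison. StaleSketch is hypothetical. It assumes the time of an episode's latest step-journal entry is available as a DateTime, and the boundary choice (> rather than >=) is an assumption, not something the options above specify.

```elixir
defmodule StaleSketch do
  # Mirrors the documented default of 2 minutes.
  @default_stale_after_ms :timer.minutes(2)

  # Hypothetical check: an episode is stale when its most recent step-journal
  # activity is older than stale_after_ms relative to `now`.
  def stale?(%DateTime{} = last_activity_at, %DateTime{} = now, opts \\ []) do
    stale_after_ms = Keyword.get(opts, :stale_after_ms, @default_stale_after_ms)
    DateTime.diff(now, last_activity_at, :millisecond) > stale_after_ms
  end
end

now = ~U[2024-01-01 12:00:00Z]

StaleSketch.stale?(~U[2024-01-01 11:57:00Z], now)
# => true (3 minutes idle, default threshold 2 minutes)
StaleSketch.stale?(~U[2024-01-01 11:59:00Z], now)
# => false (1 minute idle)
StaleSketch.stale?(~U[2024-01-01 11:59:00Z], now, stale_after_ms: 30_000)
# => true (1 minute idle, threshold lowered to 30 seconds)
```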
Examples
```elixir
# Recommended: pass compiled actors, dynamic actors resolved from DB
Cyclium.Recovery.sweep(
  actor_registry: %{
    "project_health_actor" => MyApp.Actors.ProjectHealthActor
  }
)

# No registry: dynamic actors still resolved from DB
Cyclium.Recovery.sweep()

# Custom: pass a resolve_policy function
Cyclium.Recovery.sweep(
  resolve_policy: fn _episode -> :restart end
)
```
Functions
reconcile_workflows
Reconcile running workflow instances after a restart.
Finds workflow instances in :running or :blocked status whose step_states
contain steps still marked "running" even though the episodes behind those
steps have already reached a terminal state (:done, :failed, :canceled). For
each such stale step, re-broadcasts the appropriate Bus event so the
WorkflowEngine can advance the workflow.
Should be called after sweep/1 and after workflow configs are registered
(compiled modules booted, dynamic workflows loaded).
Options
- :source_stack: restrict reconciliation to instances originated by this stack. Defaults to Application.get_env(:cyclium, :stack_slug). Pass nil explicitly to reconcile globally.
- :source_env: restrict reconciliation to instances originated by this env, matched by strict equality. Defaults to Cyclium.Env.current() (so each env reconciles only its own instances; legacy NULL rows belong to the unset/default env).
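The scoping rules for :source_stack and :source_env can be expressed as a filter over plain maps. ScopeSketch and its row shape are hypothetical, and both options are passed explicitly here rather than falling back to the application env. The sketch shows an asymmetry between the two options. A nil stack turns off stack filtering, but env is always compared with ==, so a nil env matches only legacy rows whose env is also nil. In SQL the nil case would need IS NULL, because NULL = NULL is not true.

```elixir
defmodule ScopeSketch do
  # Hypothetical rows: maps with :source_stack and :source_env keys, where a
  # nil :source_env marks a legacy row belonging to the unset/default env.
  def in_scope(rows, opts) do
    stack = Keyword.fetch!(opts, :source_stack)
    env = Keyword.fetch!(opts, :source_env)

    Enum.filter(rows, fn row ->
      # nil stack = global; env is strict equality, so nil only matches nil.
      (is_nil(stack) or row.source_stack == stack) and row.source_env == env
    end)
  end
end

rows = [
  %{id: 1, source_stack: "web", source_env: nil},
  %{id: 2, source_stack: "web", source_env: "staging"},
  %{id: 3, source_stack: "jobs", source_env: nil}
]

ids = fn opts -> rows |> ScopeSketch.in_scope(opts) |> Enum.map(& &1.id) end

ids.(source_stack: "web", source_env: nil)
# => [1]
ids.(source_stack: nil, source_env: nil)
# => [1, 3]
ids.(source_stack: "web", source_env: "staging")
# => [2]
```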
Returns {:ok, %{replayed: n, skipped: n}}.
sweep
Sweep for orphaned :running episodes and recover them.
Scoped to the node's stack (source_stack) and env (source_env,
defaulting to Cyclium.Env.current()) so a node only recovers orphans it
could have created. Override either via opts; source_env is matched by
strict equality, so an env-tagged node never resurrects the default node's
work.
Returns {:ok, %{restarted: n, failed: n, skipped: n}}.
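Combining these steps, one sweep pass can be modeled as a fold that tallies outcomes into the documented return shape. SweepSketch.run/3 is hypothetical. Its claim and resolve arguments are injected functions standing in for the DB claim and the policy resolution chain. Counting a lost claim as skipped is an assumption about how the real counters are kept.

```elixir
defmodule SweepSketch do
  # Hypothetical outline of one sweep pass over already-found stale episodes.
  # `claim` returns :ok | :skip; `resolve` returns :restart | :fail.
  def run(stale_episodes, claim, resolve) do
    counts =
      Enum.reduce(stale_episodes, %{restarted: 0, failed: 0, skipped: 0}, fn episode, acc ->
        outcome =
          case claim.(episode) do
            # Another node already claimed it: leave it alone.
            :skip ->
              :skipped

            # This node owns the episode now; apply its recovery policy.
            :ok ->
              case resolve.(episode) do
                :restart -> :restarted
                :fail -> :failed
              end
          end

        Map.update!(acc, outcome, &(&1 + 1))
      end)

    {:ok, counts}
  end
end

episodes = [%{id: 1, actor: "a"}, %{id: 2, actor: "b"}, %{id: 3, actor: "c"}]
claim = fn episode -> if episode.id == 2, do: :skip, else: :ok end
resolve = fn episode -> if episode.actor == "a", do: :restart, else: :fail end

IO.inspect(SweepSketch.run(episodes, claim, resolve))
# => {:ok, %{failed: 1, restarted: 1, skipped: 1}}
```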