Cyclium.Recovery (Cyclium v0.1.12)


Recovers orphaned episodes after server restarts.

After a deploy or crash, in-flight episodes are left as :running in the DB. sweep/1 finds these stale episodes and either restarts or fails them based on the expectation's recovery_policy.

Multi-node coordination

All nodes in the cluster run the same sweep after boot. Coordination is DB-based via optimistic claims — Episodes.claim_for_recovery/1 sets the episode's phase to "recovering" only if it's still :running. First node to claim wins; others silently skip.
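The claim works as a compare-and-set: a node wins only if the episode is still :running at the moment it writes. In a database this is typically a conditional UPDATE whose WHERE clause re-checks the expected state, followed by a check that exactly one row changed (Ecto's Repo.update_all/3 returns {count, _} for this purpose). How Cyclium writes that query is not shown here. The minimal in-memory sketch below models the table as an Agent. ClaimSketch and its simplified single-state episode map (:running becomes :recovering) are hypothetical. It shows several concurrent claimants racing for the same orphan.

```elixir
defmodule ClaimSketch do
  # Hypothetical in-memory stand-in for the episodes table:
  # a map of episode_id => state, held in an Agent.
  def start_link(episodes) when is_map(episodes) do
    Agent.start_link(fn -> episodes end)
  end

  # Compare-and-set: flip :running -> :recovering in one atomic step.
  # Agent.get_and_update/2 serializes callers, so only one claim can
  # observe :running; every later caller sees :recovering and skips.
  def claim_for_recovery(agent, episode_id) do
    Agent.get_and_update(agent, fn episodes ->
      case Map.fetch(episodes, episode_id) do
        {:ok, :running} -> {:ok, Map.put(episodes, episode_id, :recovering)}
        _other -> {:skip, episodes}
      end
    end)
  end
end

{:ok, agent} = ClaimSketch.start_link(%{"ep-1" => :running, "ep-2" => :done})

# Five "nodes" race for the same orphaned episode; exactly one wins.
results =
  1..5
  |> Enum.map(fn _ -> Task.async(fn -> ClaimSketch.claim_for_recovery(agent, "ep-1") end) end)
  |> Enum.map(&Task.await/1)
```

The losers get :skip rather than an error, which matches the "others silently skip" behavior described above.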

Policy resolution order

  1. Compiled actor registry — looks up recovery_policy from the actor module's compiled expectations
  2. DB agent definitions — falls back to cyclium_agent_definitions table for dynamic actors not in the compiled registry
  3. Default — :fail if the actor is unknown in both
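The three-step lookup reads naturally as a fallback chain. In this sketch, HealthActor.recovery_policy/1, the episode map keys, and the db_policies map (standing in for rows from cyclium_agent_definitions) are all hypothetical. Only the order (compiled registry, then DB, then :fail) comes from the text above.

```elixir
defmodule HealthActor do
  # Hypothetical compiled actor: returns the recovery_policy declared
  # for each expectation. Not Cyclium's real actor interface.
  def recovery_policy("nightly_check"), do: :restart
  def recovery_policy(_expectation), do: :fail
end

defmodule PolicySketch do
  # Resolution order: 1. compiled registry, 2. DB definitions, 3. :fail.
  def resolve(%{actor_id: actor, expectation: exp}, actor_registry, db_policies) do
    case Map.fetch(actor_registry, actor) do
      {:ok, module} -> module.recovery_policy(exp)
      # Not a compiled actor: try DB-defined policies, else default to :fail.
      :error -> Map.get(db_policies, {actor, exp}, :fail)
    end
  end
end

registry = %{"health_actor" => HealthActor}
# Hypothetical rows loaded from cyclium_agent_definitions, keyed by {actor_id, expectation}.
db_policies = %{{"dynamic_actor", "sync"} => :restart}
```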

Workflow reconciliation

reconcile_workflows/0 handles the inverse problem: workflow instances stuck in :running (or :blocked) because the WorkflowEngine missed a Bus event during a restart. It finds steps still marked "running" in step_states whose episodes have already reached a terminal state, and re-broadcasts the corresponding Bus event so the engine can advance the workflow.

Call reconcile_workflows/0 after sweep/1 and after workflow configs are registered.

Usage

Typically called from the host app's supervisor with a startup delay:

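{Task, fn ->
  # Startup delay before recovery runs (workflow configs must be
  # registered before reconcile_workflows/0 is called)
  Process.sleep(:timer.minutes(2))
  # Recover orphaned :running episodes first...
  Cyclium.Recovery.sweep()
  # ...then replay Bus events for workflow steps whose episodes already finished
  Cyclium.Recovery.reconcile_workflows()
end}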

Options

  • :stale_after_ms — consider an episode stale if no step journal activity for this long (default: 2 minutes)
  • :actor_registry — map of %{"actor_id" => ActorModule} for compiled actors. Must match the identifier() declared in each actor's DSL block. Dynamic actors not in this map are resolved from the DB automatically.
  • :resolve_policy: a callback (episode -> :fail | :restart) for custom policy resolution. Overrides :actor_registry if both are provided.
  • :source_stack — restrict the sweep to episodes originated by this stack. Defaults to Application.get_env(:cyclium, :stack_slug). Pass nil explicitly to sweep globally (single-stack mode).
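A minimal sketch of how these options could narrow the candidate set, assuming each episode carries a hypothetical last_activity_ms timestamp for its latest step-journal entry (StaleSketch and the episode shape are illustrative, not Cyclium's code). The :source_stack handling uses Keyword.fetch/2 to make the documented distinction explicit: an absent option falls back to the app config, while an explicit nil sweeps every stack.

```elixir
defmodule StaleSketch do
  # Mirrors the documented default for :stale_after_ms (2 minutes).
  @default_stale_after_ms :timer.minutes(2)

  # Hypothetical episode shape:
  #   %{id: term, status: atom, last_activity_ms: integer, source_stack: String.t() | nil}
  def stale_candidates(episodes, now_ms, opts \\ []) do
    stale_after = Keyword.get(opts, :stale_after_ms, @default_stale_after_ms)

    # Option absent -> use the app config; explicit `source_stack: nil` -> all stacks.
    stack =
      case Keyword.fetch(opts, :source_stack) do
        {:ok, value} -> value
        :error -> Application.get_env(:cyclium, :stack_slug)
      end

    Enum.filter(episodes, fn ep ->
      ep.status == :running and
        now_ms - ep.last_activity_ms >= stale_after and
        (is_nil(stack) or ep.source_stack == stack)
    end)
  end
end
```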

Examples

# Recommended: pass compiled actors, dynamic actors resolved from DB
Cyclium.Recovery.sweep(
  actor_registry: %{
    "project_health_actor" => MyApp.Actors.ProjectHealthActor
  }
)

# No registry — dynamic actors still resolved from DB
Cyclium.Recovery.sweep()

# Custom: pass a resolve_policy function
Cyclium.Recovery.sweep(
  resolve_policy: fn _episode -> :restart end
)

Summary

Functions

reconcile_workflows(opts \\ [])
Reconcile running workflow instances after a restart.

sweep(opts \\ [])
Sweep for orphaned :running episodes and recover them.

Functions

reconcile_workflows(opts \\ [])

Reconcile running workflow instances after a restart.

Finds workflow instances in :running or :blocked status whose step_states contain steps marked "running" but whose episodes have already reached a terminal state (:done, :failed, :canceled). For each stale step, re-broadcasts the appropriate Bus event so the WorkflowEngine can advance the workflow.
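The detection step can be sketched as a single comprehension. The instance and step_states shapes, ReconcileSketch, and the {instance_id, step, episode_status} tuples (stand-ins for whatever Bus event Cyclium actually re-broadcasts) are assumptions for illustration.

```elixir
defmodule ReconcileSketch do
  @terminal [:done, :failed, :canceled]

  # Hypothetical shapes: each instance has an atom status and a
  # step_states map of step_name => %{"status" => ..., "episode_id" => ...};
  # episode statuses come from a plain map of episode_id => atom.
  def stale_steps(instances, episode_statuses) do
    for inst <- instances,
        inst.status in [:running, :blocked],
        # Entries that don't match (steps not marked "running") are skipped.
        {step, %{"status" => "running", "episode_id" => ep_id}} <- inst.step_states,
        ep_status <- [Map.get(episode_statuses, ep_id)],
        ep_status in @terminal do
      # Each tuple stands in for a Bus event that would be re-broadcast.
      {inst.id, step, ep_status}
    end
  end
end
```

Generators in a for comprehension silently drop elements that do not match their pattern, which is what filters out steps not marked "running" without an explicit check.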

Should be called after sweep/1 and after workflow configs are registered (compiled modules booted, dynamic workflows loaded).

Options

  • :source_stack — restrict reconciliation to instances originated by this stack. Defaults to Application.get_env(:cyclium, :stack_slug). Pass nil explicitly to reconcile globally.
  • :source_env — restrict reconciliation to instances originated by this env, matched by strict equality. Defaults to Cyclium.Env.current() (so each env reconciles only its own instances; legacy NULL rows belong to the unset/default env).

Returns {:ok, %{replayed: n, skipped: n}}.

sweep(opts \\ [])

Sweep for orphaned :running episodes and recover them.

Scoped to the node's stack (source_stack) and env (source_env, defaulting to Cyclium.Env.current()) so a node only recovers orphans it could have created. Override either via opts; source_env is matched by strict equality, so an env-tagged node never resurrects the default node's work.
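Strict equality on source_env means nil only matches nil. A small sketch, assuming nil stands for the unset/default env (so legacy NULL rows belong to it) and passing current_env in place of Cyclium.Env.current(). EnvScopeSketch is hypothetical. In SQL, `=` never matches NULL, so a database version of this filter needs an explicit IS NULL branch (in Ecto queries, is_nil/1).

```elixir
defmodule EnvScopeSketch do
  # Hypothetical: `current_env` stands in for Cyclium.Env.current(),
  # with nil meaning the unset/default env. An explicit :source_env
  # option (including nil) overrides it.
  def in_env(episodes, opts, current_env) do
    env = Keyword.get(opts, :source_env, current_env)
    # Strict equality: nil (legacy NULL rows) only matches nil, and a
    # tagged env such as "staging" only matches "staging".
    Enum.filter(episodes, &(&1.source_env === env))
  end
end
```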

Returns {:ok, %{restarted: n, failed: n, skipped: n}}.
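Putting claim and policy together, the per-episode flow that produces these counts might look like the sketch below. claim_fun and policy_fun are hypothetical stand-ins for the DB claim and the policy-resolution chain. Counting a lost claim as skipped is an assumption based on "others silently skip", and the actual restart/fail side effects are omitted.

```elixir
defmodule SweepSketch do
  # Hypothetical: claim_fun returns :ok | :skip, policy_fun returns
  # :restart | :fail. Restarting or failing the episode is not modelled.
  def sweep(candidates, claim_fun, policy_fun) do
    counts =
      Enum.reduce(candidates, %{restarted: 0, failed: 0, skipped: 0}, fn episode, acc ->
        outcome =
          case claim_fun.(episode) do
            # Another node already claimed this episode.
            :skip ->
              :skipped

            :ok ->
              case policy_fun.(episode) do
                :restart -> :restarted
                :fail -> :failed
              end
          end

        Map.update!(acc, outcome, &(&1 + 1))
      end)

    {:ok, counts}
  end
end
```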