Build Your Own Checkpoint Store
Spectra ships with InMemoryCheckpointStore and FileCheckpointStore for development and testing.
For production, you will usually want a durable backend such as Redis, Postgres, Cosmos DB, or your own storage system.
This guide shows the contract, the storage model, and the behaviors your checkpoint store must support.
When to build a custom checkpoint store
Build a custom checkpoint store when you need:
- durable workflow recovery
- resume after process restarts
- shared checkpoint storage across multiple app instances
- checkpoint history for debugging or audit
- time travel and forking in production
A simple rule:
- use built-in stores for local development
- build your own store for real production persistence
Step 1: Implement ICheckpointStore
A checkpoint store does more than save the latest state.
It also supports:
- full checkpoint history
- loading checkpoints by index
- forking a run
- tracing lineage back to parent runs
```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Spectra.Contracts.Checkpointing;
using Spectra.Contracts.State;

public class RedisCheckpointStore : ICheckpointStore
{
    // --- Core methods ---

    public Task SaveAsync(Checkpoint checkpoint, CancellationToken ct = default)
    {
        // Set checkpoint.Index to its position in the run's history (0-based)
        // Serialize with CheckpointSerializer.Serialize(checkpoint)
        // Append to a list keyed by checkpoint.RunId (never overwrite, to preserve history)
        throw new NotImplementedException();
    }

    public Task<Checkpoint?> LoadAsync(string runId, CancellationToken ct = default)
    {
        // Return the last (most recent) checkpoint for this runId, or null if there is none
        throw new NotImplementedException();
    }

    public Task<Checkpoint?> LoadLatestAsync(string workflowId, CancellationToken ct = default)
    {
        // Query across all runs by workflowId, order by UpdatedAt descending, return first
        throw new NotImplementedException();
    }

    public Task DeleteAsync(string runId, CancellationToken ct = default)
    {
        // Remove only the latest checkpoint entry for runId
        throw new NotImplementedException();
    }

    public Task<IReadOnlyList<Checkpoint>> ListAsync(
        string? workflowId = null, CancellationToken ct = default)
    {
        // Return all (or workflow-filtered) checkpoints, ordered by UpdatedAt descending
        throw new NotImplementedException();
    }

    // --- Time travel methods ---

    public Task<Checkpoint?> LoadByIndexAsync(string runId, int index, CancellationToken ct = default)
    {
        // Load a specific checkpoint by its sequential index within a run
        throw new NotImplementedException();
    }

    public Task<IReadOnlyList<Checkpoint>> ListByRunAsync(string runId, CancellationToken ct = default)
    {
        // Return all checkpoints for a run, ordered by index ascending
        throw new NotImplementedException();
    }

    // --- Fork methods ---

    public Task<Checkpoint> ForkAsync(
        string sourceRunId, int checkpointIndex, string newRunId,
        WorkflowState? stateOverrides = null, CancellationToken ct = default)
    {
        // Load checkpoint at sourceRunId/checkpointIndex
        // Deep-clone the state, apply overrides if provided
        // Create a new checkpoint under newRunId with ParentRunId and ParentCheckpointIndex set
        // Save and return the forked checkpoint
        throw new NotImplementedException();
    }

    public Task<IReadOnlyList<Checkpoint>> GetLineageAsync(string runId, CancellationToken ct = default)
    {
        // Walk the ParentRunId chain from this run back to the root, return the ancestor list
        throw new NotImplementedException();
    }

    public Task PurgeAsync(string runId, CancellationToken ct = default)
    {
        // Remove ALL checkpoints for a run (entire history)
        throw new NotImplementedException();
    }
}
```
Step 2: Understand what a checkpoint store must preserve
A checkpoint store is not just a key-value lookup for "latest run state".
It must preserve:
- the latest checkpoint for resume
- the ordered history of checkpoints in a run
- enough metadata for time travel and forking
- parent-child lineage between runs
A good mental model is:
- one run
- many checkpoints
- each checkpoint has an index
- forked runs point back to a parent run
Step 3: Design the storage model
A practical storage design usually looks like this:
| Concern | Recommendation |
|---|---|
| Latest checkpoint lookup | Index by RunId |
| Full history | Store checkpoints per run in index order |
| Workflow-level lookup | Index by WorkflowId and UpdatedAt |
| Time travel | Support load by RunId + Index |
| Fork lineage | Store ParentRunId and parent checkpoint index |
| Serialization | Store the full checkpoint as JSON or versioned binary |
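As one possible mapping of this table onto Redis (the backend the Step 1 skeleton targets), each concern can get its own key: a list per run for the serialized history (RPUSH appends, LINDEX with -1 reads the latest entry, LINDEX with n reads position n), a sorted set per workflow scored by UpdatedAt and re-added on every save, and a hash for fork lineage. The sketch below only builds the key names; the prefix and key shapes are hypothetical choices, not something Spectra defines.

```csharp
using System;

// Hypothetical key layout for a Redis-backed checkpoint store.
// The "spectra:cp" prefix and key shapes are illustrative, not a Spectra convention.
public static class CheckpointKeys
{
    private const string Prefix = "spectra:cp";

    // Redis list: serialized checkpoints for one run; list position == Index (full history).
    public static string History(string runId) => $"{Prefix}:run:{runId}:history";

    // Redis sorted set: run IDs of one workflow, scored by UpdatedAt (workflow-level lookup).
    public static string WorkflowRuns(string workflowId) => $"{Prefix}:wf:{workflowId}:runs";

    // Redis hash: ParentRunId and ParentCheckpointIndex for a forked run (lineage).
    public static string Lineage(string runId) => $"{Prefix}:run:{runId}:lineage";

    // Sorted-set score for UpdatedAt: Unix time in milliseconds.
    public static double Score(DateTimeOffset updatedAt) => updatedAt.ToUnixTimeMilliseconds();
}
```

Keeping lineage in its own small hash lets a lineage walk follow parents without deserializing full checkpoints.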
The most important rule is: preserve history; do not overwrite it.
Checkpoint history is required for:
- resume
- debugging
- time travel
- forking
- lineage queries
Step 4: Implement the core methods
SaveAsync
Save a checkpoint as the next entry in the run history.
```csharp
public Task SaveAsync(Checkpoint checkpoint, CancellationToken ct = default)
{
    // Append to run history
}
```
A good implementation should:
- append, not replace
- assign the correct sequential Index
- preserve UpdatedAt
- make the checkpoint visible to latest-run queries
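The following is a minimal in-memory sketch of these append semantics, using a hypothetical StoredCheckpoint record in place of Spectra's Checkpoint. A lock serializes saves so two concurrent writers cannot both claim the same Index; a Redis or SQL store needs an equivalent guarantee, for example a transaction or a unique constraint on RunId plus Index.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Spectra's Checkpoint, reduced to the fields SaveAsync touches.
public sealed record StoredCheckpoint(string RunId, string WorkflowId, int Index, DateTimeOffset UpdatedAt);

public sealed class AppendOnlyHistory
{
    // One ordered list per run; list position == checkpoint Index.
    private readonly Dictionary<string, List<StoredCheckpoint>> _runs = new();
    private readonly object _gate = new();

    public Task SaveAsync(StoredCheckpoint checkpoint, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        lock (_gate)
        {
            if (!_runs.TryGetValue(checkpoint.RunId, out var history))
            {
                history = new List<StoredCheckpoint>();
                _runs[checkpoint.RunId] = history;
            }

            // Append, never replace: the store assigns the next sequential Index.
            // UpdatedAt is kept exactly as the caller supplied it.
            history.Add(checkpoint with { Index = history.Count });
        }

        return Task.CompletedTask;
    }

    // Snapshot of one run's history, oldest first (used here to inspect the result).
    public IReadOnlyList<StoredCheckpoint> History(string runId)
    {
        lock (_gate)
        {
            return _runs.TryGetValue(runId, out var history)
                ? history.ToArray()
                : Array.Empty<StoredCheckpoint>();
        }
    }
}
```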
LoadAsync
Return the most recent checkpoint for a run.
```csharp
public Task<Checkpoint?> LoadAsync(string runId, CancellationToken ct = default)
{
    // Load latest checkpoint for run
}
```
This is the method the runner uses most often for resume-from-latest behavior.
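Because the history is append-only and Index-ordered, the latest checkpoint is simply the last entry. A minimal sketch over an in-memory map, with a hypothetical StoredCheckpoint stand-in; returning null for an unknown or empty run matches the Task<Checkpoint?> return type.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Spectra's Checkpoint, reduced to the fields used here.
public sealed record StoredCheckpoint(string RunId, int Index);

public static class LatestLookup
{
    // LoadAsync semantics: the newest checkpoint for a run, or null if it has no history.
    // History is append-only and Index-ordered, so "newest" is the last entry.
    public static Task<StoredCheckpoint?> LoadAsync(
        IReadOnlyDictionary<string, List<StoredCheckpoint>> runs,
        string runId,
        CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        StoredCheckpoint? latest =
            runs.TryGetValue(runId, out var history) && history.Count > 0
                ? history[^1]
                : null;

        return Task.FromResult(latest);
    }
}
```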
LoadLatestAsync
Return the newest checkpoint across runs for a workflow.
```csharp
public Task<Checkpoint?> LoadLatestAsync(string workflowId, CancellationToken ct = default)
{
    // Latest checkpoint for workflow
}
```
This is useful for workflow-level inspection or operational tools.
DeleteAsync
Delete only the latest checkpoint entry for a run.
```csharp
public Task DeleteAsync(string runId, CancellationToken ct = default)
{
    // Remove latest checkpoint only
}
```
This is different from PurgeAsync, which deletes the full history.
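To make the difference concrete, here is a minimal in-memory sketch with a hypothetical StoredCheckpoint stand-in: DeleteLatest mirrors DeleteAsync by removing only the newest entry, while Purge mirrors PurgeAsync by dropping the whole history. The method names are illustrative, not part of ICheckpointStore.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for Spectra's Checkpoint, reduced to the fields used here.
public sealed record StoredCheckpoint(string RunId, int Index);

public sealed class HistoryCleanup
{
    private readonly Dictionary<string, List<StoredCheckpoint>> _runs = new();

    public void Append(StoredCheckpoint checkpoint)
    {
        if (!_runs.TryGetValue(checkpoint.RunId, out var history))
        {
            history = new List<StoredCheckpoint>();
            _runs[checkpoint.RunId] = history;
        }
        history.Add(checkpoint);
    }

    // Mirrors DeleteAsync: remove only the newest entry; earlier history survives.
    public void DeleteLatest(string runId)
    {
        if (_runs.TryGetValue(runId, out var history) && history.Count > 0)
        {
            history.RemoveAt(history.Count - 1);
        }
    }

    // Mirrors PurgeAsync: remove the run's entire history.
    public void Purge(string runId) => _runs.Remove(runId);

    public int Count(string runId) => _runs.TryGetValue(runId, out var history) ? history.Count : 0;
}
```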
ListAsync
Return checkpoints, optionally filtered by workflow.
```csharp
public Task<IReadOnlyList<Checkpoint>> ListAsync(
    string? workflowId = null, CancellationToken ct = default)
{
    // List checkpoints ordered by UpdatedAt descending
}
```
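LoadLatestAsync and ListAsync share one query shape: filter by workflow, then sort newest first. A LINQ sketch over a flat sequence, with a hypothetical StoredCheckpoint stand-in; a SQL backend would run the same filter and ORDER BY against an index on WorkflowId and UpdatedAt, as Step 3 recommends. Breaking UpdatedAt ties by Index is an extra choice made here, not something the interface requires.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Spectra's Checkpoint, reduced to the fields these queries use.
public sealed record StoredCheckpoint(string RunId, string WorkflowId, int Index, DateTimeOffset UpdatedAt);

public static class WorkflowQueries
{
    // ListAsync semantics: all checkpoints (or one workflow's), newest first.
    // Index breaks UpdatedAt ties so same-run checkpoints saved in one clock tick stay ordered.
    public static IReadOnlyList<StoredCheckpoint> ListCheckpoints(
        IEnumerable<StoredCheckpoint> all, string? workflowId = null) =>
        all.Where(c => workflowId is null || c.WorkflowId == workflowId)
           .OrderByDescending(c => c.UpdatedAt)
           .ThenByDescending(c => c.Index)
           .ToList();

    // LoadLatestAsync semantics: the newest checkpoint across every run of a workflow.
    public static StoredCheckpoint? LoadLatest(IEnumerable<StoredCheckpoint> all, string workflowId) =>
        ListCheckpoints(all, workflowId).FirstOrDefault();
}
```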
Step 5: Support time travel
Time travel depends on the ability to load historical checkpoints, not just the latest one.
LoadByIndexAsync
Load one specific checkpoint in a run.
```csharp
public Task<Checkpoint?> LoadByIndexAsync(string runId, int index, CancellationToken ct = default)
{
    // Load checkpoint by index
}
```
ListByRunAsync
Return the ordered history for one run.
```csharp
public Task<IReadOnlyList<Checkpoint>> ListByRunAsync(string runId, CancellationToken ct = default)
{
    // Return all checkpoints in ascending index order
}
```
Use ascending index order so the run history reads naturally from start to latest checkpoint.
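Both time-travel reads can be sketched as queries over stored entries, using a hypothetical StoredCheckpoint stand-in. Matching on the stored Index rather than on list position keeps the logic correct for backends, such as a SQL table, that return rows in arbitrary order; returning null for a missing index is an assumption consistent with the nullable return type.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Spectra's Checkpoint, reduced to the fields used here.
public sealed record StoredCheckpoint(string RunId, int Index);

public static class TimeTravelReads
{
    // LoadByIndexAsync semantics: the checkpoint with this Index, or null when the
    // run or index does not exist.
    public static StoredCheckpoint? LoadByIndex(
        IEnumerable<StoredCheckpoint> all, string runId, int index) =>
        all.FirstOrDefault(c => c.RunId == runId && c.Index == index);

    // ListByRunAsync semantics: one run's history, oldest (Index 0) to newest.
    public static IReadOnlyList<StoredCheckpoint> ListByRun(
        IEnumerable<StoredCheckpoint> all, string runId) =>
        all.Where(c => c.RunId == runId)
           .OrderBy(c => c.Index)
           .ToList();
}
```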
Step 6: Implement forking
Forking creates a new run from a historical checkpoint.
```csharp
public Task<Checkpoint> ForkAsync(
    string sourceRunId, int checkpointIndex, string newRunId,
    WorkflowState? stateOverrides = null, CancellationToken ct = default)
{
    // Create a new run from an old checkpoint
}
```
A correct fork implementation should:
- load the source checkpoint
- deep-clone its state
- apply any overrides
- assign the new run ID
- set ParentRunId and ParentCheckpointIndex
- save the new run's first checkpoint
The cloned state must be independent of the source run. Do not share mutable references across runs.
Warning
Forking should never mutate the original run. Treat the source checkpoint as immutable history.
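A minimal sketch of the clone, override, and link steps, assuming hypothetical stand-ins (SketchState for WorkflowState, StoredCheckpoint for Checkpoint) and a string-keyed state so overrides can be applied key by key. The deep clone is a System.Text.Json round trip, one simple way to guarantee no shared mutable references; Spectra's own WorkflowState may call for a different cloning strategy, and a real ForkAsync would also load the source and save the result.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for Spectra's WorkflowState: a flat string-keyed bag.
public sealed class SketchState
{
    public Dictionary<string, string> Values { get; set; } = new();
}

// Hypothetical stand-in for Spectra's Checkpoint, reduced to the fields a fork touches.
public sealed record StoredCheckpoint(
    string RunId, int Index, SketchState State,
    string? ParentRunId = null, int? ParentCheckpointIndex = null);

public static class Forking
{
    public static StoredCheckpoint Fork(StoredCheckpoint source, string newRunId, SketchState? overrides = null)
    {
        // Deep clone via a JSON round trip: the fork shares no mutable objects with the source.
        var json = JsonSerializer.Serialize(source.State);
        var cloned = JsonSerializer.Deserialize<SketchState>(json)
                     ?? throw new InvalidOperationException("State did not round-trip.");

        // Apply overrides on top of the clone (override wins per key).
        if (overrides is not null)
        {
            foreach (var (key, value) in overrides.Values)
            {
                cloned.Values[key] = value;
            }
        }

        // First checkpoint (Index 0) of the new run, linked to the exact source checkpoint.
        // The source record and its state are never modified.
        return new StoredCheckpoint(
            RunId: newRunId,
            Index: 0,
            State: cloned,
            ParentRunId: source.RunId,
            ParentCheckpointIndex: source.Index);
    }
}
```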
Step 7: Implement lineage
Lineage lets you understand where a run came from.
GetLineageAsync
Walk back through parent runs until you reach the root.
```csharp
public Task<IReadOnlyList<Checkpoint>> GetLineageAsync(string runId, CancellationToken ct = default)
{
    // Walk ParentRunId chain back to the root
}
```
This is useful for:
- audit trails
- production debugging
- tracing sandbox runs back to real runs
- experimentation history
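A sketch of the parent walk, assuming (as a fork sets up) that each run's first checkpoint carries its ParentRunId and ParentCheckpointIndex, and that list position matches Index. It returns the exact ancestor checkpoints, nearest parent first; that ordering, stopping quietly when a parent has been purged, and the cycle check are choices made for this sketch, with StoredCheckpoint a hypothetical stand-in.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Spectra's Checkpoint, reduced to the lineage fields.
public sealed record StoredCheckpoint(
    string RunId, int Index, string? ParentRunId = null, int? ParentCheckpointIndex = null);

public static class LineageWalker
{
    // Returns the ancestor checkpoints each run was forked from, nearest parent first.
    public static IReadOnlyList<StoredCheckpoint> GetLineage(
        IReadOnlyDictionary<string, List<StoredCheckpoint>> runs, string runId)
    {
        var lineage = new List<StoredCheckpoint>();
        var visited = new HashSet<string> { runId };
        var current = runId;

        while (runs.TryGetValue(current, out var history) && history.Count > 0)
        {
            // A run's first checkpoint records where it was forked from (null for a root run).
            var origin = history[0];
            if (origin.ParentRunId is null || origin.ParentCheckpointIndex is null)
            {
                break;
            }

            var parentRunId = origin.ParentRunId;
            var parentIndex = origin.ParentCheckpointIndex.Value;

            // Corrupted data could point back at a run already visited; fail loudly.
            if (!visited.Add(parentRunId))
            {
                throw new InvalidOperationException($"Lineage cycle detected at run '{parentRunId}'.");
            }

            // Stop if the parent run (or that checkpoint) no longer exists, e.g. after a purge.
            if (!runs.TryGetValue(parentRunId, out var parentHistory)
                || parentIndex < 0 || parentIndex >= parentHistory.Count)
            {
                break;
            }

            lineage.Add(parentHistory[parentIndex]);
            current = parentRunId;
        }

        return lineage;
    }
}
```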
PurgeAsync
Delete the entire history for one run.
```csharp
public Task PurgeAsync(string runId, CancellationToken ct = default)
{
    // Remove all checkpoints for the run
}
```
Use this when you want a hard cleanup of the run's checkpoint history.
Step 8: Serialize carefully
A checkpoint contains the full workflow state, not just a few fields.
That means your store must serialize and deserialize it consistently: loading a checkpoint should restore every field that was saved.
A practical approach is:
- serialize the full checkpoint
- preserve all fields
- preserve index ordering
- keep format evolution in mind if your store will survive upgrades
If you use JSON, treat the stored checkpoint as an opaque payload plus queryable metadata fields.
A good split is:
- metadata columns or keys for indexing
- full serialized checkpoint for full restore
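One way to implement that split, sketched with System.Text.Json and hypothetical types: an envelope record carries the queryable metadata plus a schema version, and the full checkpoint travels as an opaque JSON payload. In a real store the payload would come from CheckpointSerializer.Serialize, as the Step 1 skeleton notes; the version check marks where upgrade handling would go.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for a Spectra checkpoint; StateJson represents the full workflow state.
public sealed record StoredCheckpoint(
    string RunId, string WorkflowId, int Index, DateTimeOffset UpdatedAt, string StateJson);

// One stored row or document: queryable metadata plus the opaque full payload.
public sealed record CheckpointEnvelope(
    int SchemaVersion,        // bump when the payload format changes
    string RunId,             // metadata columns or keys used for indexing...
    string WorkflowId,
    int Index,
    DateTimeOffset UpdatedAt,
    string Payload);          // ...and the full serialized checkpoint for restore

public static class EnvelopeCodec
{
    public const int CurrentSchemaVersion = 1;

    public static CheckpointEnvelope Wrap(StoredCheckpoint checkpoint) => new(
        CurrentSchemaVersion,
        checkpoint.RunId,
        checkpoint.WorkflowId,
        checkpoint.Index,
        checkpoint.UpdatedAt,
        JsonSerializer.Serialize(checkpoint));

    public static StoredCheckpoint Unwrap(CheckpointEnvelope envelope)
    {
        // Refuse payloads in a format this code does not understand;
        // an upgrade path would migrate older versions here instead.
        if (envelope.SchemaVersion != CurrentSchemaVersion)
        {
            throw new NotSupportedException(
                $"Unsupported checkpoint schema version {envelope.SchemaVersion}.");
        }

        return JsonSerializer.Deserialize<StoredCheckpoint>(envelope.Payload)
               ?? throw new InvalidOperationException("Checkpoint payload was empty.");
    }
}
```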
Step 9: Register the store
Once implemented, register it with Spectra by passing an instance to builder.AddCheckpoints.
You can also configure checkpoint behavior at registration time by passing an options callback:
```csharp
services.AddSpectra(builder =>
{
    builder.AddCheckpoints(new RedisCheckpointStore(), options =>
    {
        options.Frequency = CheckpointFrequency.EveryNode;
        options.CheckpointOnFailure = true;
        options.CheckpointOnInterrupt = true;
        options.CheckpointOnAwaitingInput = true;
    });
});
```
After that, the runner uses your store for checkpoint save/load behavior automatically.
Testing your store
At minimum, test these scenarios:
- saving then loading the latest checkpoint
- saving multiple checkpoints preserves index order
- loading by index returns the expected historical checkpoint
- listing by run returns the full ordered history
- forking creates a new run without mutating the source
- lineage returns the ancestor chain correctly
- purge deletes the full run history
- cancellation tokens are honored
Example test shape (xUnit):
```csharp
using System;
using System.Threading.Tasks;
using Spectra.Contracts.Checkpointing;
using Spectra.Contracts.State;
using Xunit;

public class RedisCheckpointStoreTests
{
    [Fact]
    public async Task SaveAsync_Then_LoadAsync_Returns_Latest_Checkpoint()
    {
        var store = new RedisCheckpointStore();
        var checkpoint = new Checkpoint
        {
            RunId = "run-1",
            WorkflowId = "wf-1",
            Index = 0,
            UpdatedAt = DateTimeOffset.UtcNow,
            Status = CheckpointStatus.InProgress,
            State = new WorkflowState()
        };

        await store.SaveAsync(checkpoint);
        var loaded = await store.LoadAsync("run-1");

        Assert.NotNull(loaded);
        Assert.Equal("run-1", loaded!.RunId);
    }
}
```
Tip
Forking and lineage are the two easiest areas to get subtly wrong. Test them explicitly, not just the latest-checkpoint path.
Storage design tips
A few defaults work well for most backends:
| Concern | Recommendation |
|---|---|
| Run partitioning | Group checkpoints by RunId |
| History order | Use monotonically increasing Index |
| Latest lookup | Cache or index the newest checkpoint per run |
| Workflow lookup | Index by WorkflowId and UpdatedAt |
| Fork traceability | Store ParentRunId and parent checkpoint index |
| Serialization | Keep full checkpoint payload intact |
Quick reference
| Task | How |
|---|---|
| Build a checkpoint store | Implement ICheckpointStore |
| Append a checkpoint to run history | SaveAsync(checkpoint) |
| Load latest checkpoint | LoadAsync(runId) |
| Load latest by workflow | LoadLatestAsync(workflowId) |
| Browse history | ListByRunAsync(runId) |
| Load historical checkpoint | LoadByIndexAsync(runId, index) |
| Fork a run | ForkAsync(sourceRunId, checkpointIndex, newRunId, overrides) |
| Trace lineage | GetLineageAsync(runId) |
| Delete latest checkpoint | DeleteAsync(runId) |
| Delete full run history | PurgeAsync(runId) |
| Register in Spectra | builder.AddCheckpoints(new YourStore()) |
A simple mental model
A checkpoint store is a durable execution history.
It must answer:
- where was this run last?
- what did this run look like at checkpoint N?
- can I resume from there?
- can I fork from there?
- where did this forked run come from?
If your store can answer those reliably, it is a good Spectra checkpoint backend.