Build a Custom Step
This guide walks you through creating a custom step from scratch — from a minimal implementation to advanced features like streaming, interrupts, and memory access.
When to Build a Custom Step
Spectra's built-in steps cover LLM completions, agent loops, sessions, subgraphs, and memory. Build a custom step when you need to:
- Call an external API or database
- Transform, validate, or enrich data
- Run domain-specific business logic
- Integrate with a system that isn't an LLM
- Implement a custom orchestration pattern
Rule of Thumb
If your logic doesn't need an LLM, you almost certainly need a custom step. If it does need an LLM but with non-standard behavior, a custom step wrapping ILlmClient directly gives you full control.
Minimal Implementation
Here's the simplest possible step. It counts words in a text input:
```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Spectra.Contracts.Steps;

public class WordCountStep : IStep
{
    public string StepType => "word_count";

    public Task<StepResult> ExecuteAsync(StepContext context)
    {
        // Inputs are object?, so fall back to an empty string when the value is missing or null
        var text = context.Inputs.TryGetValue("text", out var v)
            ? v?.ToString() ?? ""
            : "";

        var count = text.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;

        return Task.FromResult(StepResult.Success(new Dictionary<string, object?>
        {
            ["count"] = count,
            ["text"] = text
        }));
    }
}
```
Key points:
- StepType is a string identifier. Pick something unique and descriptive.
- Read inputs from context.Inputs. These are already resolved (template expressions replaced with real values).
- Return StepResult.Success(outputs) with a dictionary of output values.
- The method is Task-returning. For synchronous work, use Task.FromResult.
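Splitting on a single space means tabs and line breaks do not separate words, so a line break between two words yields a count of one. If your inputs can contain other whitespace, one option is to move the counting into a pure function that splits on any whitespace, which also makes it testable without a StepContext. This sketch assumes that behavior is what you want; CountWords is a hypothetical helper, not part of Spectra.

```csharp
using System;

// Hypothetical helper: count words separated by any whitespace
// (spaces, tabs, line breaks), ignoring runs of consecutive separators.
static int CountWords(string? text) =>
    string.IsNullOrWhiteSpace(text)
        ? 0
        : text.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries).Length;

var spaced = CountWords("hello world from spectra");
var mixed = CountWords("hello\tworld\nfrom   spectra");
var empty = CountWords(null);

Console.WriteLine($"{spaced} {mixed} {empty}"); // 4 4 0
```

Passing a null separator array to string.Split tells .NET to split on every character for which char.IsWhiteSpace is true.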
Registration
Register your step with the DI container so Spectra knows about it, by calling AddStep on your Spectra builder, for example builder.AddStep<WordCountStep>().
This makes the step available by its StepType for both programmatic and JSON-defined workflows.
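Conceptually, registration gives the engine a lookup from a StepType string to an implementation, which is how a "stepType" value in a JSON node finds your class. The sketch below illustrates only that lookup, using a plain dictionary of delegates. It is not how Spectra's registry is implemented, and Register and Resolve are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a step: a function from inputs to outputs.
// Spectra registers IStep implementations; this only illustrates lookup by StepType.
var registry = new Dictionary<string, Func<IReadOnlyDictionary<string, object?>, Dictionary<string, object?>>>(
    StringComparer.Ordinal);

void Register(string stepType, Func<IReadOnlyDictionary<string, object?>, Dictionary<string, object?>> step)
{
    // Refuse duplicates so two steps cannot silently claim the same StepType.
    if (!registry.TryAdd(stepType, step))
        throw new InvalidOperationException($"StepType '{stepType}' is already registered.");
}

Func<IReadOnlyDictionary<string, object?>, Dictionary<string, object?>> Resolve(string stepType) =>
    registry.TryGetValue(stepType, out var step)
        ? step
        : throw new KeyNotFoundException($"No step registered for StepType '{stepType}'.");

Register("word_count", inputs =>
{
    var text = inputs.TryGetValue("text", out var v) ? v?.ToString() ?? "" : "";
    return new Dictionary<string, object?>
    {
        ["count"] = text.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length
    };
});

// A JSON node with "stepType": "word_count" would be resolved the same way.
var outputs = Resolve("word_count")(new Dictionary<string, object?> { ["text"] = "one two three" });
Console.WriteLine(outputs["count"]); // 3
```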
Using Your Step in a Workflow
Builder API
```csharp
var workflow = Spectra.Workflow("analyze-text")
    .AddStep("count", new WordCountStep(), inputs: new
    {
        text = "{{inputs.content}}"
    })
    .AddPromptStep("summarize", agent: "openai",
        prompt: "The text has {{nodes.count.output.count}} words. Summarize: {{inputs.content}}")
    .Edge("count", "summarize")
    .Build();
```
JSON Workflow
```json
{
  "nodes": [
    {
      "id": "count",
      "stepType": "word_count",
      "inputs": {
        "text": "{{inputs.content}}"
      }
    }
  ]
}
```
Template expressions like {{inputs.content}} and {{nodes.count.output.count}} are resolved by the StateMapper before your step executes.
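The StateMapper's internals are not covered in this guide, but the core idea can be sketched as a regex that replaces each {{path}} with a value looked up by its dotted path. This is a hypothetical illustration: the flat dictionary of paths, the string-only output, and leaving unknown placeholders untouched are all assumptions, not documented Spectra behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical state: values keyed by their full dotted path.
var state = new Dictionary<string, object?>
{
    ["inputs.content"] = "The quick brown fox",
    ["nodes.count.output.count"] = 4
};

// Replace every {{ path }} with the matching value's string form.
string Resolve(string template) =>
    Regex.Replace(template, @"\{\{\s*([\w.]+)\s*\}\}", match =>
        state.TryGetValue(match.Groups[1].Value, out var value)
            ? value?.ToString() ?? ""
            : match.Value); // unknown paths are left as-is in this sketch

var prompt = Resolve("The text has {{nodes.count.output.count}} words. Summarize: {{inputs.content}}");
Console.WriteLine(prompt); // The text has 4 words. Summarize: The quick brown fox
```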
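Reading Inputs Safely
The Inputs dictionary contains object? values. Here are patterns for extracting typed data safely:
```csharp
using System.Text.Json;

public Task<StepResult> ExecuteAsync(StepContext context)
{
    // String, with fallback
    var name = context.Inputs.TryGetValue("name", out var n)
        ? n?.ToString() ?? "default"
        : "default";

    // Integer: parsing the string form accepts int, long, numeric strings and JsonElement numbers
    var limit = 10;
    if (context.Inputs.TryGetValue("limit", out var l) && l is not null
        && int.TryParse(l.ToString(), out var parsed))
    {
        limit = parsed;
    }

    // Boolean: accepts true, "true" and a JsonElement holding true
    var verbose = context.Inputs.TryGetValue("verbose", out var vb)
        && bool.TryParse(vb?.ToString(), out var flag) && flag;

    // List of strings: accepts CLR collections and JSON arrays
    var tags = !context.Inputs.TryGetValue("tags", out var t) ? new List<string>() : t switch
    {
        JsonElement { ValueKind: JsonValueKind.Array } array =>
            array.EnumerateArray().Select(x => x.ToString()).Where(s => s.Length > 0).ToList(),
        IEnumerable<object> items =>
            items.Select(x => x?.ToString() ?? "").Where(s => s.Length > 0).ToList(),
        _ => new List<string>()
    };

    // ... your logic ...

    return Task.FromResult(StepResult.Success());
}
```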
Don't Assume Types
Inputs arrive as object?. When workflows are loaded from JSON, a number might be a JsonElement, not an int. Always parse defensively.
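You can see the JsonElement case directly with System.Text.Json: deserializing an inputs object into Dictionary<string, object?> produces JsonElement values, so a type test such as value is int fails even for a JSON number. The sketch below shows one defensive conversion; TryGetInt is a hypothetical helper, not a Spectra API.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.Json;

// Deserializing into object? values yields JsonElement, not int/bool/string.
var inputs = JsonSerializer.Deserialize<Dictionary<string, object?>>(
    "{ \"limit\": 25, \"retries\": \"3\", \"verbose\": true }")!;

// Hypothetical helper: accept CLR ints, numeric strings, and JsonElement numbers or strings.
static bool TryGetInt(object? value, out int result)
{
    switch (value)
    {
        case int i:
            result = i;
            return true;
        case JsonElement { ValueKind: JsonValueKind.Number } json:
            return json.TryGetInt32(out result);
        case JsonElement { ValueKind: JsonValueKind.String } json:
            return int.TryParse(json.GetString(), NumberStyles.Integer, CultureInfo.InvariantCulture, out result);
        case string s:
            return int.TryParse(s, NumberStyles.Integer, CultureInfo.InvariantCulture, out result);
        default:
            result = 0;
            return false;
    }
}

var isInt = inputs["limit"] is int; // false: the value is a JsonElement
var limitOk = TryGetInt(inputs["limit"], out var limit);
var retriesOk = TryGetInt(inputs["retries"], out var retries);

Console.WriteLine($"is int: {isInt}, limit: {limit}, retries: {retries}"); // is int: False, limit: 25, retries: 3
```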
Error Handling
Expected Errors: Return StepResult.Fail
For errors that are part of normal operation (API returned 404, validation failed, data missing):
```csharp
// apiKey, url and _client stand in for your step's own configuration and HTTP client
if (string.IsNullOrEmpty(apiKey))
    return StepResult.Fail("API key is required. Set the 'apiKey' input.");

try
{
    var result = await _client.FetchAsync(url, context.CancellationToken);
    return StepResult.Success(new() { ["data"] = result });
}
catch (HttpRequestException ex)
{
    // Pass the original exception along with the message
    return StepResult.Fail($"API call failed: {ex.Message}", ex);
}
```
Unexpected Errors: Let Them Propagate
The workflow runner wraps step execution in a try/catch. Unhandled exceptions are caught, logged, and turned into failures automatically, so you only need to catch the errors you can turn into a useful message.
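As a rough model of that behavior, the sketch below wraps a step body and converts any exception into a failed result. RunStepSafelyAsync and its tuple result are hypothetical stand-ins for the runner; letting OperationCanceledException through when the token is cancelled is a choice of this sketch, not documented Spectra behavior.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: run a step body and turn unexpected exceptions
// into a failed result instead of crashing the caller.
static async Task<(bool Succeeded, string? Error)> RunStepSafelyAsync(
    Func<CancellationToken, Task> step, CancellationToken ct)
{
    try
    {
        await step(ct);
        return (true, null);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        throw; // in this sketch, cancellation is not treated as a step failure
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine($"Step failed: {ex}"); // stand-in for real logging
        return (false, ex.Message);
    }
}

var ok = await RunStepSafelyAsync(_ => Task.CompletedTask, CancellationToken.None);
var failed = await RunStepSafelyAsync(
    _ => throw new InvalidOperationException("boom"), CancellationToken.None);

Console.WriteLine($"{ok.Succeeded} {failed.Succeeded} {failed.Error}"); // True False boom
```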
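Cancellation: Always Honor It
```csharp
public async Task<StepResult> ExecuteAsync(StepContext context)
{
    // batches and ProcessBatchAsync stand in for your own data and per-batch work
    foreach (var batch in batches)
    {
        // Stop between batches as soon as cancellation is requested
        context.CancellationToken.ThrowIfCancellationRequested();

        // Pass the token down so in-flight I/O can be cancelled too
        await ProcessBatchAsync(batch, context.CancellationToken);
    }

    return StepResult.Success();
}
```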
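The same loop shape can be exercised without Spectra. In this BCL-only sketch, a CancellationTokenSource is cancelled during the second batch, so the check at the top of the third iteration throws OperationCanceledException. ProcessBatchAsync is a hypothetical placeholder for your per-batch work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var processed = new List<int>();
using var cts = new CancellationTokenSource();

// Hypothetical per-batch work; passing the token lets Task.Delay abort early too.
async Task ProcessBatchAsync(int batch, CancellationToken ct)
{
    await Task.Delay(10, ct);
    processed.Add(batch);
    if (batch == 2) cts.Cancel(); // simulate the caller cancelling mid-run
}

var cancelled = false;
try
{
    foreach (var batch in new[] { 1, 2, 3, 4 })
    {
        cts.Token.ThrowIfCancellationRequested(); // stop before starting more work
        await ProcessBatchAsync(batch, cts.Token);
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"processed: {string.Join(",", processed)}, cancelled: {cancelled}"); // processed: 1,2, cancelled: True
```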
Advanced: Streaming
If your step produces text incrementally, you can stream tokens to the caller:
```csharp
public async Task<StepResult> ExecuteAsync(StepContext context)
{
    var result = new StringBuilder();

    if (context.IsStreaming && context.OnToken is { } onToken)
    {
        // GenerateChunksAsync stands in for your own incremental producer
        await foreach (var chunk in GenerateChunksAsync(context.CancellationToken))
        {
            result.Append(chunk);
            await onToken(chunk, context.CancellationToken);
        }
    }
    else
    {
        result.Append(await GenerateFullAsync(context.CancellationToken));
    }

    return StepResult.Success(new() { ["text"] = result.ToString() });
}
```
When IsStreaming is false, OnToken is null, so always check it before calling it.
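Here is the streaming pattern reduced to plain .NET types: an IAsyncEnumerable<string> producer and a nullable callback that is checked before each call. Treating the callback as Func<string, CancellationToken, Task> is an assumption based on how OnToken is invoked in the example; GenerateChunksAsync and RunAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical producer: yields text in pieces, honoring cancellation.
static async IAsyncEnumerable<string> GenerateChunksAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    foreach (var piece in new[] { "Hello", ", ", "world" })
    {
        await Task.Delay(1, ct);
        yield return piece;
    }
}

// Collects the full text and forwards each chunk only when a callback is supplied.
static async Task<string> RunAsync(Func<string, CancellationToken, Task>? onToken, CancellationToken ct)
{
    var result = new StringBuilder();
    await foreach (var chunk in GenerateChunksAsync(ct))
    {
        result.Append(chunk);
        if (onToken is not null)
            await onToken(chunk, ct);
    }
    return result.ToString();
}

var received = new List<string>();
var streamed = await RunAsync((chunk, _) => { received.Add(chunk); return Task.CompletedTask; }, CancellationToken.None);
var silent = await RunAsync(null, CancellationToken.None); // no callback, nothing to call

Console.WriteLine($"{streamed} | chunks: {received.Count} | {silent}"); // Hello, world | chunks: 3 | Hello, world
```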
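Advanced: Interrupts (Human-in-the-Loop)
Pause execution and wait for external input:
```csharp
using System.Globalization;
using System.Text.Json;

public async Task<StepResult> ExecuteAsync(StepContext context)
{
    // Convert rather than unbox: JSON-defined workflows deliver numbers as JsonElement
    var rawAmount = context.Inputs["amount"];
    var amount = rawAmount is JsonElement json
        ? json.GetDecimal()
        : Convert.ToDecimal(rawAmount, CultureInfo.InvariantCulture);

    if (amount > 10_000)
    {
        var response = await context.InterruptAsync("large-expense", b => b
            .WithTitle("Large Expense Approval")
            .WithPayload(new { amount, vendor = context.Inputs["vendor"] }));

        // Parse the flag instead of matching "is false", which misses JsonElement and string values
        if (response.Payload.TryGetValue("approved", out var approved)
            && bool.TryParse(approved?.ToString(), out var isApproved)
            && !isApproved)
            return StepResult.Fail("Expense rejected by approver.");
    }

    // Continue processing...
    return StepResult.Success(new() { ["processed"] = true });
}
```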
The engine checkpoints the workflow, suspends, and resumes your step when the response arrives. Your code reads as straight-line logic.
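As an in-process analogy only, the await in a step behaves like awaiting a task that someone else completes later. The sketch below models that with a TaskCompletionSource. Unlike Spectra's engine, it keeps everything in memory and cannot survive a restart; RequestApprovalAsync and Respond are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical pending-interrupt table: interrupt key -> waiting step.
var pending = new ConcurrentDictionary<string, TaskCompletionSource<bool>>();

Task<bool> RequestApprovalAsync(string key)
{
    // RunContinuationsAsynchronously keeps the responder from running the step's continuation inline.
    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    pending[key] = tcs;
    return tcs.Task;
}

void Respond(string key, bool approved)
{
    if (pending.TryRemove(key, out var tcs))
        tcs.SetResult(approved);
}

// The "step": straight-line code that pauses at the await.
async Task<string> ProcessExpenseAsync(decimal amount)
{
    if (amount > 10_000 && !await RequestApprovalAsync("large-expense"))
        return "rejected";
    return "processed";
}

var run = ProcessExpenseAsync(12_500m);
Console.WriteLine(run.IsCompleted); // False: waiting for a response
Respond("large-expense", approved: false);
var outcome = await run;
Console.WriteLine(outcome); // rejected
```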
Advanced: Memory Access
Read from and write to long-term memory that persists across workflow runs:
```csharp
public async Task<StepResult> ExecuteAsync(StepContext context)
{
    if (context.Memory is null)
        return StepResult.Fail("Memory store is not configured.");

    // Recall: entry may be null if nothing has been stored under this namespace and key
    var entry = await context.Memory.GetAsync("preferences", "theme",
        context.CancellationToken);

    // Store
    await context.Memory.SetAsync("preferences", "theme", new MemoryEntry
    {
        Key = "theme",
        Namespace = "preferences",
        Content = "dark",
        UpdatedAt = DateTimeOffset.UtcNow
    }, context.CancellationToken);

    // entry was read before the write, so this returns the previous value (or "light")
    return StepResult.Success(new() { ["theme"] = entry?.Content ?? "light" });
}
```
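The GetAsync and SetAsync calls address an entry by namespace plus key. The sketch below is a hypothetical in-memory model of that addressing, useful for reasoning about what the example returns on a first run; it is not Spectra's memory store and does not implement its interface.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in: entries are addressed by (namespace, key), so the
// same key can exist independently in different namespaces.
var store = new ConcurrentDictionary<(string Namespace, string Key), (string Content, DateTimeOffset UpdatedAt)>();

void Set(string ns, string key, string content) =>
    store[(ns, key)] = (content, DateTimeOffset.UtcNow);

string? Get(string ns, string key) =>
    store.TryGetValue((ns, key), out var entry) ? entry.Content : null;

var before = Get("preferences", "theme");   // null: nothing stored yet
Set("preferences", "theme", "dark");
Set("drafts", "theme", "solarized");        // same key, different namespace
var after = Get("preferences", "theme");

Console.WriteLine($"{before ?? "light"} -> {after}"); // light -> dark
```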
Advanced: Tracing
Add custom spans and tags to the OpenTelemetry trace:
```csharp
public async Task<StepResult> ExecuteAsync(StepContext context)
{
    // batches stands in for your own data
    context.TracingActivity?.SetTag("step.custom.batch_size", batches.Count);

    // Child span; StartActivity may return null when nothing is listening to the source
    using var childSpan = SpectraActivitySource.Source.StartActivity("process-batches");

    // ... work ...

    return StepResult.Success();
}
```
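Spans are only recorded when something listens to the activity source, usually the OpenTelemetry SDK. This BCL-only sketch uses an ActivityListener to show a tag on a parent activity and a child span nested under it; the source name Demo.Steps and the activity names are placeholders, not the names Spectra uses.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

var source = new ActivitySource("Demo.Steps");
var stopped = new List<Activity>();

// Without a listener, StartActivity returns null and nothing is recorded.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Steps",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = stopped.Add
};
ActivitySource.AddActivityListener(listener);

using (var step = source.StartActivity("step.execute"))
{
    step?.SetTag("step.custom.batch_size", 3);
    using var child = source.StartActivity("process-batches"); // parented to the current activity
    // ... work ...
}

// Activities are recorded as they stop, so the child appears first.
foreach (var a in stopped)
    Console.WriteLine($"{a.DisplayName} parent={a.Parent?.DisplayName ?? "(none)"}");
```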
Testing Your Step
Custom steps are easy to unit test: create a StepContext with the inputs you want and call ExecuteAsync directly:
```csharp
[Fact]
public async Task WordCountStep_Counts_Words()
{
    var step = new WordCountStep();
    var context = new StepContext
    {
        RunId = "test-run",
        WorkflowId = "test-workflow",
        NodeId = "count",
        State = new WorkflowState { WorkflowId = "test" },
        CancellationToken = CancellationToken.None,
        Inputs = new Dictionary<string, object?>
        {
            ["text"] = "hello world from spectra"
        }
    };

    var result = await step.ExecuteAsync(context);

    Assert.Equal(StepStatus.Succeeded, result.Status);
    Assert.Equal(4, result.Outputs["count"]);
}
```
Test Failure Cases Too
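Test missing inputs, null values, and cancellation. Missing or invalid inputs should produce a StepResult.Fail rather than an unhandled exception; cancellation should surface as the OperationCanceledException that ThrowIfCancellationRequested throws.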
Quick Reference
| Concept | How |
|---|---|
| Identify your step | Set StepType to a unique string |
| Read inputs | context.Inputs.TryGetValue(...) |
| Return success | StepResult.Success(outputs) |
| Return failure | StepResult.Fail(message, exception?) |
| Register | builder.AddStep<YourStep>() |
| Use in workflow | .AddStep("nodeId", instance, inputs: ...) |
| Stream tokens | Check context.IsStreaming and that context.OnToken is not null, then await it |
| Pause for human | await context.InterruptAsync(reason, config) |
| Access memory | context.Memory?.GetAsync(...) |
| Add tracing | context.TracingActivity?.SetTag(...) |
| Cancel cleanly | context.CancellationToken.ThrowIfCancellationRequested() |