Patterns

Stateful workflows

Diff-friendly state for workflow UIs.

Spectron is not limited to conversational memory. Its entity-attribute model and temporal validity system make it a natural fit for tracking the state of long-running, multi-step agent workflows – processes that span hours or days, survive restarts, and need to avoid repeating completed steps.

Traditional approaches to workflow state – Redis keys, database records, task queue metadata – require purpose-built state management. Spectron adds:

  • Scoped isolation – workflow state for project A cannot interfere with project B.

  • Temporal validity – step results can expire, triggering re-execution.

  • Provenance – every state change traces back to the turn or operation that caused it.

  • State diff – query what changed since a checkpoint without building your own diffing logic.

  • Observability – inspect the full state of any workflow at any point in time.

Map your workflow steps to entity types and attributes. A research workflow might look like:

Entity typeExample entitiesKey attributes
ResearchTaskmarket_analysis_q1status, assigned_model, deadline
SearchResultresult_001query, url, summary, relevance_score
Reportmarket_report_draftstatus, word_count, sections_complete

Use a project or task scope dimension to isolate each workflow's state:

from spectron import Spectron

client = Spectron(api_key="sk-...")
memory = client.memory(context_id="workflows")

# Each workflow run gets its own scope
workflow_scope = {"org": "acme", "project": "market-analysis-q1-2025"}

session = await memory.sessions.create(scope=workflow_scope)
import { Spectron } from "spectron";

const client = new Spectron({ apiKey: "sk-..." });
const memory = client.memory({ contextId: "workflows" });

const workflowScope = { org: "acme", project: "market-analysis-q1-2025" };
const session = await memory.sessions.create({ scope: workflowScope });

Use turns to record what each step did. The extraction pipeline stores the results as Context-category attributes on the relevant entities.

async def run_search_step(session, query: str) -> list[dict]:
results = await your_search_api(query)

# Record the step as an agent turn – extraction captures the results
await session.turn(
role="assistant",
content=f"Completed search for '{query}'. Found {len(results)} results. "
f"Top result: {results[0]['url']}{results[0]['summary'][:200]}",
)

return results
async function runSearchStep(session: Session, query: string): Promise<SearchResult[]> {
const results = await yourSearchApi(query);

await session.turn({
role: "assistant",
content: `Completed search for '${query}'. Found ${results.length} results. `
+ `Top result: ${results[0].url}${results[0].summary.slice(0, 200)}`,
});

return results;
}

Before running a step, check whether it has already been completed. This makes the workflow idempotent – safe to restart without duplicating work.

async def should_run_step(memory, scope: dict, step_name: str) -> bool:
ctx = await memory.context(
scope=scope,
query=f"Has the {step_name} step been completed?",
top_k=3,
)

# Check if there is an existing completion record for this step
for item in ctx.items:
if item.entity_type == "WorkflowStep" and item.attributes.get("name") == step_name:
if item.attributes.get("status") == "completed":
return False

return True
async function shouldRunStep(
memory: Memory,
scope: Record<string, string>,
stepName: string,
): Promise<boolean> {
const ctx = await memory.context({
scope,
query: `Has the ${stepName} step been completed?`,
topK: 3,
});

for (const item of ctx.items) {
if (
item.entityType === "WorkflowStep"
&& item.attributes.name === stepName
&& item.attributes.status === "completed"
) {
return false;
}
}

return true;
}

Some workflow steps have results that go stale – a price lookup, a news summary, a resource availability check. Set valid_until on the turn to give the extracted attributes a time-to-live:

POST /api/v1/{context_id}/sessions/{session_id}/turns
Content-Type: application/json

{
"role": "assistant",
"content": "Fetched current gold price: $2,340/oz",
"metadata": {
"valid_until": "2025-11-16T00:00:00Z"
}
}

When the validity period expires, the attribute no longer appears in context retrievals and should_run_step returns true again, triggering a re-fetch.

The state diff endpoint returns the delta between two points in time for a scope. Use it to build progress tracking UIs or detect stalled workflows:

# Get all state changes in the last hour
diff = await memory.state.diff(
scope=workflow_scope,
since="2025-11-15T09:00:00Z",
)

print(f"Steps completed: {diff.attributes_updated}")
print(f"New entities: {diff.entities_created}")
print(f"Open uncertainties: {diff.uncertainties_added}")
const diff = await memory.state.diff({
scope: workflowScope,
since: "2025-11-15T09:00:00Z",
});

console.log(`Steps completed: ${diff.attributesUpdated}`);
console.log(`New entities: ${diff.entitiesCreated}`);
console.log(`Open uncertainties: ${diff.uncertaintiesAdded}`);

The diff is useful for:

  • Progress bars – count completed steps vs total expected.

  • Stall detection – alert if no state changes have occurred in N minutes.

  • Audit trails – record what changed during a workflow run for compliance.

async def run_research_workflow(user_id: str, topic: str):
scope = {"org": "acme", "project": f"research-{topic.replace(' ', '-')}"}
session = await memory.sessions.create(scope=scope)

steps = [
("web_search", lambda: run_search_step(session, topic)),
("summarise", lambda: run_summarise_step(session, topic)),
("draft_report", lambda: run_draft_step(session, topic)),
]

for step_name, step_fn in steps:
if await should_run_step(memory, scope, step_name):
await step_fn()
else:
print(f"Skipping {step_name} – already completed.")

# Final state summary
state = await memory.state.get(scope=scope)
return state
async function runResearchWorkflow(userId: string, topic: string) {
const scope = { org: "acme", project: `research-${topic.replace(/ /g, "-")}` };
const session = await memory.sessions.create({ scope });

const steps = [
{ name: "web_search", fn: () => runSearchStep(session, topic) },
{ name: "summarise", fn: () => runSummariseStep(session, topic) },
{ name: "draft_report", fn: () => runDraftStep(session, topic) },
];

for (const { name, fn } of steps) {
if (await shouldRunStep(memory, scope, name)) {
await fn();
} else {
console.log(`Skipping ${name} – already completed.`);
}
}

const state = await memory.state.get({ scope });
return state;
}

Was this page helpful?