Ingestion

Bulk import

Importing large quantities of documents or knowledge nodes into Spectron.

When populating a new Context or migrating from an existing system, you typically need to ingest many documents or knowledge nodes at once. Spectron's ingestion pipeline is designed for concurrent usage, and both the document upload endpoint and the node batch upsert endpoint support high-throughput ingestion patterns.

POST /api/v1/{context_id}/documents accepts one document per request. For bulk uploads, issue multiple requests concurrently and track their status independently.

import asyncio
import os

from spectron import Spectron

memory = Spectron(context="acme-prod", api_key=os.environ["SPECTRON_API_KEY"])

files = [
    ("returns-policy.pdf", "Returns Policy"),
    ("shipping-guide.pdf", "Shipping Guide"),
    ("warranty-terms.pdf", "Warranty Terms"),
    ("product-manual.pdf", "Product Manual"),
]

async def upload_file(path, title):
    # Keep the file open until the upload call has completed
    with open(path, "rb") as f:
        return await memory.knowledge.upload(
            file=f,
            title=title,
            profile="text_only",
            scope={"org": "acme"},
        )

# Run inside an async function (or a REPL that supports top-level await)
docs = await asyncio.gather(*[upload_file(path, title) for path, title in files])
doc_ids = [doc.id for doc in docs]
print(f"Queued {len(doc_ids)} documents")

import { createReadStream } from "node:fs";
import { Spectron } from "spectron";

const memory = new Spectron({ context: "acme-prod", apiKey: process.env.SPECTRON_API_KEY });

const files = [
  { path: "returns-policy.pdf", title: "Returns Policy" },
  { path: "shipping-guide.pdf", title: "Shipping Guide" },
];

// Entries carry `path`, so destructure that. Passing a read stream as `file`
// is an assumption that mirrors the open file handle in the Python example.
const uploads = files.map(({ path, title }) =>
  memory.knowledge.upload({
    file: createReadStream(path),
    title,
    profile: "text_only",
    scope: { org: "acme" },
  })
);

const docs = await Promise.all(uploads);
const docIds = docs.map(d => d.id);
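
By default `asyncio.gather` propagates the first exception it sees, so one unreadable file can hide the outcome of every other upload in the batch. The following is a minimal sketch of tracking each upload independently. `upload_all` and `fake_upload` are illustrative names, not part of the Spectron SDK; in practice `upload_one` would wrap a call such as `upload_file(path, title)`.

```python
import asyncio


async def upload_all(upload_one, items):
    """Run upload_one(item) for every item concurrently.

    Returns (succeeded, failed): succeeded holds (item, result) pairs,
    failed holds (item, exception) pairs.
    """
    # return_exceptions=True keeps one failure from discarding the other results
    results = await asyncio.gather(
        *(upload_one(item) for item in items),
        return_exceptions=True,
    )
    succeeded, failed = [], []
    for item, result in zip(items, results):
        if isinstance(result, Exception):
            failed.append((item, result))
        else:
            succeeded.append((item, result))
    return succeeded, failed


# Hypothetical stand-in for a real upload call, for demonstration only.
async def fake_upload(path):
    await asyncio.sleep(0)
    if path.endswith(".bad"):
        raise ValueError(f"cannot read {path}")
    return f"doc-{path}"
```

The `failed` list can be logged or fed back into a second attempt without repeating the uploads that already succeeded.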

After queuing a batch, poll all document IDs until every document is ready or failed:

async def wait_for_all(doc_ids):
    pending = set(doc_ids)
    failed = []

    while pending:
        await asyncio.sleep(5)
        for doc_id in list(pending):
            doc = await memory.knowledge.get(doc_id)
            if doc.status == "ready":
                pending.discard(doc_id)
                print(f"{doc_id}: ready")
            elif doc.status == "failed":
                pending.discard(doc_id)
                failed.append((doc_id, doc.error))
                print(f"{doc_id}: failed – {doc.error}")

    return failed

failed = await wait_for_all(doc_ids)
if failed:
    print(f"{len(failed)} documents failed processing")
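
A polling loop with no deadline waits forever if a document never leaves the queue. The sketch below adds a timeout and fetches all pending statuses concurrently. It assumes only a coroutine `get_status(doc_id)` that returns a status string; `wait_for_all_with_deadline` and `fake_status` are hypothetical names, and in practice `get_status` might wrap `memory.knowledge.get(doc_id)` and return its `status`.

```python
import asyncio
import time


async def wait_for_all_with_deadline(get_status, doc_ids, timeout=600.0, interval=5.0):
    """Poll until every document is 'ready' or 'failed', or the deadline passes.

    Returns (ready, failed, timed_out) as three lists of document IDs.
    """
    pending = list(doc_ids)
    ready, failed = [], []
    deadline = time.monotonic() + timeout

    while pending and time.monotonic() < deadline:
        # Fetch every pending status concurrently instead of one at a time
        statuses = await asyncio.gather(*(get_status(d) for d in pending))
        still_pending = []
        for doc_id, status in zip(pending, statuses):
            if status == "ready":
                ready.append(doc_id)
            elif status == "failed":
                failed.append(doc_id)
            else:
                still_pending.append(doc_id)  # any other status counts as pending
        pending = still_pending
        if pending:
            await asyncio.sleep(interval)

    return ready, failed, pending


# Hypothetical status source for demonstration: "a" becomes ready on the
# second poll, "bad" fails immediately, "stuck" stays queued.
_polls = {}

async def fake_status(doc_id):
    _polls[doc_id] = _polls.get(doc_id, 0) + 1
    if doc_id == "stuck":
        return "queued"
    if doc_id == "bad":
        return "failed"
    return "ready" if _polls[doc_id] >= 2 else "queued"
```

Documents returned in the third list were still unfinished when the deadline passed and can be polled again later.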

Spectron applies rate limits per API key. If you are ingesting thousands of documents, introduce a semaphore to limit concurrency:

semaphore = asyncio.Semaphore(10)  # max 10 concurrent uploads

async def upload_with_limit(path, title):
    async with semaphore:
        return await upload_file(path, title)

docs = await asyncio.gather(*[upload_with_limit(path, title) for path, title in files])

The recommended concurrency ceiling for standard deployments is 10–20 concurrent uploads. Self-hosted deployments can be tuned according to your infrastructure capacity.
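
Even with a concurrency cap, a per-key rate limit can still reject individual requests. One common response is to retry with exponential backoff and jitter, sketched below. How the Spectron client reports a rate-limit rejection is not stated here, so `RateLimited` is a hypothetical placeholder exception, and `with_backoff` and `make_flaky` are illustrative helpers.

```python
import asyncio
import random


class RateLimited(Exception):
    """Hypothetical stand-in for whatever error the client raises when rate limited."""


async def with_backoff(call, *, retries=5, base_delay=0.5, max_delay=30.0):
    """Await call() and retry on RateLimited with exponential backoff and jitter."""
    for attempt in range(retries + 1):
        try:
            return await call()
        except RateLimited:
            if attempt == retries:
                raise  # out of retries: surface the error to the caller
            # Upper bound doubles each attempt, capped at max_delay;
            # a random delay below that bound spreads retries out
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))


def make_flaky(failures):
    """Demo helper: build a coroutine function that fails `failures` times, then succeeds."""
    state = {"left": failures}

    async def call():
        if state["left"] > 0:
            state["left"] -= 1
            raise RateLimited()
        return "ok"

    return call
```

A wrapped upload would then look like `await with_backoff(lambda: upload_with_limit(path, title))`, with the exception type replaced by the one the client actually raises.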

For structured catalogue or policy data you already trust, use POST /api/v1/{context_id}/facts with infer: "triples" (or batch multiple utterances via /facts/batch). The reconciler persists entities, attributes, and relations in the unified graph with source.kind = "document" or operator-provided provenance.

POST /api/v1/{context_id}/facts
Content-Type: application/json
API-KEY: <key>

{
  "text": "Product Widget A (sku_001) costs 29.99 and belongs to category widgets.",
  "infer": "triples",
  "scope": { "org": "acme" }
}

Response:

{
  "created": 3,
  "updated": 0,
  "relations_created": 2
}

To upsert nodes and relations directly through the SDK, build them from your data source and send them in batches:

# Build nodes and relations from your data source
nodes = [
    {"kind": "product", "slug": record["sku"], "title": record["name"], "content": record}
    for record in product_catalogue
]

relations = [
    {"in": ("product", record["sku"]), "out": ("category", record["category_slug"]), "label": "belongs_to"}
    for record in product_catalogue
    if record.get("category_slug")
]

# Upsert in batches of 1000
BATCH_SIZE = 1000
for i in range(0, len(nodes), BATCH_SIZE):
    batch_nodes = nodes[i:i + BATCH_SIZE]
    # Send each relation with the batch that contains its source node
    batch_slugs = {n["slug"] for n in batch_nodes}
    batch_relations = [r for r in relations if r["in"][1] in batch_slugs]
    result = await memory.knowledge.nodes.upsert(
        nodes=batch_nodes,
        relations=batch_relations,
    )
    print(f"Batch {i // BATCH_SIZE + 1}: created={result.created}, updated={result.updated}")

// `nodes` is built from your data source, as in the Python example.
// Relations are omitted here for brevity.
const BATCH_SIZE = 1000;

for (let i = 0; i < nodes.length; i += BATCH_SIZE) {
  const batchNodes = nodes.slice(i, i + BATCH_SIZE);
  const result = await memory.knowledge.nodes.upsert({ nodes: batchNodes, relations: [] });
  console.log(`Batch ${i / BATCH_SIZE + 1}: created=${result.created}`);
}

Document uploads are automatically deduplicated by content hash. If the same file is submitted multiple times during a bulk import – for example, because a script is re-run after a partial failure – each duplicate returns the existing document ID with deduplicated: true and no reprocessing occurs.
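
Server-side deduplication still costs one request per duplicate. As an optional client-side pre-filter, identical files can be dropped before upload by hashing their contents locally, as in the sketch below. The hash algorithm Spectron uses is not specified here, so SHA-256 is an assumption chosen only for local comparison; `sha256_of` and `unique_by_content` are illustrative names.

```python
import hashlib


def sha256_of(path, chunk_size=1 << 20):
    """Return the hex SHA-256 digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def unique_by_content(paths):
    """Keep the first path seen for each distinct file content, in input order."""
    seen = {}
    for path in paths:
        seen.setdefault(sha256_of(path), path)
    return list(seen.values())
```

This only removes duplicates within one local batch; files already uploaded in an earlier run are still handled by the server-side check described above.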

Node upserts are deduplicated by (kind, slug). Resubmitting a node with the same kind and slug updates its title and content in place.
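
When a source export contains the same (kind, slug) more than once, it can help to collapse those entries before batching so the outcome does not depend on how the server orders writes within a batch. The sketch below keeps the last occurrence of each key, mirroring the update-in-place behaviour described above. `dedupe_nodes` is an illustrative helper, not an SDK function, and the server's behaviour for duplicates inside a single batch is not stated in this guide.

```python
def dedupe_nodes(nodes):
    """Collapse nodes that share (kind, slug); the last occurrence wins.

    Output order follows the first appearance of each key.
    """
    merged = {}
    for node in nodes:
        key = (node["kind"], node["slug"])
        # Reassigning an existing key keeps its original position in the dict
        merged[key] = node
    return list(merged.values())


nodes = [
    {"kind": "product", "slug": "sku_001", "title": "Old title"},
    {"kind": "category", "slug": "widgets", "title": "Widgets"},
    {"kind": "product", "slug": "sku_001", "title": "New title"},
]
unique_nodes = dedupe_nodes(nodes)  # two nodes remain; sku_001 carries "New title"
```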

These properties make bulk imports safe to re-run. A failed or interrupted import can be restarted from the beginning without creating duplicate records.

All documents uploaded in a bulk import share the same scope unless you specify it per-document. For mixed-scope imports – for example, some documents are org-level and others are user-level – structure your upload loop to set scope per file:

async def upload_with_scope(item):
    with open(item["path"], "rb") as f:
        return await memory.knowledge.upload(
            file=f,
            title=item["title"],
            profile=item.get("profile", "text_only"),
            scope=item["scope"],
        )

items = [
    {"path": "handbook.pdf", "title": "Employee Handbook", "scope": {"org": "acme"}},
    {"path": "preferences.json", "title": "User Prefs", "scope": {"user": "alice", "org": "acme"}},
]

docs = await asyncio.gather(*[upload_with_scope(item) for item in items])

For imports of tens of thousands of documents, track overall progress by listing documents with a status filter:

async def import_progress():
    ready = await memory.knowledge.list(status="ready", scope={"org": "acme"})
    queued = await memory.knowledge.list(status="queued", scope={"org": "acme"})
    failed = await memory.knowledge.list(status="failed", scope={"org": "acme"})

    print(f"Ready: {len(ready)} Queued: {len(queued)} Failed: {len(failed)}")

Failed documents should be inspected individually to determine whether the failure is transient (pipeline overload) or permanent (corrupt file, unsupported format):

failed_docs = await memory.knowledge.list(status="failed", scope={"org": "acme"})
for doc in failed_docs:
    full = await memory.knowledge.get(doc.id)
    print(f"{doc.id} {full.title} {full.error}")
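
Once the errors are collected, the transient and permanent cases can be separated so that only the former are resubmitted. The sketch below does this by matching substrings in the error text. The exact wording of Spectron's error messages is not documented here, so the entries in `TRANSIENT_MARKERS` are hypothetical placeholders to replace with strings observed in real failures; `split_failures` is an illustrative helper.

```python
# Hypothetical markers: replace with substrings seen in real error messages.
TRANSIENT_MARKERS = ("overload", "timeout", "temporarily")


def split_failures(failures):
    """Split (doc_id, error_text) pairs into (retryable, permanent) lists of IDs."""
    retryable, permanent = [], []
    for doc_id, error in failures:
        text = (error or "").lower()
        if any(marker in text for marker in TRANSIENT_MARKERS):
            retryable.append(doc_id)
        else:
            # Unknown or missing errors are treated as permanent, so they get
            # a manual look instead of being retried in a loop
            permanent.append(doc_id)
    return retryable, permanent
```

Because uploads are deduplicated by content hash, how a resubmitted file that previously failed is treated should be confirmed against the API before relying on automatic retries.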
