
Snowflake and Databricks

Sync structured knowledge from data warehouses into Spectron.

Data warehouse connectors let you pull structured records from Snowflake or Databricks directly into Spectron's authoritative knowledge store. Each record in the query result becomes a typed knowledge node, giving your agents instant access to product catalogues, customer records, pricing tables, or any other structured data your warehouse holds.

When a warehouse connector syncs, Spectron:

  1. Connects to the warehouse using the credentials in the connector's config

  2. Executes the configured SQL query (or scans the referenced table)

  3. Maps each record to a knowledge node: column names become field keys, and column values are stored with their native types

  4. Computes a deterministic node ID from the primary-key columns to enable idempotent re-syncs

  5. Updates or creates nodes as needed; records present in the previous sync but absent from the current one are soft-deleted
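The create/update/soft-delete decision in steps 4 and 5 can be sketched in a few lines of Python. This is a minimal illustration, not Spectron's implementation. `record_key` and `reconcile` are hypothetical names, records are plain dicts, and the sketch assumes that rows whose values did not change need no write.

```python
from typing import Any, Iterable

Record = dict[str, Any]


def record_key(record: Record, primary_keys: list[str]) -> tuple:
    # The primary-key values alone identify a record across syncs.
    return tuple(record[k] for k in primary_keys)


def reconcile(
    previous: dict[tuple, Record],
    current_rows: Iterable[Record],
    primary_keys: list[str],
) -> tuple[list[tuple], list[tuple], list[tuple]]:
    """Hypothetical helper: split one sync into created, updated and soft-deleted keys."""
    current: dict[tuple, Record] = {}
    for row in current_rows:
        key = record_key(row, primary_keys)
        if key in current:
            # primary_keys must be unique; a duplicate would make node IDs collide.
            raise ValueError(f"duplicate primary key {key!r}")
        current[key] = row

    created = [k for k in current if k not in previous]
    # Assumption: unchanged rows are left untouched.
    updated = [k for k in current if k in previous and previous[k] != current[k]]
    # Present in the previous sync, absent now: soft-delete (step 5).
    soft_deleted = [k for k in previous if k not in current]
    return created, updated, soft_deleted


previous = {
    ("P001",): {"product_id": "P001", "price": 149.99},
    ("P002",): {"product_id": "P002", "price": 599.00},
}
current = [
    {"product_id": "P001", "price": 129.99},  # price changed
    {"product_id": "P003", "price": 25.00},   # new row
]
created, updated, soft_deleted = reconcile(previous, current, ["product_id"])
```

Here P003 is created, P001 is updated in place, and P002 is soft-deleted because it no longer appears in the result set.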

Because warehouse data is already structured, no chunking or embedding pipeline is applied by default. Retrieval against warehouse-sourced nodes uses direct field-level lookup and BM25 keyword search; vector search is available if you configure embedding for specific text columns.
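The sketch below makes the two non-vector retrieval modes concrete for nodes shaped like the knowledge-node example on this page. It shows an exact match on typed fields and Okapi BM25 scoring over chosen text fields. It illustrates the techniques only and is not Spectron's retrieval code. `field_lookup` and `bm25_scores` are hypothetical names. The defaults `k1=1.5` and `b=0.75` are common choices, not documented Spectron settings.

```python
import math
import re
from collections import Counter
from typing import Any

Node = dict[str, Any]


def field_lookup(nodes: list[Node], **criteria: Any) -> list[Node]:
    """Exact match on typed field values, e.g. field_lookup(nodes, category="Electronics")."""
    return [n for n in nodes if all(n["fields"].get(k) == v for k, v in criteria.items())]


def tokenize(text: str) -> list[str]:
    return re.findall(r"\w+", text.lower())


def bm25_scores(
    nodes: list[Node], query: str, text_fields: list[str], k1: float = 1.5, b: float = 0.75
) -> list[float]:
    """Okapi BM25 score of each node for `query`, computed over the chosen text fields."""
    docs = [tokenize(" ".join(str(n["fields"].get(f, "")) for f in text_fields)) for n in nodes]
    n_docs = len(docs)
    avgdl = sum(len(d) for d in docs) / n_docs if n_docs else 0.0
    df = Counter(term for d in docs for term in set(d))  # document frequency per term
    scores = []
    for d in docs:
        tf = Counter(d)
        score = 0.0
        for term in tokenize(query):
            if term not in tf:
                continue
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            norm = tf[term] + k1 * (1 - b + b * len(d) / avgdl)
            score += idf * tf[term] * (k1 + 1) / norm
        scores.append(score)
    return scores
```

Field lookup suits identifiers and categories. BM25 suits free-text columns such as names and descriptions, where exact equality would be too strict.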

Snowflake configuration:

| Field | Type | Description |
|---|---|---|
| account | string | Snowflake account identifier (e.g. xy12345.eu-west-1) |
| warehouse | string | Virtual warehouse to use for query execution |
| database | string | Target database |
| schema | string | Target schema |
| username | string | Snowflake username |
| password | string | Snowflake password (store it in your secrets manager; Spectron does not log credential values) |
| query | string | SQL query whose result set is synced |
| primary_keys | array of strings | Column names that uniquely identify a record; used to compute node IDs |
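Checking the config locally can catch mistakes before you call `connectors.create`. This is a minimal sketch with two assumptions: every field in the Snowflake table is required (the table marks none as optional), and `primary_keys` must be a non-empty list of strings. `validate_snowflake_config` is a hypothetical helper and is not part of the Spectron SDK.

```python
from typing import Any

# Assumption: all Snowflake fields listed in the table are required.
REQUIRED_SNOWFLAKE_FIELDS = (
    "account", "warehouse", "database", "schema",
    "username", "password", "query", "primary_keys",
)


def validate_snowflake_config(config: dict[str, Any]) -> list[str]:
    """Hypothetical helper: return a list of problems (empty if the config looks complete)."""
    problems = [
        f"missing or empty field: {name}"
        for name in REQUIRED_SNOWFLAKE_FIELDS
        if not config.get(name)
    ]
    keys = config.get("primary_keys")
    # A bare string such as "product_id" is a common slip; the field expects a list.
    if keys and not (isinstance(keys, list) and all(isinstance(k, str) and k for k in keys)):
        problems.append("primary_keys must be a list of column-name strings")
    return problems
```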
```python
import os

from spectron import Spectron

memory = Spectron(
    context="acme-prod",
    base_url="https://api.spectron.dev",
    api_key=os.environ["SPECTRON_API_KEY"],
)

# The connector methods are coroutines: top-level await works in a notebook or an
# asyncio REPL (python -m asyncio); in a script, await them inside an async function.
connector = await memory.connectors.create(
    kind="snowflake",
    config={
        "account": "xy12345.eu-west-1",
        "warehouse": "COMPUTE_WH",
        "database": "PROD",
        "schema": "PUBLIC",
        "username": os.environ["SNOWFLAKE_USER"],
        "password": os.environ["SNOWFLAKE_PASSWORD"],
        "query": "SELECT * FROM products WHERE active = TRUE",
        "primary_keys": ["product_id"],
    },
    scope={"org": "acme"},
    sync_schedule="0 */6 * * *",  # cron: every 6 hours, on the hour
)
```
```http
POST /api/v1/acme-prod/connectors
Content-Type: application/json

{
  "kind": "snowflake",
  "config": {
    "account": "xy12345.eu-west-1",
    "warehouse": "COMPUTE_WH",
    "database": "PROD",
    "schema": "PUBLIC",
    "username": "spectron_user",
    "password": "...",
    "query": "SELECT * FROM products WHERE active = TRUE",
    "primary_keys": ["product_id"]
  },
  "scope": { "org": "acme" },
  "sync_schedule": "0 */6 * * *"
}
```
Databricks configuration:

| Field | Type | Description |
|---|---|---|
| host | string | Databricks workspace hostname (e.g. adb-1234567890.12.azuredatabricks.net) |
| http_path | string | HTTP path to the SQL warehouse (e.g. /sql/1.0/warehouses/abc123) |
| token | string | Databricks personal access token or service principal token |
| query | string | SQL query whose result set is synced |
| primary_keys | array of strings | Column names that uniquely identify a record |
| catalog | string | (optional) Unity Catalog name |
| schema | string | (optional) Schema name |
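If you keep the Databricks connection details in environment variables, a small builder can assemble the `config` dict. It includes the optional `catalog` and `schema` keys only when you set them. This sketch rests on several assumptions. `databricks_config` is a hypothetical helper. The variable names `DATABRICKS_HOST` and `DATABRICKS_HTTP_PATH` are our choice, since the SDK example only reads `DATABRICKS_TOKEN`. Stripping an `https://` prefix assumes `host` expects a bare hostname, as the example value suggests.

```python
import os
from collections.abc import Mapping
from typing import Optional


def databricks_config(
    query: str,
    primary_keys: list[str],
    *,
    catalog: Optional[str] = None,
    schema: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> dict:
    """Hypothetical helper: build a Databricks connector config from environment variables."""
    # Assumption: `host` is a bare hostname, so drop any scheme or trailing slash.
    host = env["DATABRICKS_HOST"].removeprefix("https://").removeprefix("http://").rstrip("/")
    config = {
        "host": host,
        "http_path": env["DATABRICKS_HTTP_PATH"],
        "token": env["DATABRICKS_TOKEN"],
        "query": query,
        "primary_keys": list(primary_keys),
    }
    # catalog and schema are optional, so only include them when given.
    if catalog is not None:
        config["catalog"] = catalog
    if schema is not None:
        config["schema"] = schema
    return config
```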
```python
connector = await memory.connectors.create(
    kind="databricks",
    config={
        "host": "adb-1234567890.12.azuredatabricks.net",
        "http_path": "/sql/1.0/warehouses/abc123",
        "token": os.environ["DATABRICKS_TOKEN"],
        "query": "SELECT sku, name, description, price, category FROM catalogue.products",
        "primary_keys": ["sku"],
    },
    scope={"org": "acme"},
    sync_schedule="0 3 * * *",  # cron: daily at 03:00
)
```
```http
POST /api/v1/acme-prod/connectors
Content-Type: application/json

{
  "kind": "databricks",
  "config": {
    "host": "adb-1234567890.12.azuredatabricks.net",
    "http_path": "/sql/1.0/warehouses/abc123",
    "token": "...",
    "query": "SELECT sku, name, description, price, category FROM catalogue.products",
    "primary_keys": ["sku"]
  },
  "scope": { "org": "acme" },
  "sync_schedule": "0 3 * * *"
}
```

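Given a query result like:

| product_id | name | price | category |
|---|---|---|---|
| P001 | Wireless Headphones | 149.99 | Electronics |
| P002 | Standing Desk | 599.00 | Furniture |

Spectron creates two knowledge nodes, one per row. The node for P001 looks like this: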

```json
{
  "id": "knowledge:[\"connector:01jt...\", \"P001\"]",
  "source": "connector:01jt...",
  "fields": {
    "product_id": "P001",
    "name": "Wireless Headphones",
    "price": 149.99,
    "category": "Electronics"
  },
  "scope": { "org": "acme" }
}
```

The node ID is stable across syncs: if record P001 changes in the warehouse, the existing node is updated rather than replaced, preserving any cross-references from experiential memory.
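You can see why the ID is stable by deriving it only from the connector ID and the primary-key values. That derivation reproduces the `knowledge:[...]` shape of the node example. This sketch rests on assumptions. The JSON-array encoding is inferred from that single example. `node_id` and `record_to_node` are hypothetical names. `connector:example` is a placeholder connector ID.

```python
import json
from typing import Any


def node_id(connector_id: str, record: dict[str, Any], primary_keys: list[str]) -> str:
    # Only the connector ID and the primary-key values feed the ID, so edits to
    # other columns (price, name, ...) never change it.
    return "knowledge:" + json.dumps([connector_id, *(record[k] for k in primary_keys)])


def record_to_node(
    connector_id: str,
    record: dict[str, Any],
    primary_keys: list[str],
    scope: dict[str, str],
) -> dict[str, Any]:
    """Hypothetical helper mirroring the knowledge-node shape shown on this page."""
    return {
        "id": node_id(connector_id, record, primary_keys),
        "source": connector_id,
        "fields": dict(record),  # column name -> value, native Python types kept
        "scope": dict(scope),
    }


row = {"product_id": "P001", "name": "Wireless Headphones", "price": 149.99, "category": "Electronics"}
before = record_to_node("connector:example", row, ["product_id"], {"org": "acme"})
after = record_to_node("connector:example", {**row, "price": 129.99}, ["product_id"], {"org": "acme"})
```

`before` and `after` share the same `id` even though the price changed. One caveat of this encoding is that primary-key values must be JSON-serialisable. Strings and numbers work, but dates would need converting first.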

For large tables, retrieve only records that have changed since the last sync by parameterising the query with the connector's last_sync_at timestamp. Spectron exposes this value in the connector status response:

```python
status = await memory.connectors.status(connector.id)
last_sync = status.last_sync_at  # datetime, or None if the connector has never synced
```

Use it to build an incremental query:

```python
if last_sync:
    # Fetch only rows modified since the previous sync
    query = f"SELECT * FROM orders WHERE updated_at > '{last_sync.isoformat()}'"
else:
    # First sync: fetch everything
    query = "SELECT * FROM orders"

await memory.connectors.update(connector.id, config={"query": query})
await memory.connectors.sync(connector.id)
```
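The incremental query snippet can be refined in two ways. The watermark can be moved back by a small overlap, and timezones can be normalised before the timestamp is formatted. This is a sketch built on several assumptions. `build_incremental_query` is a hypothetical helper. It assumes `last_sync` is timezone-aware and that the watermark column stores UTC timestamps. It also assumes re-reading a few already-synced rows is harmless, since node IDs are deterministic. The overlap guards against rows whose `updated_at` falls just before `last_sync_at` but which were not yet committed when the previous sync ran.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_incremental_query(
    table: str,
    column: str,
    last_sync: Optional[datetime],
    overlap: timedelta = timedelta(minutes=5),
) -> str:
    """Hypothetical helper: full query on the first run, windowed query afterwards."""
    # table and column are interpolated verbatim: pass trusted identifiers only.
    if last_sync is None:
        return f"SELECT * FROM {table}"
    if last_sync.tzinfo is None:
        raise ValueError("last_sync must be timezone-aware")
    # Step back by `overlap` so late-committed rows are not skipped; re-reading
    # a row is idempotent because its node ID is derived from the primary key.
    since = (last_sync - overlap).astimezone(timezone.utc)
    # Assumption: the watermark column stores UTC timestamps without an offset.
    return f"SELECT * FROM {table} WHERE {column} > '{since:%Y-%m-%d %H:%M:%S}'"
```

Truncating to whole seconds moves the watermark earlier, never later, so it only widens the overlap.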

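A cleaner pattern keeps the watermark logic in the warehouse itself. For example, use a Snowflake dynamic table, or a Databricks Delta table filtered on a relative time window with TIMESTAMPDIFF. This keeps the connector configuration static while the query automatically narrows its window on each run. Whichever pattern you use, bear in mind step 5 of the sync process: records present in the previous sync but absent from the current result are soft-deleted. Check how your connector treats unchanged rows that fall outside an incremental window before relying on this approach.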

| Data type | Recommended schedule |
|---|---|
| Reference data (products, categories) | Every 6–24 hours |
| Transactional records (orders, events) | Every 30–60 minutes |
| Real-time operational data | Use the REST API for push-based ingestion instead |

Warehouse connectors are pull-based and incur compute cost on the warehouse side. Avoid polling intervals shorter than 15 minutes unless the data volume is small and the warehouse is lightly loaded.
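To catch schedules that poll more often than the 15-minute guideline, you can compute the shortest gap between runs of a cron expression. The sketch handles only the simple forms used on this page. The minute and hour fields may be `*`, `*/N`, or a single number, and the day fields must be `*`. `min_interval_minutes` is a hypothetical helper, and anything more complex needs a full cron parser.

```python
def _expand(field: str, size: int) -> list[int]:
    """Values a minute/hour field fires on; supports '*', '*/N' and a single number."""
    if field == "*":
        return list(range(size))
    if field.startswith("*/") and field[2:].isdigit() and int(field[2:]) > 0:
        return list(range(0, size, int(field[2:])))
    if field.isdigit() and int(field) < size:
        return [int(field)]
    raise ValueError(f"unsupported cron field for this sketch: {field!r}")


def min_interval_minutes(expr: str) -> int:
    """Shortest gap, in minutes, between consecutive runs of a daily-repeating cron schedule."""
    minute, hour, dom, month, dow = expr.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("this sketch only handles schedules that run every day")
    runs = sorted(h * 60 + m for h in _expand(hour, 24) for m in _expand(minute, 60))
    gaps = [later - earlier for earlier, later in zip(runs, runs[1:])]
    gaps.append(24 * 60 - runs[-1] + runs[0])  # wrap around to the first run the next day
    return min(gaps)


for schedule in ("0 */6 * * *", "0 3 * * *", "*/45 * * * *"):
    minutes = min_interval_minutes(schedule)
    flag = "ok" if minutes >= 15 else "shorter than 15 minutes"
    print(f"{schedule!r}: shortest gap {minutes} min ({flag})")
```

`*/45` does not mean "every 45 minutes". It fires at :00 and :45 each hour, so the shortest gap is 15 minutes, which is why the sketch measures gaps rather than reading the step value.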

```python
# List all connectors
connectors = await memory.connectors.list()

# Update the query
await memory.connectors.update(
    connector.id,
    config={"query": "SELECT * FROM products WHERE active = TRUE AND price > 0"},
)

# Pause automatic sync while retaining configuration
await memory.connectors.update(connector.id, sync_schedule=None)

# Trigger a manual sync
await memory.connectors.sync(connector.id)

# Delete the connector; pass retain_data=False to also remove synced knowledge nodes
await memory.connectors.delete(connector.id)
```
