Dynamiq

Dynamiq is a Python framework for building multi-agent LLM systems with SurrealDB vector storage capabilities.

Installation

pip install dynamiq surrealdb openai         # swap OpenAI for any embedder you like
# one-liner dev DB: publishes port 8000 and starts SurrealDB with the
# root/secret credentials, storing data under /data/db inside the container
docker run -p 8000:8000 surrealdb/surrealdb:latest \
start --user root --pass secret file:/data/db

SurrealDB ≥ v1.5 has built-in HNSW vector indexes, queried with SurrealQL's KNN operator (<|K,EF|> for HNSW indexes) and the vector::distance::* / vector::similarity::* functions.

Define the one-time vector table & index

-- run this once in your DB console
-- Table holding the documents, with one field each for the id, the raw text,
-- free-form metadata and the embedding vector.
DEFINE TABLE ai_docs SCHEMALESS;
DEFINE FIELD id ON ai_docs TYPE string;
DEFINE FIELD text ON ai_docs TYPE string;
DEFINE FIELD metadata ON ai_docs TYPE object;
DEFINE FIELD embedding ON ai_docs TYPE array;

-- HNSW index for fast approximate nearest-neighbour search over `embedding`
-- (1536-D, cosine distance; change DIMENSION / DIST to fit your embedder)
DEFINE INDEX IF NOT EXISTS ai_docs_vec
ON ai_docs FIELDS embedding
HNSW DIMENSION 1536 DIST COSINE;

SurrealDB nodes for Dynamiq

dynamiq_surreal.py

from dynamiq.nodes.node import ConnectionNode
from typing import List, Dict, Any, Optional
from surrealdb import AsyncSurreal
import hashlib, json, os
import openai # or any local embedding model

_EMBED_DIM = 1536

def embed(text: str) -> List[float]:
    """Return the embedding vector for *text*.

    Tiny helper – replace with your preferred model. Requests a
    ``_EMBED_DIM``-dimensional ``text-embedding-3-small`` embedding from
    OpenAI, authenticating with the ``OPENAI_API_KEY`` environment variable.

    Args:
        text: The string to embed.

    Returns:
        The embedding as a list of floats.
    """
    api_key = os.getenv("OPENAI_API_KEY")

    if hasattr(openai, "OpenAI"):
        # openai>=1.0 removed the module-level ``openai.Embedding`` resource;
        # requests go through a client object and responses are typed objects.
        client = openai.OpenAI(api_key=api_key)
        resp = client.embeddings.create(
            model="text-embedding-3-small",
            input=[text],
            dimensions=_EMBED_DIM,
        )
        return resp.data[0].embedding

    # Legacy (<1.0) SDK: module-level resource returning a dict-like response.
    resp = openai.Embedding.create(
        model="text-embedding-3-small",
        input=[text],
        dimensions=_EMBED_DIM,
        api_key=api_key,
    )
    return resp["data"][0]["embedding"]


# ---------------------------------------------------------------------#
# A lightweight connection-settings object shared by the Dynamiq nodes below
# ---------------------------------------------------------------------#
class SurrealConnection:
    """Connection settings for a SurrealDB instance, plus a schema bootstrap.

    The object only stores the endpoint, namespace/database and credentials;
    a fresh ``AsyncSurreal`` session is opened whenever one is needed.
    """

    def __init__(
        self,
        url: str = "ws://localhost:8000/rpc",
        namespace: str = "dynamiq",
        database: str = "rag",
        user: str = "root",
        password: str = "secret",
    ):
        """Store the connection parameters.

        Args:
            url: SurrealDB RPC endpoint.
            namespace: Namespace selected after sign-in.
            database: Database selected after sign-in.
            user: Username used to sign in.
            password: Password used to sign in.
        """
        self.url = url
        self.namespace = namespace
        self.database = database
        self.user = user
        self.password = password

    @staticmethod
    def _validated_identifier(name: str) -> str:
        """Return *name* if it is safe to splice into SurrealQL as an identifier.

        Raises:
            ValueError: If *name* is empty or contains anything other than
                ASCII letters, digits and underscores.
        """
        is_safe = bool(name) and all(
            ch == "_" or (ch.isascii() and ch.isalnum()) for ch in name
        )
        if not is_safe:
            raise ValueError(f"invalid SurrealDB identifier: {name!r}")
        return name

    async def _ensure_table_and_index(self, table_name: str):
        """Create the document table, its fields and the HNSW index if missing.

        Args:
            table_name: Name of the table to create; the vector index is
                named ``<table_name>_vec``.

        Raises:
            ValueError: If *table_name* is not a plain identifier.
        """
        # Identifiers and the DIMENSION literal cannot be bound as query
        # parameters in DEFINE statements, so they are interpolated here.
        # The name is validated first to keep that interpolation safe.
        table = self._validated_identifier(table_name)
        dim = int(_EMBED_DIM)

        # IF NOT EXISTS on every statement makes the bootstrap safe to re-run.
        ddl = f"""
            DEFINE TABLE IF NOT EXISTS {table} SCHEMALESS;
            DEFINE FIELD IF NOT EXISTS id ON {table} TYPE string;
            DEFINE FIELD IF NOT EXISTS text ON {table} TYPE string;
            DEFINE FIELD IF NOT EXISTS metadata ON {table} TYPE object;
            DEFINE FIELD IF NOT EXISTS embedding ON {table} TYPE array;
            DEFINE INDEX IF NOT EXISTS {table}_vec
                ON {table} FIELDS embedding
                HNSW DIMENSION {dim} DIST COSINE;
        """

        async with AsyncSurreal(self.url) as db:
            await db.signin({"username": self.user, "password": self.password})
            await db.use(self.namespace, self.database)
            await db.query(ddl)


class SurrealDocumentWriter(ConnectionNode):
    """Dynamiq node that writes documents and their embeddings to SurrealDB."""

    def __init__(
        self,
        index_name: str,
        create_if_not_exist: bool = True,
        connection: Optional[SurrealConnection] = None,
        **kwargs,
    ):
        """Configure the writer.

        Args:
            index_name: Table the documents are written to.
            create_if_not_exist: When true, the table and its vector index
                are created before writing.
            connection: Connection settings; defaults to a
                ``SurrealConnection()`` with its default parameters.
            **kwargs: Forwarded to ``ConnectionNode.__init__``.
        """
        super().__init__(**kwargs)
        self.index_name = index_name
        self.connection = connection or SurrealConnection()
        self.create_if_not_exist = create_if_not_exist

    async def run(self, documents: List[Dict[str, Any]], **_) -> Dict[str, Any]:
        """Insert *documents* into the configured table.

        Each document must provide ``"text"``; ``"metadata"`` and
        ``"embedding"`` are optional. A missing or empty embedding is
        computed with ``embed``.

        Args:
            documents: Dicts with ``text`` and optional ``metadata`` /
                ``embedding`` keys.

        Returns:
            ``{"status": "ok", "count": <number of rows written>}``.
        """
        if self.create_if_not_exist:
            await self.connection._ensure_table_and_index(self.index_name)

        rows = [
            {
                # Content-addressed id: the same text always maps to the same row id.
                "id": hashlib.sha1(doc["text"].encode()).hexdigest(),
                "text": doc["text"],
                "metadata": doc.get("metadata", {}),
                # .get(): a document without an "embedding" key falls back to
                # embed() instead of raising KeyError.
                "embedding": doc.get("embedding") or embed(doc["text"]),
            }
            for doc in documents
        ]

        conn = self.connection
        async with AsyncSurreal(conn.url) as db:
            await db.signin({"username": conn.user, "password": conn.password})
            await db.use(conn.namespace, conn.database)
            for row in rows:
                await db.create(self.index_name, row)
        return {"status": "ok", "count": len(rows)}


class SurrealDocumentRetriever(ConnectionNode):
    """Dynamiq node that runs a k-NN vector search against a SurrealDB table."""

    # Candidate-list size passed as the EF part of the `<|K,EF|>` operator.
    # NOTE(review): 40 is a starting value, not a tuned one; raise it to trade
    # speed for recall.
    _HNSW_EF = 40

    def __init__(
        self,
        index_name: str,
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        connection: Optional[SurrealConnection] = None,
        **kwargs,
    ):
        """Configure the retriever.

        Args:
            index_name: Table to search.
            top_k: Number of nearest neighbours to request.
            filters: Optional ``{metadata_key: value}`` equality filters.
            connection: Connection settings; defaults to a
                ``SurrealConnection()`` with its default parameters.
            **kwargs: Forwarded to ``ConnectionNode.__init__``.
        """
        super().__init__(**kwargs)
        self.index_name = index_name
        self.top_k = top_k
        self.filters = filters or {}
        self.connection = connection or SurrealConnection()

    @staticmethod
    def _safe_name(name: str, what: str, allow_dots: bool = False) -> str:
        """Return *name* if it can be spliced into SurrealQL, else raise ValueError.

        Only ASCII letters, digits and underscores are accepted; with
        *allow_dots*, dot-separated paths of such segments are accepted too.
        """
        segments = str(name).split(".") if allow_dots else [str(name)]
        for segment in segments:
            if not segment or not all(
                ch == "_" or (ch.isascii() and ch.isalnum()) for ch in segment
            ):
                raise ValueError(f"invalid {what}: {name!r}")
        return str(name)

    @staticmethod
    def _rows_from(result: Any) -> Any:
        """Return the rows of the first statement from a ``query`` response.

        Accepts both response shapes: a list of ``{"status", "result"}``
        envelopes, or the rows themselves.
        """
        if (
            isinstance(result, list)
            and result
            and isinstance(result[0], dict)
            and "result" in result[0]
            and "status" in result[0]
        ):
            return result[0]["result"]
        return [] if result is None else result

    async def run(self, embedding: List[float], **_) -> Dict[str, Any]:
        """Return the ``top_k`` stored documents closest to *embedding*.

        Args:
            embedding: Query vector.

        Returns:
            ``{"results": rows}`` where each row carries ``text``,
            ``metadata`` and ``score`` (cosine distance, smallest first).

        Raises:
            ValueError: If the table name or a filter key is not a plain
                identifier, or ``top_k`` is smaller than 1.
        """
        # Table name, filter keys and the KNN operands are spliced into the
        # query text, so they are validated; all values stay bound parameters.
        table = self._safe_name(self.index_name, "table name")
        k = int(self.top_k)
        if k < 1:
            raise ValueError(f"top_k must be >= 1, got {self.top_k!r}")
        ef = max(self._HNSW_EF, k)

        # Build filter conditions
        filter_params = {}
        conditions = []
        for i, (key, value) in enumerate(self.filters.items()):
            param_name = f"filter_{i}"
            field = self._safe_name(key, "filter key", allow_dots=True)
            conditions.append(f"metadata.{field} == ${param_name}")
            filter_params[param_name] = value
        where_clause = "".join(f" AND {cond}" for cond in conditions)

        # HNSW indexes are queried with `<|K,EF|>` using literal integers.
        # Cosine distance is derived from vector::similarity::cosine.
        query = f"""
            SELECT text, metadata,
                   1 - vector::similarity::cosine(embedding, $vec) AS score
            FROM {table}
            WHERE embedding <|{k},{ef}|> $vec{where_clause}
            ORDER BY score ASC
        """

        conn = self.connection
        async with AsyncSurreal(conn.url) as db:
            await db.signin({"username": conn.user, "password": conn.password})
            await db.use(conn.namespace, conn.database)
            result = await db.query(query, {"vec": embedding, **filter_params})
        return {"results": self._rows_from(result)}

Unique workflow example: "Snack-Bot"

snack_bot.py

import asyncio
from dynamiq import Workflow
from dynamiq_surreal import (
    SurrealDocumentWriter,
    SurrealDocumentRetriever,
    embed,  # used below to embed the query
)
from dynamiq.nodes.llms import OpenAIChat


async def main():
    """Ingest three snack facts, then answer one question from the retrieved context."""
    # --- 1. Writer node ingests knowledge ---------------------------------
    writer = SurrealDocumentWriter(
        index_name="snack_mem",
        create_if_not_exist=True,
    )

    # "embedding": None lets the writer compute the vector itself.
    docs = [
        {"text": "Hummus is traditionally made from chickpeas.", "embedding": None},
        {"text": "Trail mix usually combines nuts, dried fruit, and chocolate.", "embedding": None},
        {"text": "Edamame are young soybeans, often served steamed.", "embedding": None},
    ]

    # --- 2. Retriever node pulls context for the LLM ----------------------
    retriever = SurrealDocumentRetriever(
        index_name="snack_mem",
        top_k=2,
    )

    # --- 3. LLM node ------------------------------------------------------
    llm = OpenAIChat(
        prompt_template="""
Answer the question using ONLY the context below.
Context: {{retrieved_docs}}
Question: {{question}}
""".strip()
    )

    # --- 4. Build workflow ------------------------------------------------
    wf = Workflow()
    wf.flow.add_nodes(writer, retriever, llm)

    # wire outputs/inputs
    wf.flow.connect(writer, retriever, mapping={"output": "documents"})  # passive write
    wf.flow.connect(retriever, llm, mapping={"results": "retrieved_docs"})
    wf.flow.set_entry_nodes(retriever, llm)

    # --- 5. Run -----------------------------------------------------------
    # step-A: ingest once (SurrealDocumentWriter.run takes the documents as
    # its `documents` argument)
    await writer.run(documents=docs)

    # step-B: query + answer
    query = "What legumes are eaten steamed in their pods?"
    emb = embed(query)  # same helper the writer uses for documents
    answer = await wf.run(input_data={
        "embedding": emb,
        "question": query,
    })
    print(answer["OpenAIChat"])


if __name__ == "__main__":
    asyncio.run(main())

What happens

  1. SurrealDocumentWriter embeds & inserts snack facts into snack_mem with an HNSW index.

  2. SurrealDocumentRetriever performs a cosine k-NN query on embedding <|2|> and passes the top-2 passages to the LLM.

  3. The LLM answers "Edamame are steamed young soybeans." grounded by the retrieved context.

Why SurrealDB for Dynamiq?

| Feature | Benefit |
| --- | --- |
| Unified store | Keep vectors, relational data, graph edges & real-time queries in one DB. |
| Native ANN | DEFINE INDEX … HNSW avoids an extra service. |
| SurrealQL filters | Mix vector similarity with rich metadata predicates (WHERE metadata.cuisine = 'asian' AND …). |

Copy the snippets above into your Dynamiq repo and you're ready to build end-to-end RAG and agent flows with SurrealDB as the memory spine. Happy hacking!

Resources