Dynamiq
Dynamiq is a Python framework for building multi-agent LLM systems. This guide adds SurrealDB vector storage capabilities to it.
Installation
# Install Dynamiq, the SurrealDB Python SDK, and the OpenAI client library.
pip install dynamiq surrealdb openai
# Start SurrealDB in Docker, publishing port 8000, with root credentials
# root/secret and file-backed storage at /data/db.
docker run -p 8000:8000 surrealdb/surrealdb:latest \
start --user root --pass secret file:/data/db
SurrealDB ≥ v1.5 has built-in HNSW vector indexes, queried with SurrealQL’s <| K |> operator and vector::distance::* functions.
Define the vector table & index (one-time setup)
-- Document table and its fields. IF NOT EXISTS is used on every statement
-- (not only the index) so the whole script can be re-run safely.
DEFINE TABLE IF NOT EXISTS ai_docs SCHEMALESS;
DEFINE FIELD IF NOT EXISTS id ON ai_docs TYPE string;
DEFINE FIELD IF NOT EXISTS text ON ai_docs TYPE string;
DEFINE FIELD IF NOT EXISTS metadata ON ai_docs TYPE object;
DEFINE FIELD IF NOT EXISTS embedding ON ai_docs TYPE array;
-- HNSW index over the embedding field: 1536 dimensions, cosine distance.
DEFINE INDEX IF NOT EXISTS ai_docs_vec
ON ai_docs FIELDS embedding
HNSW DIMENSION 1536 DIST COSINE;
SurrealDB nodes for Dynamiq
dynamiq_surreal.py
from dynamiq.nodes.node import ConnectionNode
from typing import List, Dict, Any, Optional
from surrealdb import AsyncSurreal
import hashlib, json, os
import openai
# Width of the embedding vectors; also used as the HNSW index DIMENSION.
_EMBED_DIM = 1536


def embed(text: str) -> List[float]:
    """Return the embedding vector for *text*.

    Tiny helper – replace with your preferred model.

    Uses the client-object API of the OpenAI library (``openai>=1.0``); the
    module-level ``openai.Embedding.create`` call was removed in that release
    line, which is what a plain ``pip install openai`` installs.

    Args:
        text: The string to embed.

    Returns:
        A list of ``_EMBED_DIM`` floats.
    """
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    resp = client.embeddings.create(
        model="text-embedding-3-small",
        input=[text],
        dimensions=_EMBED_DIM,
    )
    return resp.data[0].embedding
class SurrealConnection:
    """Connection settings for a SurrealDB server plus a schema bootstrap helper.

    The instance only stores the endpoint URL, namespace, database and
    credentials; ``_ensure_table_and_index`` opens its own session.
    """

    def __init__(
        self,
        url: str = "ws://localhost:8000/rpc",
        namespace: str = "dynamiq",
        database: str = "rag",
        user: str = "root",
        password: str = "secret",
    ):
        self.url = url
        self.namespace = namespace
        self.database = database
        self.user = user
        self.password = password

    async def _ensure_table_and_index(self, table_name: str):
        """Create the document table, its fields and its HNSW index if missing.

        Args:
            table_name: Name of the table to create. Must be a plain ASCII
                identifier (letters, digits, underscore; not starting with a
                digit) because it is inlined into the SurrealQL text.

        Raises:
            ValueError: If ``table_name`` is not a plain ASCII identifier.
        """
        # Table and index names are identifiers in DEFINE statements, so they
        # are written into the query text rather than bound as $parameters.
        # Validate first so nothing unexpected can be injected.
        if not (
            isinstance(table_name, str)
            and table_name.isascii()
            and table_name.isidentifier()
        ):
            raise ValueError(f"invalid SurrealDB table name: {table_name!r}")

        dim = int(_EMBED_DIM)
        # IF NOT EXISTS on every statement keeps repeated calls harmless.
        statements = f"""
            DEFINE TABLE IF NOT EXISTS {table_name} SCHEMALESS;
            DEFINE FIELD IF NOT EXISTS id ON {table_name} TYPE string;
            DEFINE FIELD IF NOT EXISTS text ON {table_name} TYPE string;
            DEFINE FIELD IF NOT EXISTS metadata ON {table_name} TYPE object;
            DEFINE FIELD IF NOT EXISTS embedding ON {table_name} TYPE array;
            DEFINE INDEX IF NOT EXISTS {table_name}_vec
                ON {table_name} FIELDS embedding
                HNSW DIMENSION {dim} DIST COSINE;
        """

        async with AsyncSurreal(self.url) as db:
            await db.signin({"username": self.user, "password": self.password})
            await db.use(self.namespace, self.database)
            await db.query(statements)
class SurrealDocumentWriter(ConnectionNode):
    """Node that writes documents (text, metadata, embedding) to a SurrealDB table."""

    def __init__(
        self,
        index_name: str,
        create_if_not_exist: bool = True,
        connection: Optional[SurrealConnection] = None,
        **kwargs,
    ):
        """Store the target table name and connection settings.

        Args:
            index_name: Table the documents are written to.
            create_if_not_exist: When true, ``run`` first asks the connection
                to create the table and vector index.
            connection: Connection settings; a default ``SurrealConnection``
                is used when omitted.
            **kwargs: Forwarded to ``ConnectionNode``.
        """
        super().__init__(**kwargs)
        self.index_name = index_name
        self.connection = connection or SurrealConnection()
        self.create_if_not_exist = create_if_not_exist

    async def run(self, documents: List[Dict[str, Any]], **_) -> Dict[str, Any]:
        """Insert ``documents`` and report how many rows were written.

        Args:
            documents: Dicts with a required ``"text"`` key and optional
                ``"metadata"`` and ``"embedding"`` keys. A missing or empty
                embedding is computed with ``embed``.

        Returns:
            ``{"status": "ok", "count": <number of rows>}``.
        """
        if self.create_if_not_exist:
            await self.connection._ensure_table_and_index(self.index_name)

        rows = [
            {
                # Content-derived id: the same text always maps to the same id.
                "id": hashlib.sha1(doc["text"].encode()).hexdigest(),
                "text": doc["text"],
                "metadata": doc.get("metadata", {}),
                # .get() so documents without an "embedding" key are embedded
                # on the fly instead of raising KeyError.
                "embedding": doc.get("embedding") or embed(doc["text"]),
            }
            for doc in documents
        ]
        if not rows:
            # Nothing to insert, so do not open a database session.
            return {"status": "ok", "count": 0}

        conn = self.connection
        async with AsyncSurreal(conn.url) as db:
            await db.signin({"username": conn.user, "password": conn.password})
            await db.use(conn.namespace, conn.database)
            # NOTE(review): ids are deterministic, so writing the same text
            # twice presumably collides with the existing record on create();
            # confirm whether an upsert is wanted here.
            for row in rows:
                await db.create(self.index_name, row)
        return {"status": "ok", "count": len(rows)}
class SurrealDocumentRetriever(ConnectionNode):
    """Node that runs a cosine k-NN query against a SurrealDB table."""

    def __init__(
        self,
        index_name: str,
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        connection: Optional[SurrealConnection] = None,
        **kwargs,
    ):
        """Store the query settings.

        Args:
            index_name: Table to search.
            top_k: Number of nearest neighbours to request.
            filters: Optional ``{metadata_key: value}`` equality filters,
                combined with AND.
            connection: Connection settings; a default ``SurrealConnection``
                is used when omitted.
            **kwargs: Forwarded to ``ConnectionNode``.
        """
        super().__init__(**kwargs)
        self.index_name = index_name
        self.top_k = top_k
        self.filters = filters or {}
        self.connection = connection or SurrealConnection()

    @staticmethod
    def _check_identifier(name: Any, what: str) -> str:
        """Return ``name`` if it is a plain ASCII identifier, else raise ValueError."""
        if not (isinstance(name, str) and name.isascii() and name.isidentifier()):
            raise ValueError(f"invalid {what}: {name!r}")
        return name

    async def run(self, embedding: List[float], **_) -> Dict[str, Any]:
        """Return the ``top_k`` rows closest to ``embedding``.

        Args:
            embedding: Query vector.

        Returns:
            ``{"results": [...]}`` where each row has ``text``, ``metadata``
            and ``score`` (cosine distance, smallest first).

        Raises:
            ValueError: If the table name or a filter key is not a plain
                identifier, or ``top_k`` is not a positive integer.
        """
        # The table name, filter keys and K all end up in the query text, so
        # each is validated before interpolation; values stay bound params.
        table = self._check_identifier(self.index_name, "index name")
        top_k = int(self.top_k)
        if top_k < 1:
            raise ValueError(f"top_k must be a positive integer, got {self.top_k!r}")

        filter_params: Dict[str, Any] = {}
        conditions = []
        for i, (key, value) in enumerate(self.filters.items()):
            # Dotted keys address nested metadata; check every segment.
            path = ".".join(
                self._check_identifier(part, "filter key")
                for part in str(key).split(".")
            )
            param_name = f"filter_{i}"
            conditions.append(f"metadata.{path} == ${param_name}")
            filter_params[param_name] = value
        where_clause = " AND " + " AND ".join(conditions) if conditions else ""

        # K is inlined as an integer literal rather than bound as a parameter.
        # NOTE(review): SurrealDB's HNSW examples use the <|K,EF|> form;
        # confirm that the bare <|K|> form hits the HNSW index.
        query = f"""
            SELECT text, metadata,
                vector::distance::cosine(embedding, $vec) AS score
            FROM {table}
            WHERE embedding <|{top_k}|> $vec{where_clause}
            ORDER BY score ASC
        """

        conn = self.connection
        async with AsyncSurreal(conn.url) as db:
            await db.signin({"username": conn.user, "password": conn.password})
            await db.use(conn.namespace, conn.database)
            result = await db.query(query, {"vec": embedding, **filter_params})

        # NOTE(review): SDK releases appear to differ in response shape. Some
        # wrap each statement as {"result": rows, "status": ...}; others
        # presumably return the rows directly. Accept both, and treat an empty
        # or missing response as "no hits" instead of raising IndexError.
        rows = result
        if (
            isinstance(result, list)
            and result
            and isinstance(result[0], dict)
            and "result" in result[0]
            and "status" in result[0]
        ):
            rows = result[0]["result"]
        return {"results": rows or []}
Unique workflow example: “Snack-Bot”
snack_bot.py
import asyncio
from dynamiq import Workflow
from dynamiq_surreal import (
SurrealDocumentWriter,
SurrealDocumentRetriever,
)
from dynamiq.nodes.llms import OpenAIChat
async def main():
    """Index three snack facts, then answer one question from the retrieved context."""
    # embed() is used below but is not among the names imported at the top of
    # this script, so bring it into scope here.
    from dynamiq_surreal import embed

    writer = SurrealDocumentWriter(
        index_name="snack_mem",
        create_if_not_exist=True,
    )
    # "embedding": None makes the writer compute the vector itself.
    docs = [
        {"text": "Hummus is traditionally made from chickpeas.", "embedding": None},
        {"text": "Trail mix usually combines nuts, dried fruit, and chocolate.", "embedding": None},
        {"text": "Edamame are young soybeans, often served steamed.", "embedding": None},
    ]
    retriever = SurrealDocumentRetriever(
        index_name="snack_mem",
        top_k=2,
    )
    llm = OpenAIChat(
        prompt_template="""
Answer the question using ONLY the context below.
Context: {{retrieved_docs}}
Question: {{question}}
""".strip()
    )

    # NOTE(review): the wiring calls below (connect / set_entry_nodes and the
    # mapping keys) are kept as written; verify them against the installed
    # Dynamiq version. The writer is also part of the flow while being run
    # by hand below, so confirm it does not execute twice.
    wf = Workflow()
    wf.flow.add_nodes(writer, retriever, llm)
    wf.flow.connect(writer, retriever, mapping={"output": "documents"})
    wf.flow.connect(retriever, llm, mapping={"results": "retrieved_docs"})
    wf.flow.set_entry_nodes(retriever, llm)

    # The writer's run() takes `documents` as a keyword/positional argument;
    # passing input_data=... would leave that required argument unfilled.
    await writer.run(documents=docs)

    query = "What legumes are eaten steamed in their pods?"
    emb = embed(query)
    answer = await wf.run(input_data={
        "embedding": emb,
        "question": query
    })
    print(answer["OpenAIChat"])


if __name__ == "__main__":
    asyncio.run(main())
What happens
- SurrealDocumentWriter embeds & inserts snack facts into snack_mem with an HNSW index.
- SurrealDocumentRetriever performs a cosine k-NN query on embedding <|2|> and passes the top-2 passages to the LLM.
- The LLM answers “Edamame are steamed young soybeans.” grounded by the retrieved context.
Why SurrealDB for Dynamiq?
| Feature | Benefit |
|---|---|
| Unified store | Keep vectors, relational data, graph edges & real-time queries in one DB. |
| Native ANN | DEFINE INDEX … HNSW avoids an extra service. |
| SurrealQL filters | Mix vector similarity with rich metadata predicates (WHERE metadata.cuisine = 'asian' AND …). |
Copy the snippets above into your Dynamiq repo and you’re ready to build end-to-end RAG and agent flows with SurrealDB as the memory spine. Happy hacking!
Resources