Dynamiq is a Python framework for building multi-agent LLM systems; this post shows how to wire it up to SurrealDB as a vector store.
```bash
pip install dynamiq surrealdb openai   # swap OpenAI for any embedder you like

# one-liner dev DB
docker run -p 8000:8000 surrealdb/surrealdb:latest \
    start --user root --pass root file:/data/db
```
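Before going further it's worth checking that the SDK can reach the dev instance. A minimal sketch using the surrealdb Python SDK's `AsyncSurreal` client; the `dynamiq`/`rag` namespace and database names and the root credentials are just the defaults used throughout this post:

```python
# Quick connectivity check against the dev container above
import asyncio
from surrealdb import AsyncSurreal

async def ping():
    async with AsyncSurreal("ws://localhost:8000/rpc") as db:
        await db.signin({"username": "root", "password": "root"})
        await db.use("dynamiq", "rag")   # namespace, database
        print(await db.query("RETURN time::now();"))

asyncio.run(ping())
```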
SurrealDB ≥ v1.5 has built-in HNSW / M-Tree vector indexes, queried with SurrealQL's `<|K|>` nearest-neighbour operator and the `vector::distance::*` functions.
```sql
-- run this once in your DB console
DEFINE TABLE ai_docs SCHEMALESS;
DEFINE FIELD id        ON ai_docs TYPE string;
DEFINE FIELD text      ON ai_docs TYPE string;
DEFINE FIELD metadata  ON ai_docs TYPE object;
DEFINE FIELD embedding ON ai_docs TYPE array;

-- Fast ANN (1536-D cosine; change DIST / DIMENSION to fit your embedder)
DEFINE INDEX IF NOT EXISTS ai_docs_vec ON ai_docs FIELDS embedding
    HNSW DIMENSION 1536 DIST COSINE;
```
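With the index in place, a raw k-NN query looks like this. A sketch via the Python SDK; `query_vec` is a placeholder for a real 1536-dimensional embedding:

```python
import asyncio
from surrealdb import AsyncSurreal

async def knn_demo(query_vec: list[float]):
    async with AsyncSurreal("ws://localhost:8000/rpc") as db:
        await db.signin({"username": "root", "password": "root"})
        await db.use("dynamiq", "rag")
        rows = await db.query(
            """
            SELECT text, vector::distance::cosine(embedding, $vec) AS score
            FROM ai_docs
            WHERE embedding <|4|> $vec   -- 4 approximate nearest neighbours
            ORDER BY score ASC;
            """,
            {"vec": query_vec},
        )
        print(rows)

asyncio.run(knn_demo([0.1] * 1536))
```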
```python
# dynamiq_surreal.py
from typing import Any, Dict, List, Optional
import hashlib
import os

from openai import OpenAI          # or any local embedding model
from surrealdb import AsyncSurreal

from dynamiq.nodes.node import ConnectionNode

_EMBED_DIM = 1536


def embed(text: str) -> List[float]:
    """Tiny helper – replace with your preferred model."""
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    resp = client.embeddings.create(
        model="text-embedding-3-small",
        input=[text],
        dimensions=_EMBED_DIM,
    )
    return resp.data[0].embedding


# ---------------------------------------------------------------------#
# A lightweight Connection object so Dynamiq can share sockets
# ---------------------------------------------------------------------#
class SurrealConnection:
    def __init__(
        self,
        url: str = "ws://localhost:8000/rpc",
        namespace: str = "dynamiq",
        database: str = "rag",
        user: str = "root",
        password: str = "root",
    ):
        self.url = url
        self.namespace = namespace
        self.database = database
        self.user = user
        self.password = password

    async def _ensure_table_and_index(self, table_name: str):
        async with AsyncSurreal(self.url) as db:
            await db.signin({"username": self.user, "password": self.password})
            await db.use(self.namespace, self.database)
            # Identifiers can't be bound as query parameters in DEFINE
            # statements, so the table name is interpolated directly.
            await db.query(
                f"""
                DEFINE TABLE {table_name} SCHEMALESS;
                DEFINE FIELD id ON {table_name} TYPE string;
                DEFINE FIELD text ON {table_name} TYPE string;
                DEFINE FIELD metadata ON {table_name} TYPE object;
                DEFINE FIELD embedding ON {table_name} TYPE array;
                DEFINE INDEX IF NOT EXISTS {table_name}_vec ON {table_name}
                    FIELDS embedding HNSW DIMENSION {_EMBED_DIM} DIST COSINE;
                """
            )


class SurrealDocumentWriter(ConnectionNode):
    def __init__(
        self,
        index_name: str,
        create_if_not_exist: bool = True,
        connection: Optional[SurrealConnection] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.index_name = index_name
        self.connection = connection or SurrealConnection()
        self.create_if_not_exist = create_if_not_exist

    async def run(self, documents: List[Dict[str, Any]], **_) -> Dict[str, Any]:
        if self.create_if_not_exist:
            await self.connection._ensure_table_and_index(self.index_name)

        rows = []
        for doc in documents:
            rows.append(
                {
                    "id": hashlib.sha1(doc["text"].encode()).hexdigest(),
                    "text": doc["text"],
                    "metadata": doc.get("metadata", {}),
                    "embedding": doc.get("embedding") or embed(doc["text"]),
                }
            )

        async with AsyncSurreal(self.connection.url) as db:
            await db.signin(
                {"username": self.connection.user, "password": self.connection.password}
            )
            await db.use(self.connection.namespace, self.connection.database)
            for r in rows:
                await db.create(self.index_name, r)

        return {"status": "ok", "count": len(rows)}


class SurrealDocumentRetriever(ConnectionNode):
    def __init__(
        self,
        index_name: str,
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        connection: Optional[SurrealConnection] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.index_name = index_name
        self.top_k = top_k
        self.filters = filters or {}
        self.connection = connection or SurrealConnection()

    async def run(self, embedding: List[float], **_) -> Dict[str, Any]:
        # Build metadata filter conditions as parameterised predicates
        filter_params: Dict[str, Any] = {}
        where_clause = ""
        if self.filters:
            conditions = []
            for i, (k, v) in enumerate(self.filters.items()):
                param_name = f"filter_{i}"
                conditions.append(f"metadata.{k} == ${param_name}")
                filter_params[param_name] = v
            where_clause = " AND " + " AND ".join(conditions)

        async with AsyncSurreal(self.connection.url) as db:
            await db.signin(
                {"username": self.connection.user, "password": self.connection.password}
            )
            await db.use(self.connection.namespace, self.connection.database)
            # The k in the <|k|> operator must be a literal, so it is
            # interpolated; the query vector and filters stay parameterised.
            result = await db.query(
                f"""
                SELECT text, metadata,
                       vector::distance::cosine(embedding, $vec) AS score
                FROM {self.index_name}
                WHERE embedding <|{self.top_k}|> $vec{where_clause}
                ORDER BY score ASC
                """,
                {"vec": embedding, **filter_params},
            )

        # Depending on the surrealdb SDK version, query() returns either the
        # rows directly or a [{"result": [...]}] wrapper; handle both.
        if result and isinstance(result[0], dict) and "result" in result[0]:
            result = result[0]["result"]
        return {"results": result}
```
```python
# Usage example: a separate script that imports the module above
import asyncio

from dynamiq import Workflow
from dynamiq.nodes.llms import OpenAIChat

from dynamiq_surreal import (
    SurrealDocumentWriter,
    SurrealDocumentRetriever,
    embed,
)


async def main():
    # --- 1. Writer node ingests knowledge ---------------------------------
    writer = SurrealDocumentWriter(
        index_name="snack_mem",
        create_if_not_exist=True,
    )
    docs = [
        {"text": "Hummus is traditionally made from chickpeas.", "embedding": None},
        {"text": "Trail mix usually combines nuts, dried fruit, and chocolate.", "embedding": None},
        {"text": "Edamame are young soybeans, often served steamed.", "embedding": None},
    ]

    # --- 2. Retriever node pulls context for the LLM ----------------------
    retriever = SurrealDocumentRetriever(
        index_name="snack_mem",
        top_k=2,
    )

    # --- 3. LLM node ------------------------------------------------------
    llm = OpenAIChat(
        prompt_template="""
        Answer the question using ONLY the context below.
        Context: {{retrieved_docs}}
        Question: {{question}}
        """.strip()
    )

    # --- 4. Build workflow ------------------------------------------------
    wf = Workflow()
    wf.flow.add_nodes(writer, retriever, llm)
    # wire outputs/inputs
    wf.flow.connect(writer, retriever, mapping={"output": "documents"})   # passive write
    wf.flow.connect(retriever, llm, mapping={"results": "retrieved_docs"})
    wf.flow.set_entry_nodes(retriever, llm)

    # --- 5. Run -----------------------------------------------------------
    # step-A: ingest once (matches the writer's run(documents=...) signature)
    await writer.run(documents=docs)

    # step-B: query + answer
    query = "What legumes are eaten steamed in their pods?"
    emb = embed(query)   # same helper as above
    answer = await wf.run(input_data={
        "embedding": emb,
        "question": query,
    })
    print(answer["OpenAIChat"])


if __name__ == "__main__":
    asyncio.run(main())
```
What happens

- `SurrealDocumentWriter` embeds & inserts the snack facts into `snack_mem`, which carries an HNSW index.
- `SurrealDocumentRetriever` runs a cosine k-NN query with `embedding <|2|> $vec` and passes the top-2 passages to the LLM.

| Feature | Benefit |
|---|---|
| Unified store | Keep vectors, relational data, graph edges & real-time queries in one DB. |
| Native ANN | `DEFINE INDEX … HNSW` avoids running an extra vector service. |
| SurrealQL filters | Mix vector similarity with rich metadata predicates (`WHERE metadata.cuisine = 'asian' AND …`), as sketched below. |
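For example, a hypothetical metadata-filtered retrieval with the `SurrealDocumentRetriever` defined earlier; the `cuisine` key is illustrative and not part of the demo documents above:

```python
import asyncio
from dynamiq_surreal import SurrealDocumentRetriever, embed

async def filtered_search():
    retriever = SurrealDocumentRetriever(
        index_name="snack_mem",
        top_k=3,
        filters={"cuisine": "asian"},   # rendered as `metadata.cuisine == $filter_0`
    )
    hits = await retriever.run(embedding=embed("steamed soybean snacks"))
    for row in hits["results"]:
        print(round(row["score"], 3), row["text"])

asyncio.run(filtered_search())
```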
Copy the snippets above into your Dynamiq repo and you’re ready to build end-to-end RAG and agent flows with SurrealDB as the memory spine. Happy hacking!