CrewAI

CrewAI lets you orchestrate role-playing AI agents that collaborate to complete complex tasks.
This guide shows how to use SurrealDB (v2.0.0 or later) as the memory backend for CrewAI, giving agents:

  • Entity memory for domain objects (products, people, places)
  • Short-term memory for recent conversations
  • Vector search to recall relevant information by semantic similarity

The sample below creates two agents that recommend music festival trips:

  1. Researcher – finds festivals and saves them to SurrealDB
  2. Planner – queries those saved festivals to build a weekend itinerary

Install

# CrewAI, SurrealDB Python SDK, and an embedder (OpenAI here; swap if you like)
pip install "crewai[tools]" surrealdb openai

# (optional) run SurrealDB locally – single binary, no deps
docker run --pull always -p 8000:8000 surrealdb/surrealdb:latest \
  start --user root --pass root file:/data/db
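Before wiring anything into CrewAI, it is worth confirming the database is reachable. Below is a minimal smoke test with the blocking Python client, assuming the root credentials from the command above and the crew namespace / memories database used throughout this guide:

from surrealdb import Surreal

# Connect with the same defaults the storage adapter below uses.
with Surreal("ws://localhost:8000/rpc") as db:
    db.signin({"username": "root", "password": "root"})
    db.use("crew", "memories")

    # A trivial round-trip query proves the server is up and reachable.
    print(db.query("RETURN time::now();"))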

SurrealDB v2 ships native HNSW and M-Tree indexes, so you get ANN vector search without an extra service.
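The storage adapter below issues exactly these kinds of statements at runtime. As a standalone preview (a sketch only, using an illustrative mem_entity table and a 1536-dimension embedding field), defining an HNSW index and running a KNN query looks like this:

from surrealdb import Surreal

with Surreal("ws://localhost:8000/rpc") as db:
    db.signin({"username": "root", "password": "root"})
    db.use("crew", "memories")

    # Build an HNSW index over a 1536-dimension embedding field using cosine distance.
    db.query("""
        DEFINE TABLE mem_entity SCHEMALESS;
        DEFINE INDEX IF NOT EXISTS mem_entity_vec_idx ON mem_entity
            FIELDS embedding HNSW DIMENSION 1536 DIST COSINE;
    """)

    # KNN search: `<|3|>` asks the index for the 3 nearest neighbours of $vec,
    # and vector::distance::knn() exposes the distance the index computed.
    rows = db.query(
        """
        SELECT text, vector::distance::knn() AS score
        FROM mem_entity
        WHERE embedding <|3|> $vec
        ORDER BY score ASC;
        """,
        {"vec": [0.1] * 1536},  # placeholder vector; use a real embedding in practice
    )
    print(rows)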

SurrealDB storage adapter

Create src/mycrew/surreal_storage.py:

from __future__ import annotations

import asyncio, hashlib, logging, os, threading
from typing import Any, Dict, List, Optional

import openai
from crewai.memory.storage.rag_storage import RAGStorage
from surrealdb import Surreal, AsyncSurreal

logger = logging.getLogger(__name__)

_EMBED_DIM = 1536  # OpenAI text-embedding-3-small


# ────────────────────────── embeddings ──────────────────────────
def _embed(
    text: str,
    api_key: Optional[str] = None,
    model: str = "text-embedding-3-small",
    dimensions: int = _EMBED_DIM,
) -> List[float]:
    """Return an L2-normalised embedding vector."""
    # openai>=1.0 client API
    client = openai.OpenAI(api_key=api_key or os.getenv("OPENAI_API_KEY"))
    resp = client.embeddings.create(model=model, input=[text], dimensions=dimensions)
    vec = resp.data[0].embedding
    norm = sum(x * x for x in vec) ** 0.5 or 1.0
    return [x / norm for x in vec]


# ─────────────────────────── storage ────────────────────────────
class SurrealStorage(RAGStorage):
    """CrewAI RAGStorage backend backed by SurrealDB v2."""

    _lock = threading.Lock()
    _tables_ready: set[str] = set()

    def __init__(
        self,
        typ: str,
        *,
        allow_reset: bool = True,
        embedder_config: Optional[Dict[str, Any]] = None,
        crew: Optional[Any] = None,
        url: str = "ws://localhost:8000/rpc",
        namespace: str = "crew",
        database: str = "memories",
        user: str = "root",
        password: str = "root",
    ):
        super().__init__(typ, allow_reset, embedder_config, crew)
        self.url, self.ns, self.db = url, namespace, database
        self.user, self.pw = user, password
        self.table = f"mem_{typ.replace('-', '_')}"
        self._adb = AsyncSurreal(url)   # async client
        self._sdb = Surreal(url)        # sync client (DDL)
        self._embed_api_key = (embedder_config or {}).get("api_key")

    # ─────────── helpers ───────────
    def _run(self, coro):
        """Run *coro* in any context (sync script or async crew)."""
        try:
            loop = asyncio.get_running_loop()
            if loop.is_running():
                return asyncio.ensure_future(coro)
        except RuntimeError:
            pass
        return asyncio.run(coro)

    # ─────────── sync API (called by CrewAI) ───────────
    def save(self, value: Any, metadata: Dict[str, Any]):
        self._run(self._save_async(value, metadata))

    def search(
        self,
        query: str,
        limit: int = 3,
        filter: Optional[Dict[str, Any]] = None,
        score_threshold: float = 0.0,
    ):
        return self._run(
            self._search_async(query, limit, filter, score_threshold)
        )

    def reset(self):
        self._run(self._reset_async())

    # ─────────── async internals ───────────
    async def _save_async(self, value: Any, metadata: Dict[str, Any]):
        await self._ensure_schema()
        vec = _embed(str(value), self._embed_api_key)
        rec = {
            "id": hashlib.sha1(str(value).encode()).hexdigest(),
            "text": value,
            "metadata": metadata or {},
            "embedding": vec,
        }
        async with self._adb as db:
            await db.signin({"username": self.user, "password": self.pw})
            await db.use(self.ns, self.db)
            await db.create(self.table, rec)

    async def _search_async(
        self,
        query: str,
        limit: int,
        filter: Optional[Dict[str, Any]],
        score_threshold: float,
    ):
        await self._ensure_schema()
        vec = _embed(query, self._embed_api_key)
        filter_params, where_clause = {}, ""
        if filter:
            conds = []
            for i, (k, v) in enumerate(filter.items()):
                pname = f"f{i}"
                conds.append(f"metadata.{k} == ${pname}")
                filter_params[pname] = v
            where_clause = " AND " + " AND ".join(conds)
        async with self._adb as db:
            await db.signin({"username": self.user, "password": self.pw})
            await db.use(self.ns, self.db)
            rs = await db.query(
                f"""
                SELECT *, vector::distance::knn() AS score
                FROM {self.table}
                WHERE embedding <|{limit}|> $vec{where_clause}
                ORDER BY score ASC;
                """,
                {"vec": vec, **filter_params},
            )
        rows = rs[0]["result"]
        return [
            {
                "id": r["id"],
                "metadata": r["metadata"],
                "context": r["text"],
                "score": r["score"],
            }
            for r in rows
            if (score_threshold == 0 or r["score"] <= score_threshold)
        ]

    async def _reset_async(self):
        async with self._adb as db:
            await db.signin({"username": self.user, "password": self.pw})
            await db.use(self.ns, self.db)
            await db.query(f"REMOVE TABLE {self.table};")
        self._tables_ready.discard(self.table)

    # ─────────── table / index setup ───────────
    async def _ensure_schema(self):
        if self.table in self._tables_ready:
            return
        with self._lock:
            if self.table in self._tables_ready:
                return
            self._sdb.signin({"username": self.user, "password": self.pw})
            self._sdb.use(self.ns, self.db)
            self._sdb.query(
                f"""
                DEFINE TABLE {self.table} SCHEMALESS;
                DEFINE FIELD id ON {self.table} TYPE string;
                DEFINE FIELD text ON {self.table} TYPE string;
                DEFINE FIELD metadata ON {self.table} TYPE object;
                DEFINE FIELD embedding ON {self.table} TYPE array;
                DEFINE INDEX IF NOT EXISTS {self.table}_vec_idx ON {self.table}
                    FIELDS embedding HNSW DIMENSION {_EMBED_DIM} DIST COSINE;
                """
            )
            self._tables_ready.add(self.table)

Wire it into a crew

Create src/mycrew/crew.py:

import asyncio
import logging
from typing import Optional

from crewai import Agent, Task, Crew
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory

from mycrew.surreal_storage import SurrealStorage

logging.basicConfig(level=logging.INFO)


async def create_crew(
    openai_api_key: Optional[str] = None,
    surreal_url: str = "ws://localhost:8000/rpc",
) -> Crew:
    embed_cfg = {"api_key": openai_api_key}

    # role, goal, and backstory are required by crewai.Agent
    researcher = Agent(
        role="Geo-intelligence analyst",
        goal="Collect up-to-date info on travel destinations",
        backstory="GeoResearcher, an analyst who tracks festivals and cultural events across Europe.",
    )
    planner = Agent(
        role="Personal itinerary planner",
        goal="Craft a perfect 48-hour city break",
        backstory="TripPlanner, a planner who turns research notes into relaxed itineraries.",
    )

    entity_mem = EntityMemory(
        storage=SurrealStorage("entity", url=surreal_url, embedder_config=embed_cfg)
    )
    short_mem = ShortTermMemory(
        storage=SurrealStorage("short-term", url=surreal_url, embedder_config=embed_cfg)
    )

    t1 = Task(
        description="Find vibrant European cities with art festivals in June 2025.",
        expected_output="Top 3 candidate cities with festival names and dates.",
        agent=researcher,
    )
    t2 = Task(
        description="Design a relaxed 2-day itinerary for the best candidate.",
        expected_output="Detailed schedule with cafes, galleries, evening events.",
        agent=planner,
        context=[t1],
    )

    # Agents and tasks are passed to the Crew constructor along with both memories.
    crew = Crew(
        agents=[researcher, planner],
        tasks=[t1, t2],
        memory=True,
        entity_memory=entity_mem,
        short_term_memory=short_mem,
    )
    return crew


async def main():
    crew = await create_crew()
    result = await crew.kickoff_async()
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Run the crew:

python -m mycrew.crew
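Because the memories are ordinary SurrealDB records, you can inspect what the agents stored after a run. With the defaults above, the two memory types land in the mem_entity and mem_short_term tables; a rough sketch:

from surrealdb import Surreal

# Peek at what the crew stored; embeddings are omitted to keep the output readable.
with Surreal("ws://localhost:8000/rpc") as db:
    db.signin({"username": "root", "password": "root"})
    db.use("crew", "memories")
    for table in ("mem_entity", "mem_short_term"):
        rows = db.query(f"SELECT text, metadata FROM {table} LIMIT 5;")
        print(table, rows)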

Customising further

Each piece below can be swapped without touching the rest of the adapter:

  • Embedding model – swap _embed() for JinaAI, Instructor, or a local transformer and adjust _EMBED_DIM (see the sketch after this list).
  • Similarity metric – change DIST COSINE to another supported distance such as EUCLIDEAN or MANHATTAN to match your embeddings.
  • Metadata filters – extend the simple filter mapping to support CrewAI’s richer operators ($gt, $in, …).
  • Remote / Cloud DB – replace ws://localhost:8000/rpc with wss://<YOUR-ENDPOINT>/rpc and supply a token/password.
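As an example of the first point, here is a rough sketch of a local replacement for _embed() built on sentence-transformers (an extra dependency, not used elsewhere in this guide). The all-MiniLM-L6-v2 model emits 384-dimension vectors, so _EMBED_DIM and the index DIMENSION must be changed to match:

from typing import List, Optional

from sentence_transformers import SentenceTransformer

_EMBED_DIM = 384  # all-MiniLM-L6-v2 output size; the HNSW index DIMENSION must match

_model = SentenceTransformer("all-MiniLM-L6-v2")


def _embed(
    text: str,
    api_key: Optional[str] = None,
    model: Optional[str] = None,
    dimensions: int = _EMBED_DIM,
) -> List[float]:
    """Drop-in replacement for the OpenAI-based _embed(); ignores the unused arguments."""
    # normalize_embeddings=True returns unit-length vectors, matching the
    # L2-normalisation done by the original helper.
    vec = _model.encode(text, normalize_embeddings=True)
    return vec.tolist()

Because the adapter calls _embed(str(value), self._embed_api_key), the replacement keeps the same signature and simply ignores the OpenAI-specific arguments.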
