CrewAI is a framework for orchestrating role-playing AI agents. It lets you define a “crew” of specialized agents that work together to accomplish complex tasks - like a researcher gathering data that a planner then uses to make recommendations.
This integration shows how to use SurrealDB as the memory layer for CrewAI agents, giving them:
The example below creates two agents that collaborate to recommend music festivals:
# CrewAI, SurrealDB Python SDK, and an embedder (OpenAI here, swap if you like)
pip install "crewai[tools]" surrealdb openai

# (optional) spin up SurrealDB locally – single binary, no deps
# NOTE: the `file:` datastore scheme is deprecated in SurrealDB 2.x; `rocksdb:`
# is the equivalent on-disk engine. Add `-v <host-dir>:/data` to keep the data
# after the container is removed.
docker run --pull always -p 8000:8000 surrealdb/surrealdb:latest \
  start --user root --pass root rocksdb:/data/db
SurrealDB V2 has native vector search with HNSW / M-Tree indexes, so you get an ANN-ready database without an extra vector service.
# src/mycrew/surreal_storage.py
"""SurrealDB-backed storage adapter for CrewAI memory (save / search / reset)."""
import hashlib
import json
import os
import re
from contextlib import asynccontextmanager
from typing import Any, Dict, List, Optional

import openai  # or any local embedding model
from crewai.memory.storage.rag_storage import RAGStorage
from surrealdb import AsyncSurreal, RecordID

_EMBED_DIM = 1536  # OpenAI text-embedding-3-small

# Search effort for the HNSW variant of the KNN operator (`<|K,EF|>`).
_HNSW_EF = 40

# Table names and metadata keys are interpolated into SurrealQL because
# identifiers cannot be passed as bind parameters, so they are validated first.
_TABLE_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_FIELD_PATH_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


def embed(text: str) -> List[float]:
    """Return the embedding vector for *text*.

    Simple embedding helper. Replace with your own provider if needed
    (and adjust ``_EMBED_DIM`` to match).

    Uses the openai>=1.0 client: ``openai.Embedding.create`` was removed in
    1.0, and the ``dimensions`` argument is only available on the new client.
    The call is blocking.
    """
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    resp = client.embeddings.create(
        model="text-embedding-3-small",
        input=[text],
        dimensions=_EMBED_DIM,
    )
    return resp.data[0].embedding


def _unwrap_rows(result: Any) -> List[Dict[str, Any]]:
    """Return the rows of the first statement of a ``db.query`` response.

    Accepts both the raw envelope shape (``[{"status": ..., "result": [...]}]``)
    and a plain list of rows, since the shape depends on the SDK version.
    """
    if (
        isinstance(result, list)
        and result
        and isinstance(result[0], dict)
        and "result" in result[0]
        and "status" in result[0]
    ):
        return result[0]["result"] or []
    return result or []


class SurrealStorage(RAGStorage):
    """
    CrewAI → SurrealDB adapter that implements the small RAGStorage API:
    save(), search(), reset().

    One Surreal table per memory type ("short-term", "entity", …).

    NOTE(review): the three API methods are coroutines. CrewAI's built-in
    storages expose synchronous save/search/reset — confirm that the caller
    awaits these, otherwise nothing is written or read.
    """

    def __init__(
        self,
        typ: str,
        allow_reset: bool = True,
        embedder_config: Optional[Dict[str, Any]] = None,
        crew: Optional[Any] = None,
        url: str = "ws://localhost:8000/rpc",
        namespace: str = "crew",
        database: str = "memories",
        user: str = "root",
        password: str = "root",
    ):
        """Store connection settings and derive the table name from *typ*.

        Raises:
            ValueError: if *typ* does not map to a plain SurrealQL identifier.
        """
        super().__init__(typ, allow_reset, embedder_config, crew)
        table = f"mem_{typ.replace('-', '_')}"
        if not _TABLE_RE.fullmatch(table):
            raise ValueError(f"memory type {typ!r} is not a valid table suffix")
        self.url = url
        self.namespace = namespace
        self.database = database
        self.user = user
        self.password = password
        self.table = table
        # Set once the table/index DDL has run, so it is not re-sent per call.
        self._schema_ready = False

    # ---------- RAGStorage API ------------------------------------------
    async def save(self, value: Any, metadata: Dict[str, Any]) -> None:
        """Embed *value* and upsert it, keyed by the SHA-1 of its text.

        The id is a stable hash, so saving the same text twice must overwrite
        the record; ``create`` would fail on the second call.
        """
        text = str(value)
        record_id = RecordID(self.table, hashlib.sha1(text.encode()).hexdigest())
        content = {
            "text": text,
            "metadata": metadata or {},
            "embedding": embed(text),
        }
        await self._ensure_table_and_index()
        async with self._session() as db:
            await db.upsert(record_id, content)

    async def search(
        self,
        query: str,
        limit: int = 3,
        filter: Optional[dict] = None,
        score_threshold: float = 0,
    ) -> List[Dict[str, Any]]:
        """Return up to *limit* nearest records to *query*.

        Args:
            query: text to embed and search for.
            limit: number of neighbours requested from the KNN operator.
            filter: optional ``{metadata_key: value}`` equality filters;
                keys may be dotted paths.
            score_threshold: maximum distance to keep; ``0`` disables the cut.

        Returns:
            Dicts with ``id``, ``metadata``, ``context`` and ``score``, where
            ``score`` is the distance reported by the KNN operator (cosine
            for the index defined here, so lower is closer).

        Raises:
            ValueError: if a filter key is not a plain (dotted) identifier.
        """
        if limit <= 0:
            return []

        params: Dict[str, Any] = {"vec": embed(query)}
        k = int(limit)
        # `<|K,EF|>` is the HNSW form of the KNN operator; K and EF are
        # written as integer literals rather than bind parameters.
        clauses = [f"embedding <|{k},{max(k, _HNSW_EF)}|> $vec"]
        for i, (key, expected) in enumerate((filter or {}).items()):
            if not _FIELD_PATH_RE.fullmatch(str(key)):
                raise ValueError(f"unsupported metadata filter key: {key!r}")
            param_name = f"filter_{i}"
            clauses.append(f"metadata.{key} == ${param_name}")
            params[param_name] = expected

        sql = (
            f"SELECT *, vector::distance::knn() AS score "
            f"FROM {self.table} "
            f"WHERE {' AND '.join(clauses)} "
            f"ORDER BY score ASC"
        )

        await self._ensure_table_and_index()
        async with self._session() as db:
            rows = _unwrap_rows(await db.query(sql, params))

        return [
            {
                "id": r["id"],
                "metadata": r["metadata"],
                "context": r["text"],
                "score": r["score"],
            }
            for r in rows
            if score_threshold == 0 or r["score"] <= score_threshold
        ]

    async def reset(self) -> None:
        """Drop this memory type's table and recreate the empty schema."""
        async with self._session() as db:
            # The table name is interpolated: identifiers cannot be bound as
            # parameters. It was validated in __init__.
            await db.query(f"REMOVE TABLE IF EXISTS {self.table};")
        self._schema_ready = False
        await self._ensure_table_and_index()

    # ---------- internals -----------------------------------------------
    @asynccontextmanager
    async def _session(self):
        """Yield a signed-in connection with namespace/database selected."""
        async with AsyncSurreal(self.url) as db:
            await db.signin({"username": self.user, "password": self.password})
            await db.use(self.namespace, self.database)
            yield db

    async def _ensure_table_and_index(self):
        """Create the table, its fields and the HNSW index if missing."""
        if self._schema_ready:
            return
        table = self.table
        # No `DEFINE FIELD id`: the id is the record id itself.
        # NOTE(review): typing it as string looked likely to reject
        # `table:hash` ids — confirm before re-adding.
        ddl = f"""
            DEFINE TABLE IF NOT EXISTS {table} SCHEMALESS;
            DEFINE FIELD IF NOT EXISTS text      ON {table} TYPE string;
            DEFINE FIELD IF NOT EXISTS metadata  ON {table} TYPE object;
            DEFINE FIELD IF NOT EXISTS embedding ON {table} TYPE array;
            DEFINE INDEX IF NOT EXISTS {table}_vec_idx ON {table}
                FIELDS embedding HNSW DIMENSION {_EMBED_DIM} DIST COSINE;
        """
        async with self._session() as db:
            await db.query(ddl)
        self._schema_ready = True
Highlights
DEFINE INDEX … HNSW
command. ([SurrealDB][4])
save / search / reset
surface so the rest of CrewAI is untouched.

Below is a tiny “Travel Concierge” crew: two agents plan a weekend city break. Their short-term memory persists the conversation; the entity memory stores facts about destinations they discover.
# src/mycrew/crew.py
"""Travel Concierge crew: two agents plan a weekend city break.

Short-term and entity memory are both backed by SurrealDB through
``SurrealStorage``.
"""
import asyncio

from crewai import Agent, Crew, Task
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory

from mycrew.surreal_storage import SurrealStorage  # ← our adapter

# --- Agents ------------------------------------------------------------
# NOTE(review): `name` is not among the documented Agent attributes
# (role / goal / backstory) — confirm it is accepted by your CrewAI version.
researcher = Agent(
    name="GeoResearcher",
    role="Geo-intelligence analyst",
    goal="collect up-to-date info on travel destinations",
    backstory="An analyst who tracks festivals and cultural events across Europe.",
)

planner = Agent(
    name="TripPlanner",
    role="Personal itinerary planner",
    goal="craft a perfect 48-hour city break",
    backstory="A travel planner who turns research notes into relaxed itineraries.",
)

# --- Memory back-end ---------------------------------------------------
entity_mem = EntityMemory(storage=SurrealStorage("entity"))
short_mem = ShortTermMemory(storage=SurrealStorage("short-term"))

# --- Tasks -------------------------------------------------------------
t1 = Task(
    description="Find vibrant European cities with art festivals in June 2025.",
    expected_output="Top 3 candidate cities with festival names and dates.",
    agent=researcher,
)

t2 = Task(
    description="Design a relaxed 2-day itinerary for the best candidate.",
    expected_output="Detailed schedule with cafes, galleries, evening events.",
    agent=planner,
    context=[t1],  # planner sees researcher output
)

# --- Crew --------------------------------------------------------------
# Agents and tasks are constructor arguments; Crew has no add_agents /
# add_tasks methods.
crew = Crew(
    agents=[researcher, planner],
    tasks=[t1, t2],
    memory=True,
    entity_memory=entity_mem,
    short_term_memory=short_mem,
)


async def main():
    """Run the crew once and print its final output."""
    # kickoff_async() is the awaitable counterpart of kickoff().
    result = await crew.kickoff_async()
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
Run with:
python -m mycrew.crew
What happens behind the scenes:
Researcher ➜ SurrealStorage.save()
writes the three festival candidates into entity memory (table mem_entity).
Short-term memory (table mem_short_term) persists the conversation so the planner can reference them mid-run.
SurrealStorage.search()
lets either agent recall past entity details (“When did we last look at Barcelona’s Sónar?”) using a vector (k-NN) query plus metadata filters – all inside SurrealDB.

Piece | How to tailor it |
---|---|
Embedding model | Swap the embed() helper for JinaAI/Instructor/any local model; adjust _EMBED_DIM. |
Similarity metric | Change the DIST COSINE clause to another metric SurrealDB supports (e.g. EUCLIDEAN or MANHATTAN) to match your embeddings. |
Metadata filters | Expand the simple filter translation to support CrewAI’s richer operators ($gt, $in, etc.). |
Remote / cloud DB | Replace ws://localhost:8000/rpc with wss://<SURREAL-CLOUD-ENDPOINT>/rpc and supply a token/password. |
With this adapter you can plug SurrealDB straight into CrewAI, enjoy a single storage layer for structured data and vector search, and keep your agent code exactly the same. Happy hacking!