CrewAI
CrewAI lets you orchestrate role-playing AI agents that collaborate to complete complex tasks.
This guide shows how to use SurrealDB >v2.0.0 as the memory backend for CrewAI, giving agents:
- Entity memory for domain objects (products, people, places)
- Short-term memory for recent conversations
- Vector search to recall relevant information by semantic similarity
The sample below creates two agents that recommend music festival trips:
- Researcher – finds festivals and saves them to SurrealDB
- Planner – queries those saved festivals to build a weekend itinerary
Install
pip install "crewai[tools]" surrealdb openai
docker run --pull always -p 8000:8000 surrealdb/surrealdb:latest \
start --user root --pass secret file:/data/db
SurrealDB v2 ships native HNSW indexes, so you get ANN vector search without an extra service.
SurrealDB storage adapter
Create src/mycrew/surreal_storage.py:
from __future__ import annotations
import asyncio, hashlib, logging, os, threading
from typing import Any, Dict, List, Optional
import openai
from crewai.memory.storage.rag_storage import RAGStorage
from surrealdb import Surreal, AsyncSurreal
logger = logging.getLogger(__name__)
_EMBED_DIM = 1536
def _embed(text: str,
api_key: Optional[str] = None,
model: str = "text-embedding-3-small",
dimensions: int = _EMBED_DIM) -> List[float]:
"""Return an L2-normalised embedding vector."""
resp = openai.Embedding.create(
model=model,
input=[text],
api_key=api_key or os.getenv("OPENAI_API_KEY"),
dimensions=dimensions,
)
vec = resp["data"][0]["embedding"]
norm = sum(x * x for x in vec) ** 0.5 or 1.0
return [x / norm for x in vec]
class SurrealStorage(RAGStorage):
"""CrewAI RAGStorage backend backed by SurrealDB v2."""
_lock = threading.Lock()
_tables_ready: set[str] = set()
def __init__(
self,
typ: str,
*,
allow_reset: bool = True,
embedder_config: Optional[Dict[str, Any]] = None,
crew: Optional[Any] = None,
url: str = "ws://localhost:8000/rpc",
namespace: str = "crew",
database: str = "memories",
user: str = "root",
password: str = "secret",
):
super().__init__(typ, allow_reset, embedder_config, crew)
self.url, self.ns, self.db = url, namespace, database
self.user, self.pw = user, password
self.table = f"mem_{typ.replace('-', '_')}"
self._adb = AsyncSurreal(url)
self._sdb = Surreal(url)
self._embed_api_key = (embedder_config or {}).get("api_key")
def _run(self, coro):
"""Run *coro* in any context (sync script or async crew)."""
try:
loop = asyncio.get_running_loop()
if loop.is_running():
return asyncio.ensure_future(coro)
except RuntimeError:
pass
return asyncio.run(coro)
def save(self, value: Any, metadata: Dict[str, Any]):
self._run(self._save_async(value, metadata))
def search(
self,
query: str,
limit: int = 3,
filter: Optional[Dict[str, Any]] = None,
score_threshold: float = 0.0,
):
return self._run(
self._search_async(query, limit, filter, score_threshold)
)
def reset(self):
self._run(self._reset_async())
async def _save_async(self, value: Any, metadata: Dict[str, Any]):
await self._ensure_schema()
vec = _embed(str(value), self._embed_api_key)
rec = {
"id": hashlib.sha1(str(value).encode()).hexdigest(),
"text": value,
"metadata": metadata or {},
"embedding": vec,
}
async with self._adb as db:
await db.signin({"username": self.user, "password": self.pw})
await db.use(self.ns, self.db)
await db.create(self.table, rec)
async def _search_async(
self,
query: str,
limit: int,
filter: Optional[Dict[str, Any]],
score_threshold: float,
):
await self._ensure_schema()
vec = _embed(query, self._embed_api_key)
filter_params, where_clause = {}, ""
if filter:
conds = []
for i, (k, v) in enumerate(filter.items()):
pname = f"f{i}"
conds.append(f"metadata.{k} == ${pname}")
filter_params[pname] = v
where_clause = " AND " + " AND ".join(conds)
async with self._adb as db:
await db.signin({"username": self.user, "password": self.pw})
await db.use(self.ns, self.db)
rs = await db.query(
f"""
SELECT *,
vector::distance::knn() AS score
FROM {self.table}
WHERE embedding <|{limit}|> $vec{where_clause}
ORDER BY score ASC;
""",
{"vec": vec, **filter_params},
)
rows = rs[0]["result"]
return [
{
"id": r["id"],
"metadata": r["metadata"],
"context": r["text"],
"score": r["score"],
}
for r in rows
if (score_threshold == 0 or r["score"] <= score_threshold)
]
async def _reset_async(self):
async with self._adb as db:
await db.signin({"username": self.user, "password": self.pw})
await db.use(self.ns, self.db)
await db.query(f"REMOVE TABLE {self.table};")
self._tables_ready.discard(self.table)
async def _ensure_schema(self):
if self.table in self._tables_ready:
return
with self._lock:
if self.table in self._tables_ready:
return
self._sdb.signin({"username": self.user, "password": self.pw})
self._sdb.use(self.ns, self.db)
self._sdb.query(
f"""
DEFINE TABLE {self.table} SCHEMALESS;
DEFINE FIELD id ON {self.table} TYPE string;
DEFINE FIELD text ON {self.table} TYPE string;
DEFINE FIELD metadata ON {self.table} TYPE object;
DEFINE FIELD embedding ON {self.table} TYPE array;
DEFINE INDEX IF NOT EXISTS {self.table}_vec_idx
ON {self.table} FIELDS embedding
HNSW DIMENSION {_EMBED_DIM} DIST COSINE;
"""
)
self._tables_ready.add(self.table)
Wire it into a crew
Create src/mycrew/crew.py:
import asyncio, logging
from typing import Optional
from crewai import Agent, Task, Crew
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from mycrew.surreal_storage import SurrealStorage, _EMBED_DIM
logging.basicConfig(level=logging.INFO)
async def create_crew(
openai_api_key: Optional[str] = None,
surreal_url: str = "ws://localhost:8000/rpc",
) -> Crew:
embed_cfg = {"api_key": openai_api_key}
researcher = Agent(
name="GeoResearcher",
role="Geo-intelligence analyst",
goal="collect up-to-date info on travel destinations",
)
planner = Agent(
name="TripPlanner",
role="Personal itinerary planner",
goal="craft a perfect 48-hour city break",
)
entity_mem = EntityMemory(
storage=SurrealStorage("entity",
url=surreal_url,
embedder_config=embed_cfg)
)
short_mem = ShortTermMemory(
storage=SurrealStorage("short-term",
url=surreal_url,
embedder_config=embed_cfg)
)
t1 = Task(
description="Find vibrant European cities with art festivals in June 2025.",
expected_output="Top 3 candidate cities with festival names and dates.",
agent=researcher,
)
t2 = Task(
description="Design a relaxed 2-day itinerary for the best candidate.",
expected_output="Detailed schedule with cafes, galleries, evening events.",
agent=planner,
context=[t1],
)
crew = Crew(
memory=True,
entity_memory=entity_mem,
short_term_memory=short_mem,
)
crew.add_agents(researcher, planner)
crew.add_tasks(t1, t2)
return crew
async def main():
crew = await create_crew()
result = await crew.run()
print(result)
if __name__ == "__main__":
asyncio.run(main())
Run the crew:
python -m mycrew.crew
Customising further
| Piece | How to tweak it |
|---|
| Embedding model | Swap _embed() for JinaAI, Instructor, or a local transformer; adjust _EMBED_DIM. |
| Similarity metric | Change DIST COSINE to L2 or DOT to match your embeddings. |
| Metadata filters | Extend the simple filter mapping to support CrewAI’s richer operators ($gt, $in, …). |
| Remote / Cloud DB | Replace ws://localhost:8000/rpc with wss://<YOUR-ENDPOINT>/rpc and supply a token/password. |
Resources