Dagster
Dagster is an orchestrator for building data pipelines and workflows, and a popular choice among data engineers and data scientists. This page shows how to expose SurrealDB to Dagster as a small vector-store resource.
Install
Install the Python packages, then start a local SurrealDB instance:
pip install dagster surrealdb openai
docker run -p 8000:8000 surrealdb/surrealdb:latest \
start --user root --pass secret file:/data/db
A Dagster “SurrealResource”
src/dagster_surreal.py
import dagster as dg
from surrealdb import Surreal
from typing import List, Sequence, Optional
import hashlib, json, os, contextlib
# Vector width requested from the embedding model; the same value sizes the
# vector index created by the resource below.
_EMBED_DIM = 1536


def embed(text: str) -> List[float]:
    """Return the embedding vector for *text*.

    Tiny helper – replace with your preferred model.

    Supports both generations of the ``openai`` package: the client-based
    API of ``openai>=1.0`` (where the module-level ``openai.Embedding``
    entry point was removed) and the legacy module-level API.

    Args:
        text: The string to embed.

    Returns:
        The embedding as returned by the API; ``_EMBED_DIM`` dimensions are
        requested from the model.
    """
    import openai

    api_key = os.getenv("OPENAI_API_KEY")
    model = "text-embedding-3-small"

    if hasattr(openai, "OpenAI"):
        # openai>=1.0: calls go through a client object and responses are
        # typed objects rather than dicts.
        client = openai.OpenAI(api_key=api_key)
        resp = client.embeddings.create(
            model=model,
            input=[text],
            dimensions=_EMBED_DIM,
        )
        return resp.data[0].embedding

    # Legacy openai<1.0 fallback (the original call shape).
    resp = openai.Embedding.create(
        model=model,
        input=[text],
        dimensions=_EMBED_DIM,
        api_key=api_key,
    )
    return resp["data"][0]["embedding"]
class SurrealConfig(dg.Config):
    """Connection settings for the SurrealDB instance.

    ``dg.Config`` is a pydantic model, so fields are declared as annotated
    attributes with ordinary Python defaults. The legacy
    ``dg.Field(str, default_value=...)`` wrapper belongs to the dict-based
    config-schema API and would make each default a ``Field`` object rather
    than a string.
    """

    # WebSocket RPC endpoint of the SurrealDB server.
    url: str = "ws://localhost:8000/rpc"
    # Namespace and database selected after signing in.
    namespace: str = "dagster"
    database: str = "vector"
    # Credentials used to sign in; the defaults match the `docker run`
    # command in the install step.
    user: str = "root"
    password: str = "secret"
@dg.resource(config_schema=SurrealConfig)
class SurrealResource:
    """
    A very small wrapper that exposes .add() and .query() like dagster-qdrant.

    Each operation opens its own short-lived connection, signs in with the
    configured credentials and selects the configured namespace/database.

    NOTE(review): the legacy ``@dg.resource`` decorator is kept so the public
    name and construction path stay unchanged; confirm that it accepts a
    ``dg.Config`` subclass as ``config_schema`` in the Dagster version in use.
    """

    # Size of the candidate list (EF) given to the KNN operator. The
    # `<|K,EF|>` form is the one that targets an HNSW index.
    # NOTE(review): 40 is a starting value, not a tuned one.
    _KNN_EF = 40

    def __init__(self, context):
        """Copy the connection settings out of the resource config.

        Args:
            context: Dagster resource-initialisation context; only its
                ``resource_config`` is read.
        """
        cfg = context.resource_config

        def setting(key):
            # Legacy resources receive their config as a plain dict; also
            # accept an attribute-style object so either shape works.
            return cfg[key] if isinstance(cfg, dict) else getattr(cfg, key)

        self.url = setting("url")
        self.namespace = setting("namespace")
        self.database = setting("database")
        self.user = setting("user")
        self.password = setting("password")

    @contextlib.contextmanager
    def _connect(self):
        """Yield a signed-in connection scoped to the configured namespace/database."""
        with Surreal(self.url) as db:
            db.signin({"username": self.user, "password": self.password})
            db.use(self.namespace, self.database)
            yield db

    @staticmethod
    def _safe_table(table: str) -> str:
        """Return *table* if it is a plain ASCII identifier, else raise.

        Table names are interpolated into SurrealQL text (statement
        parameters cannot stand in for identifiers), so anything other than
        letters, digits and underscores is rejected.

        Raises:
            ValueError: If *table* is not a plain ASCII identifier.
        """
        if not (isinstance(table, str) and table.isascii() and table.isidentifier()):
            raise ValueError(f"invalid SurrealDB table name: {table!r}")
        return table

    @staticmethod
    def _rows(result):
        """Return the rows of the first statement from a ``query`` response.

        Accepts both response shapes seen across SDK versions: a list of
        ``{"result": [...], ...}`` envelopes, or the rows themselves. An
        empty or missing response yields an empty list.
        """
        if not result:
            return []
        first = result[0]
        if isinstance(first, dict) and "result" in first:
            return first["result"] or []
        return result

    def _ensure_table(self, table: str):
        """Create the table, its fields and its vector index if missing.

        Every statement uses ``IF NOT EXISTS`` so the call can be repeated.

        Args:
            table: Table name; must be a plain ASCII identifier.

        Raises:
            ValueError: If *table* is not a plain ASCII identifier.
        """
        name = self._safe_table(table)
        dim = int(_EMBED_DIM)
        # NOTE(review): the `id` field definition is carried over from the
        # original; confirm it is wanted, since `id` is also the record id.
        with self._connect() as db:
            db.query(
                f"""
                DEFINE TABLE IF NOT EXISTS {name} SCHEMALESS;
                DEFINE FIELD IF NOT EXISTS id ON {name} TYPE string;
                DEFINE FIELD IF NOT EXISTS text ON {name} TYPE string;
                DEFINE FIELD IF NOT EXISTS embedding ON {name} TYPE array;
                DEFINE INDEX IF NOT EXISTS {name}_vec
                    ON {name} FIELDS embedding
                    HNSW DIMENSION {dim} DIST COSINE;
                """
            )

    def add(self, collection_name: str, documents: Sequence[str]):
        """Embed each document and store it in *collection_name*.

        The record id is the SHA-1 hex digest of the document text.

        NOTE(review): records are written with ``create``; presumably
        re-adding an identical document is rejected by the database as a
        duplicate id — confirm whether an upsert is wanted instead.

        Args:
            collection_name: Target table; created on demand.
            documents: Texts to embed and store.

        Raises:
            ValueError: If *collection_name* is not a plain ASCII identifier.
        """
        self._ensure_table(collection_name)
        with self._connect() as db:
            for doc in documents:
                db.create(
                    collection_name,
                    {
                        "id": hashlib.sha1(doc.encode()).hexdigest(),
                        "text": doc,
                        "embedding": embed(doc),
                    },
                )

    def query(
        self,
        collection_name: str,
        query_text: str,
        limit: int = 3,
        score_threshold: float = 0.4,
    ):
        """Return the stored texts nearest to *query_text*.

        Args:
            collection_name: Table to search; created on demand.
            query_text: Text that is embedded and used as the search vector.
            limit: Number of nearest neighbours requested; values below 1
                return an empty list.
            score_threshold: Largest cosine distance to keep; ``0`` disables
                the filter.

        Returns:
            A list of rows with ``text`` and ``score`` keys, where ``score``
            is the cosine distance (1 - cosine similarity), smallest first.

        Raises:
            ValueError: If *collection_name* is not a plain ASCII identifier.
        """
        name = self._safe_table(collection_name)
        k = int(limit)
        if k < 1:
            return []

        self._ensure_table(name)
        vec = embed(query_text)
        ef = int(self._KNN_EF)

        # K and EF are interpolated as integer literals and the validated
        # table name as an identifier; only the vector travels as a parameter.
        with self._connect() as db:
            result = db.query(
                f"""
                SELECT text,
                       1 - vector::similarity::cosine(embedding, $vec) AS score
                FROM {name}
                WHERE embedding <|{k},{ef}|> $vec
                ORDER BY score ASC
                """,
                {"vec": vec},
            )

        rows = self._rows(result)
        if score_threshold == 0:
            return list(rows)
        return [row for row in rows if row["score"] <= score_threshold]

    @contextlib.contextmanager
    def get_client(self):
        """Yield this resource itself; there is nothing to set up or release."""
        yield self