SurrealDB
SurrealDB Docs Logo

Enter a search query

Dagster

Dagster is a powerful tool for building data pipelines and workflows. It is a popular choice for data engineers and data scientists.

Install

Install
pip install dagster surrealdb openai # or swap out OpenAI for any embedder # optional – launch a local SurrealDB daemon docker run -p 8000:8000 surrealdb/surrealdb:latest \ start --user root --pass root file:/data/db

A Dagster “SurrealResource”

src/dagster_surreal.py
import dagster as dg from surrealdb import Surreal from typing import List, Sequence, Optional import hashlib, json, os, contextlib _EMBED_DIM = 1536 # match your embedder def embed(text: str) -> List[float]: """Tiny helper – replace with your preferred model.""" import openai resp = openai.Embedding.create( model="text-embedding-3-small", input=[text], dimensions=_EMBED_DIM, api_key=os.getenv("OPENAI_API_KEY"), ) return resp["data"][0]["embedding"] class SurrealConfig(dg.Config): url: str = dg.Field(str, default_value="ws://localhost:8000/rpc") namespace: str = dg.Field(str, default_value="dagster") database: str = dg.Field(str, default_value="vector") user: str = dg.Field(str, default_value="root") password: str = dg.Field(str, default_value="root") @dg.resource(config_schema=SurrealConfig) class SurrealResource: """ A very small wrapper that exposes .add() and .query() like dagster-qdrant. """ def __init__(self, context): cfg: SurrealConfig = context.resource_config self.url = cfg.url self.namespace = cfg.namespace self.database = cfg.database self.user = cfg.user self.password = cfg.password # — helpers ---------------------------------------------------------- def _ensure_table(self, table: str): with Surreal(self.url) as db: db.signin({"username": self.user, "password": self.password}) db.use(self.namespace, self.database) db.query( """ DEFINE TABLE $table SCHEMALESS; DEFINE FIELD id ON $table TYPE string; DEFINE FIELD text ON $table TYPE string; DEFINE FIELD embedding ON $table TYPE array; DEFINE INDEX IF NOT EXISTS ${table}_vec ON $table FIELDS embedding HNSW DIMENSION $dim DIST COSINE; """, { "table": table, "dim": _EMBED_DIM } ) # — public API ------------------------------------------------------- def add(self, collection_name: str, documents: Sequence[str]): self._ensure_table(collection_name) with Surreal(self.url) as db: db.signin({"username": self.user, "password": self.password}) db.use(self.namespace, self.database) for doc in documents: rec = { "id": hashlib.sha1(doc.encode()).hexdigest(), "text": doc, "embedding": embed(doc), } db.create(collection_name, rec) def query( self, collection_name: str, query_text: str, limit: int = 3, score_threshold: float = 0.4, ): self._ensure_table(collection_name) vec = embed(query_text) with Surreal(self.url) as db: db.signin({"username": self.user, "password": self.password}) db.use(self.namespace, self.database) result = db.query( """ SELECT text, vector::distance::cosine(embedding, $vec) AS score FROM $table WHERE embedding <|$limit|> $vec ORDER BY score ASC """, { "table": collection_name, "vec": vec, "limit": limit } ) rows = result[0]["result"] return [r for r in rows if r['score'] <= score_threshold or score_threshold == 0] # Use the resource as a context-manager inside assets @contextlib.contextmanager def get_client(self): try: yield self finally: pass # No need to close since we use context managers for each operation
Edit this page on GitHub