Integrate Kreuzberg document intelligence extraction pipelines with SurrealDB.
Kreuzberg is a polyglot document intelligence framework that allows you to extract text, metadata, images, and structured information from PDFs, Office documents, images, and 91+ formats.
The kreuzberg-surrealdb package connects Kreuzberg's document extraction pipeline to SurrealDB. It handles schema creation, content deduplication, optional chunking and embedding, and index configuration.
How it works
Extract — Kreuzberg parses the source documents and runs OCR where needed.
Connect — The connector receives the extracted output and manages the SurrealDB connection.
Store — Each document is hashed (SHA-256) for deduplication, optionally chunked and embedded, then written to SurrealDB under an auto-generated schema.
Search — Full-text (BM25), vector (HNSW), and hybrid (RRF) search are available immediately after ingestion.
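The Store step's deduplication can be sketched in Rust. The ID format below is illustrative, not the connector's actual scheme, and a hypothetical `record_id_for` helper is assumed; in the full example the digest comes from the `sha2` crate over the extracted content:

```rust
// Sketch: derive a deterministic SurrealDB record ID from a content hash,
// so re-ingesting identical content maps to the same record.
// `record_id_for` is a hypothetical helper, not part of kreuzberg-surrealdb.
fn record_id_for(table: &str, sha256_hex: &str) -> String {
    // Use the first 16 hex chars of the digest as the record key.
    // (Sketch only: assumes the digest is at least 16 chars long.)
    format!("{table}:⟨{}⟩", &sha256_hex[..16])
}

fn main() {
    // SHA-256 of "hello", precomputed for the sketch; the real pipeline
    // hashes the full extracted document content.
    let digest = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824";
    println!("{}", record_id_for("documents", digest));
}
```

Because the key is derived from content rather than from a path or timestamp, running the same ingestion twice produces the same record IDs and the second run overwrites rather than duplicates.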
Key capabilities
Schema management — setup_schema() creates tables, indexes, and analyzers. No manual DDL required.
Deduplication — Deterministic record IDs derived from content hashes prevent duplicate rows across ingestion runs.
Flexible ingestion — Single files, file lists, directories (with glob), or raw bytes.
Extraction control — Pass Kreuzberg's ExtractionConfig to set OCR behavior, output format, and quality processing.
Batch tuning — Adjust insert_batch_size to balance throughput against memory usage.
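A minimal sketch of what `insert_batch_size` controls, assuming a hypothetical `Record` type (the real connector batches internally; this only illustrates the trade-off):

```rust
// Sketch: split extracted records into insert batches.
// `Record` and `batches` are illustrative names, not connector API.
#[derive(Debug)]
struct Record {
    source: String,
    content: String,
}

fn batches(records: &[Record], insert_batch_size: usize) -> Vec<&[Record]> {
    // Each chunk becomes one bulk insert: a smaller batch size lowers
    // peak memory at the cost of more round trips to SurrealDB.
    records.chunks(insert_batch_size).collect()
}

fn main() {
    let records: Vec<Record> = (0..10)
        .map(|i| Record { source: format!("doc{i}.md"), content: String::new() })
        .collect();
    let b = batches(&records, 4);
    let sizes: Vec<usize> = b.iter().map(|c| c.len()).collect();
    println!("{} batches, sizes {:?}", b.len(), sizes);
}
```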
Because Kreuzberg is written in Rust and published as its own crate, you can also work with it directly in Rust. This involves more boilerplate than the Python extension, but allows considerably more manual customisation. The following example shows a setup broadly similar to the Python extension: several file types are used to populate the same SurrealDB instance, in this case a JSON file demo.json, a markdown file demo.md, and even an HWPX file demo.hwpx, all assumed to be in an /assets folder. Most LLMs are able to generate sample files of these types, such as the JSON content below.
Sample data
```json
{
  "documents": [
    { "title": "English Quarterly Revenue", "language": "en", "content": "Quarterly revenue increased significantly this quarter." },
    { "title": "English Product", "language": "en", "content": "Product launch includes search and analytics." },
    { "title": "한국어 문서", "language": "ko", "content": "이 문서는 분기 매출과 검색 기능을 설명합니다." },
    { "title": "日本語 文書", "language": "ja", "content": "この文書は四半期売上と検索について説明します。" },
    { "title": "Mixed Doc", "language": "mixed", "content": "Quarterly revenue 데이터 日本語 検索 mixed language example." }
  ]
}
```
The code can be run with either cargo run -- memory or cargo run -- local, depending on whether you prefer a one-off embedded instance or a local instance that can be connected to via Surrealist to manually query the data after the Rust code has run.
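The two modes can be mapped to endpoint strings for `surrealdb::engine::any::connect`. This is a sketch, not connector code: the helper name is invented, and the exact endpoint strings (`"memory"`, `"ws://127.0.0.1:8000"`) are assumptions matching the `kv-mem` and `protocol-ws` features enabled in the example's Cargo dependencies:

```rust
use std::env;

// Sketch: choose a SurrealDB endpoint from the CLI argument.
// `endpoint_for` is a hypothetical helper for illustration only.
fn endpoint_for(mode: &str) -> &'static str {
    match mode {
        // Embedded, in-memory instance (assumes the `kv-mem` feature).
        "memory" => "memory",
        // Local server started separately (e.g. `surreal start`),
        // reachable from Surrealist over WebSocket.
        "local" => "ws://127.0.0.1:8000",
        other => panic!("unknown mode: {other} (expected `memory` or `local`)"),
    }
}

fn main() {
    let mode = env::args().nth(1).unwrap_or_else(|| "memory".into());
    println!("connecting to {}", endpoint_for(&mode));
}
```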
For more ideas on how to adapt this example to your own needs, see the API reference section of the kreuzberg-surrealdb repo.
```rust
// Required features to run:
// kreuzberg = { version = "4.9.4", default-features = true, features = ["archives", "language-detection", "xml"] }
// surrealdb = { version = "3.1.0-beta.1", default-features = true, features = ["protocol-ws", "kv-mem"] }
use anyhow::{Context, Result};
use kreuzberg::{extract_file, ExtractionConfig, LanguageDetectionConfig};
use sha2::{Digest, Sha256};
use std::{
    fs::read_to_string,
    path::{Path, PathBuf},
};
use surrealdb::{
    engine::any::connect,
    opt::auth::Root,
    types::{ToSql, Value},
};
```
```rust
/// HWPX is a ZIP package; Kreuzberg maps `.hwpx` to `application/haansofthwpx`, which is only
/// handled when the `hwp` feature is enabled (the legacy CFB `.hwp` path is wrong for HWPX,
/// which is essentially a .zip of .xml files).
/// Force ZIP so `archives` + `xml` handle the package.
const HWPX_ZIP_MIME: &str = "application/zip";
```
```rust
pub fn connector_schema(table: &str) -> String {
    format!(
        "DEFINE ANALYZER IF NOT EXISTS doc_analyzer TOKENIZERS class FILTERS lowercase,snowball(english);
        DEFINE TABLE IF NOT EXISTS {table} SCHEMAFULL;
        DEFINE FIELD IF NOT EXISTS source ON TABLE {table} TYPE string;
        DEFINE FIELD IF NOT EXISTS content ON TABLE {table} TYPE string;
        DEFINE FIELD IF NOT EXISTS mime_type ON TABLE {table} TYPE string;
        DEFINE FIELD IF NOT EXISTS title ON TABLE {table} TYPE option<string>;
        DEFINE FIELD IF NOT EXISTS authors ON TABLE {table} TYPE option<array<string>>;
        DEFINE FIELD IF NOT EXISTS created_at ON TABLE {table} TYPE option<string>;
        DEFINE FIELD IF NOT EXISTS ingested_at ON TABLE {table} TYPE datetime DEFAULT time::now();
        DEFINE FIELD IF NOT EXISTS metadata ON TABLE {table} TYPE object FLEXIBLE;
        DEFINE FIELD IF NOT EXISTS quality_score ON TABLE {table} TYPE option<float>;
        DEFINE FIELD IF NOT EXISTS content_hash ON TABLE {table} TYPE string;
        DEFINE FIELD IF NOT EXISTS detected_languages ON TABLE {table} TYPE option<array<string>>;
        DEFINE FIELD IF NOT EXISTS keywords ON TABLE {table} TYPE option<array<string>>;
        DEFINE INDEX IF NOT EXISTS idx_doc_source ON TABLE {table} FIELDS source UNIQUE;
        DEFINE INDEX IF NOT EXISTS idx_doc_hash ON TABLE {table} FIELDS content_hash UNIQUE;
        DEFINE INDEX IF NOT EXISTS idx_doc_content ON TABLE {table} FIELDS content FULLTEXT ANALYZER doc_analyzer BM25(1.2,0.75) HIGHLIGHTS;"
    )
}
```
```rust
// Add files such as demo.md, demo.hwpx, demo.json to the /assets folder
fn default_asset_paths() -> Vec<PathBuf> {
    let assets = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("assets");
    ["demo.md", "demo.hwpx", "demo.json"]
        .into_iter()
        .map(|name| assets.join(name))
        .collect()
}
```
```rust
let records: Value = client
    .query(
        "SELECT source, content, mime_type, title,
                metadata.language AS language,
                search::score(0) AS score
         FROM documents
         WHERE content @0@ 'product'
         ORDER BY score DESC
         LIMIT 10;",
    )
    .await?
    .check()?
    .take(0)?;
```
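The query above covers the full-text (BM25) path. For the vector (HNSW) search mentioned earlier, a SurrealQL sketch could look like the following. This is an assumption-laden fragment: the schema in this example does not define an `embedding` field, and the field name, dimension, and distance metric here are all illustrative.

```surql
-- Assumes an `embedding` field populated at ingest time (not in the schema above).
DEFINE INDEX IF NOT EXISTS idx_doc_embedding ON TABLE documents
    FIELDS embedding HNSW DIMENSION 384 DIST COSINE;

-- K-nearest-neighbour search with SurrealDB's `<|K,EF|>` operator.
SELECT source, title, vector::distance::knn() AS dist
FROM documents
WHERE embedding <|10,40|> $query_embedding
ORDER BY dist;
```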