While the Rust SDK for SurrealDB uses the tokio async runtime, queries will only run concurrently if your own code uses concurrency. The following example shows how to do this and compares the performance of sequential and concurrent usage.
Start a running database using the following command:
surreal start --user root --pass root
To follow along interactively, connect using Surrealist or the following command to open up the CLI:
surreal sql --user root --pass root --ns namespace --db database --pretty
Then use the cargo add command to add the surrealdb and tokio crates. The dependencies inside Cargo.toml should look something like this:
[dependencies]
surrealdb = "2.0.4"
tokio = "1.41.0"
By default, the SurrealDB client places no upper bound on the channels it uses internally to perform queries. To change this setting, the .with_capacity() method can be used. As the method's documentation notes, this is useful if your client is running so many queries concurrently that the machine used to execute them is running out of memory.
This is used to set bounds of the channels used internally as well set the capacity of the HashMap used for routing responses in case of the WebSocket client. Setting this capacity to 0 (the default) means that unbounded channels will be used. If your queries per second are so high that the client is running out of memory, it might be helpful to set this to a number that works best for you.
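To get a feel for what a bounded capacity means in practice, here is a minimal sketch that uses the standard library's channels as a stand-in. It does not show the SDK's internals: `sync_channel` plays the role of a bounded channel, `channel` the role of an unbounded one, and the function names are made up for illustration. Note that in the standard library a capacity of 0 means a rendezvous channel, whereas the quote above says that 0 means unbounded for the SurrealDB client.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Tries to queue `n` messages on a bounded channel of size `cap`
/// while nothing is draining it. Returns how many were accepted.
fn accepted_by_bounded(cap: usize, n: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(cap);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a bounded channel pushes back here
            // instead of growing without limit.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Does the same on an unbounded channel, which never pushes back
/// and simply keeps buffering (and using memory).
fn accepted_by_unbounded(n: usize) -> usize {
    let (tx, _rx) = channel::<usize>();
    let mut accepted = 0;
    for i in 0..n {
        if tx.send(i).is_ok() {
            accepted += 1;
        }
    }
    accepted
}

fn main() {
    println!("bounded(1000): {}", accepted_by_bounded(1000, 5000));
    println!("unbounded: {}", accepted_by_unbounded(5000));
}
```

The bounded channel stops accepting work once its buffer is full, which is the kind of limit on memory use that a capacity setting is meant to provide.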
To experiment with this, we will create three clients: one with no maximum capacity, a second with a maximum capacity of 1, and a third with a maximum capacity of 1000. To avoid any code duplication, we’ll use an enum to set these clients up.
enum DbType {
    Standard,
    With1,
    With1000,
}

impl DbType {
    async fn generate(self) -> Result<Surreal<Client>, Error> {
        let db = match self {
            DbType::Standard => Surreal::new::<Ws>("localhost:8000").await,
            DbType::With1 => Surreal::new::<Ws>("localhost:8000").with_capacity(1).await,
            DbType::With1000 => {
                Surreal::new::<Ws>("localhost:8000")
                    .with_capacity(1000)
                    .await
            }
        }?;
        db.use_ns("namespace").use_db("database").await?;
        db.signin(Root {
            username: "root",
            password: "root",
        })
        .await?;
        Ok(db)
    }
}
Each of these clients will be used in a for loop over the index numbers from 0 to 50000, in which a select_index() function simply queries for and returns the index number of the current iteration. Note that the .take() method returns a query response as anything that can be deserialized, including primitive types like usize. The function also takes a test_num parameter so that we can see which test is running as the index numbers fly by on the terminal.
async fn select_index(db: &Surreal<Client>, idx: usize, test_num: &'static str) {
    let mut result = db
        .query("SELECT * FROM $idx")
        .bind(("idx", idx))
        .await
        .unwrap();
    let db_idx: Option<usize> = result.take(0).unwrap();
    if let Some(db_idx) = db_idx {
        println!("{test_num} - {idx}: {db_idx}");
    }
}
The first test inside main runs the queries one after another, awaiting each one before sending the next, and records the Duration taken by the test once it is done.
let start = std::time::Instant::now();
for idx in 0..=MAX {
    // select_index() takes the client, the index and the test name.
    select_index(&db_standard, idx, "Test1").await;
}
let res_1 = format!("Regular DB: {:?}", Instant::now() - start);
The other three tests will all run their queries concurrently, so we'll put them into their own function. Each query will be spawned as a tokio task which will execute and return the result whenever it is finished. Each task's JoinHandle will be put into a Vec of JoinHandles that will be awaited at the end to ensure that we don't exit the function before the tasks have finished.
async fn async_test(db: Arc<Surreal<Client>>, test_num: &'static str) -> std::time::Duration {
    let start = std::time::Instant::now();
    let mut handles = vec![];
    for idx in 0..=MAX {
        let db = db.clone();
        handles.push(tokio::spawn(async move {
            select_index(&db, idx, test_num).await;
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
    Instant::now() - start
}
We will then make a String variable from the output of each test, and print them out at the end when everything has run.

The entire code is as follows:
use std::sync::Arc;
use std::time::Instant;

use surrealdb::engine::remote::ws::{Client, Ws};
use surrealdb::opt::auth::Root;
use surrealdb::{Error, Surreal};

const MAX: usize = 50_000;

enum DbType {
    Standard,
    With1,
    With1000,
}

impl DbType {
    async fn generate(self) -> Result<Surreal<Client>, Error> {
        let db = match self {
            DbType::Standard => Surreal::new::<Ws>("localhost:8000").await,
            DbType::With1 => Surreal::new::<Ws>("localhost:8000").with_capacity(1).await,
            DbType::With1000 => {
                Surreal::new::<Ws>("localhost:8000")
                    .with_capacity(1000)
                    .await
            }
        }?;
        db.use_ns("namespace").use_db("database").await?;
        db.signin(Root {
            username: "root",
            password: "root",
        })
        .await?;
        Ok(db)
    }
}

async fn select_index(db: &Surreal<Client>, idx: usize, test_num: &'static str) {
    let mut result = db
        .query("SELECT * FROM $idx")
        .bind(("idx", idx))
        .await
        .unwrap();
    let db_idx: Option<usize> = result.take(0).unwrap();
    if let Some(db_idx) = db_idx {
        println!("{test_num} - {idx}: {db_idx}");
    }
}

async fn async_test(db: Arc<Surreal<Client>>, test_num: &'static str) -> std::time::Duration {
    let start = std::time::Instant::now();
    let mut handles = vec![];
    for idx in 0..=MAX {
        let db = db.clone();
        handles.push(tokio::spawn(async move {
            select_index(&db, idx, test_num).await;
        }));
    }
    // Wait for every spawned task before measuring the elapsed time.
    for h in handles {
        h.await.unwrap();
    }
    Instant::now() - start
}

// An async main function needs a runtime to drive it.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let db_standard = Arc::new(DbType::Standard.generate().await?);
    let db_with_1 = Arc::new(DbType::With1.generate().await?);
    let db_with_1000 = Arc::new(DbType::With1000.generate().await?);

    // Test 1: one query at a time.
    let start = std::time::Instant::now();
    for idx in 0..=MAX {
        select_index(&db_standard, idx, "Test1").await;
    }
    let res_1 = format!("Regular DB: {:?}", Instant::now() - start);

    // Tests 2 to 4: one spawned task per query.
    let res_2 = format!(
        "Async with capacity 1: {:?}",
        async_test(db_with_1, "Test2").await
    );
    let res_3 = format!(
        "Async with capacity 1000: {:?}",
        async_test(db_with_1000, "Test3").await
    );
    let res_4 = format!(
        "Async with unbounded capacity: {:?}",
        async_test(db_standard, "Test4").await
    );

    println!("{res_1}\n{res_2}\n{res_3}\n{res_4}");
    Ok(())
}
Running the code, you should see the index numbers fly by on the terminal for each test, followed by the time taken by each one. A sample of the output at the end:
Regular DB: 5.244320833s
Async with capacity 1: 1.523926416s
Async with capacity 1000: 1.441300833s
Async with unbounded capacity: 1.183820584s
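One plausible reading of the gap between the first timing and the other three is that the first loop waits for each round trip to finish before starting the next one. The following rough sketch imitates that effect without any database: it uses standard library threads instead of tokio tasks, and a `fake_query` function (hypothetical, as is the 50 ms latency) that sleeps to stand in for a round trip.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Simulated round-trip time for one query (an assumed value).
const LATENCY: Duration = Duration::from_millis(50);

/// Stand-in for a query: waits, then returns the index it was given.
fn fake_query(idx: usize) -> usize {
    thread::sleep(LATENCY);
    idx
}

/// Runs the queries one after another, like the first test.
fn run_sequential(n: usize) -> (Vec<usize>, Duration) {
    let start = Instant::now();
    let results: Vec<usize> = (0..n).map(fake_query).collect();
    (results, start.elapsed())
}

/// Starts every query first, then waits on each handle,
/// like the JoinHandle-based tests.
fn run_concurrent(n: usize) -> (Vec<usize>, Duration) {
    let start = Instant::now();
    let handles: Vec<thread::JoinHandle<usize>> = (0..n)
        .map(|idx| thread::spawn(move || fake_query(idx)))
        .collect();
    let results: Vec<usize> = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .collect();
    (results, start.elapsed())
}

fn main() {
    let (_, sequential) = run_sequential(8);
    let (_, concurrent) = run_concurrent(8);
    println!("sequential: {sequential:?}");
    println!("concurrent: {concurrent:?}");
}
```

The sequential run takes at least eight times the simulated latency, while the concurrent run overlaps the waits. Real timings against a database will depend on the machine, the network and the server.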
Besides the classic method of using a JoinHandle for each thread or task to wait until all have completed their operation, a channel can also be used. A channel can be used in the following way:
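- Create the channel before the for loop,
- Clone the sender for each task spawned inside the for loop, and let the task drop its clone when it finishes,
- Drop the original sender and call .recv() on the receiver at the very end.

As tokio's documentation shows, the .recv() method will cause the receiver to sleep as it waits for the tasks to complete, and it will return None once the last sender has been dropped and the channel has closed.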
This method returns None if the channel has been closed and there are no remaining messages in the channel’s buffer. This indicates that no further values can ever be received from this Receiver. The channel is closed when all senders have been dropped, or when close is called. If there are no messages in the channel’s buffer, but the channel has not yet been closed, this method will sleep until a message is sent or the channel is closed.
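The close-on-drop behaviour described above can be tried out without tokio. The sketch below is an analogue built on the standard library's `std::sync::mpsc` and threads, not on tokio's channel: there, `recv()` returns an `Err` instead of `None` once every sender has been dropped and the buffer is empty. The `run_tasks` function is a hypothetical name used only for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `n` threads that each hold a clone of the sender, then
/// waits until the channel closes. Returns how many messages arrived.
fn run_tasks(n: usize) -> usize {
    let (tx, rx) = mpsc::channel::<usize>();
    for idx in 0..n {
        let sender = tx.clone();
        thread::spawn(move || {
            // The receiver outlives every task, so this send succeeds.
            sender.send(idx).unwrap();
            // `sender` is dropped here, when the closure ends.
        });
    }
    // Drop the original sender; otherwise the channel would never
    // close and the loop below would wait forever.
    drop(tx);

    let mut finished = 0;
    // `recv()` blocks until a message arrives, and returns `Err`
    // once all senders are gone and the buffer is empty.
    while rx.recv().is_ok() {
        finished += 1;
    }
    finished
}

fn main() {
    println!("finished tasks: {}", run_tasks(16));
}
```

Forgetting to drop the original sender is the usual pitfall with this pattern, since the receiver then has no way to learn that the work is done.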
Here is what the async_test() function looks like using this method.
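// Needs this import in addition to the ones in the full example.
use tokio::sync::mpsc;

async fn async_test(db: Arc<Surreal<Client>>, test_num: &'static str) -> std::time::Duration {
    // No messages are ever sent; the channel only tracks live senders.
    let (tx, mut rx) = mpsc::channel::<()>(1);
    let start = std::time::Instant::now();
    for idx in 0..=MAX {
        let sender = tx.clone();
        let db = db.clone();
        tokio::spawn(async move {
            select_index(&db, idx, test_num).await;
            // Dropping the clone signals that this task is done.
            drop(sender);
        });
    }
    // Drop the original sender so that only the tasks keep the channel open.
    drop(tx);
    // Returns None once every sender has been dropped.
    rx.recv().await;
    Instant::now() - start
}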