Concurrency

While the Rust SDK for SurrealDB uses the tokio async runtime, queries to the database will only run concurrently if the calling code itself issues them concurrently. The following example shows how to do this and compares the performance of sequential and concurrent usage.

Getting started

Start a running database using the following command:

surreal start --user root --pass root

To follow along interactively, connect using Surrealist or the following command to open up the CLI:

surreal sql --user root --pass root --ns namespace --db database --pretty

Then use the cargo add command to add the surrealdb and tokio crates. The dependencies inside Cargo.toml should look something like this:

[dependencies]
surrealdb = "2.0.4"
tokio = "1.41.0"

The code

By default, the SurrealDB client places no upper bound on the channels it uses internally to perform queries. To change this setting, the .with_capacity() method can be used. As the method's own documentation notes, setting a capacity is useful if your client is running so many queries concurrently that the machine used to execute them is running out of memory.

This is used to set bounds of the channels used internally as well set the capacity of the HashMap used for routing responses in case of the WebSocket client. Setting this capacity to 0 (the default) means that unbounded channels will be used. If your queries per second are so high that the client is running out of memory, it might be helpful to set this to a number that works best for you.
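To get a feel for what a bounded capacity means, here is a minimal sketch that uses the standard library's std::sync::mpsc channels rather than the SDK's internal channels, which are not shown on this page. The helper names accepted_by_bounded and accepted_by_unbounded are made up for illustration. A bounded channel stops accepting messages once its buffer is full, while an unbounded one keeps growing in memory.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Tries to queue `n` messages on a bounded channel without ever receiving,
/// and returns how many messages the channel accepted.
/// (Hypothetical helper, not part of the SurrealDB SDK.)
fn accepted_by_bounded(capacity: usize, n: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait here instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Queues `n` messages on an unbounded channel. Every send succeeds
/// immediately and the queue simply grows in memory.
fn accepted_by_unbounded(n: usize) -> usize {
    let (tx, _rx) = channel::<usize>();
    (0..n).filter(|i| tx.send(*i).is_ok()).count()
}
```

Calling accepted_by_bounded(1, 10) accepts a single message before the buffer fills, whereas accepted_by_unbounded(10) accepts all ten. Note that the standard library treats a capacity of 0 as a rendezvous channel, unlike .with_capacity(0), which the documentation quoted above describes as unbounded.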

To experiment with this, we will create three clients: one with no maximum capacity, a second with a maximum capacity of 1, and a third with a maximum capacity of 1000. To avoid any code duplication, we’ll use an enum to set these clients up.

enum DbType {
    Standard,
    With1,
    With1000,
}

impl DbType {
    async fn generate(self) -> Result<Surreal<Client>, Error> {
        let db = match self {
            DbType::Standard => Surreal::new::<Ws>("localhost:8000").await,
            DbType::With1 => Surreal::new::<Ws>("localhost:8000").with_capacity(1).await,
            DbType::With1000 => {
                Surreal::new::<Ws>("localhost:8000")
                    .with_capacity(1000)
                    .await
            }
        }?;
        db.use_ns("namespace").use_db("database").await?;
        db.signin(Root {
            username: "root",
            password: "root",
        })
        .await?;
        Ok(db)
    }
}

Each of these clients will be used inside a for loop over the indexes 0 through 50,000, in which each query simply returns the index number of the current iteration of the loop. Note that the .take() method can return a query response as any type that can be deserialized, including primitive types like usize. The function also takes a test_num parameter so that we can see which test is running as the index numbers fly by on the terminal.

async fn select_index(db: &Surreal<Client>, idx: usize, test_num: &'static str) {
    let mut result = db
        .query("SELECT * FROM $idx")
        .bind(("idx", idx))
        .await
        .unwrap();
    let db_idx: Option<usize> = result.take(0).unwrap();
    if let Some(db_idx) = db_idx {
        println!("{test_num} - {idx}: {db_idx}");
    }
}

The first test inside main runs sequentially, awaiting each query before sending the next one, and records the Duration taken by the test once it is done.

let start = std::time::Instant::now();
for idx in 0..=MAX {
    select_index(&db_standard, idx, "Test1").await;
}
let res_1 = format!("Regular DB: {:?}", Instant::now() - start);

The other three tests will all run concurrently, so we’ll put them into their own function. Each query gets its own tokio task, which executes and returns its result whenever it is finished. Each task’s JoinHandle is pushed into a Vec, and the handles are awaited at the end to ensure that we don’t exit the function before the tasks have finished.

async fn async_test(db: Arc<Surreal<Client>>, test_num: &'static str) -> std::time::Duration {
    let start = std::time::Instant::now();
    let mut handles = vec![];
    for idx in 0..=MAX {
        let db = db.clone();
        handles.push(tokio::spawn(async move {
            select_index(&db, idx, test_num).await;
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
    Instant::now() - start
}
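The collect-then-wait pattern is not specific to tokio or SurrealDB. As a rough, dependency-free sketch, the following uses std::thread::spawn in place of tokio::spawn, with a hypothetical run_all function in which each thread simply returns its index instead of running a database query. It also shows that a handle can carry a return value back to the caller.

```rust
use std::thread;

/// Spawns one thread per index in `0..=max`, keeps every JoinHandle,
/// then joins them all and returns the results in spawn order.
/// (Hypothetical stand-in for the tokio version; no database involved.)
fn run_all(max: usize) -> Vec<usize> {
    let mut handles = Vec::with_capacity(max + 1);
    for idx in 0..=max {
        // Each thread stands in for one query and echoes its own index.
        handles.push(thread::spawn(move || idx));
    }
    // Joining every handle guarantees that all threads have finished
    // before this function returns.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}
```

Because the range 0..=max is inclusive, run_all(99) returns 100 values. OS threads are far heavier than tokio tasks, so this sketch is only meant for small values of max.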

We will then make a String variable from the output of each test, and print them out at the end when everything has run.

The entire code is as follows:

use std::sync::Arc;
use std::time::Instant;

use surrealdb::engine::remote::ws::{Client, Ws};
use surrealdb::opt::auth::Root;
use surrealdb::{Error, Surreal};

const MAX: usize = 50_000;

enum DbType {
    Standard,
    With1,
    With1000,
}

impl DbType {
    async fn generate(self) -> Result<Surreal<Client>, Error> {
        let db = match self {
            DbType::Standard => Surreal::new::<Ws>("localhost:8000").await,
            DbType::With1 => Surreal::new::<Ws>("localhost:8000").with_capacity(1).await,
            DbType::With1000 => {
                Surreal::new::<Ws>("localhost:8000")
                    .with_capacity(1000)
                    .await
            }
        }?;
        db.use_ns("namespace").use_db("database").await?;
        db.signin(Root {
            username: "root",
            password: "root",
        })
        .await?;
        Ok(db)
    }
}

async fn select_index(db: &Surreal<Client>, idx: usize, test_num: &'static str) {
    let mut result = db
        .query("SELECT * FROM $idx")
        .bind(("idx", idx))
        .await
        .unwrap();
    let db_idx: Option<usize> = result.take(0).unwrap();
    if let Some(db_idx) = db_idx {
        println!("{test_num} - {idx}: {db_idx}");
    }
}

async fn async_test(db: Arc<Surreal<Client>>, test_num: &'static str) -> std::time::Duration {
    let start = std::time::Instant::now();
    let mut handles = vec![];
    for idx in 0..=MAX {
        let db = db.clone();
        handles.push(tokio::spawn(async move {
            select_index(&db, idx, test_num).await;
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
    Instant::now() - start
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let db_standard = Arc::new(DbType::Standard.generate().await?);
    let db_with_1 = Arc::new(DbType::With1.generate().await?);
    let db_with_1000 = Arc::new(DbType::With1000.generate().await?);

    let start = std::time::Instant::now();
    for idx in 0..=MAX {
        select_index(&db_standard, idx, "Test1").await;
    }
    let res_1 = format!("Regular DB: {:?}", Instant::now() - start);

    let res_2 = format!(
        "Async with capacity 1: {:?}",
        async_test(db_with_1, "Test2").await
    );
    let res_3 = format!(
        "Async with capacity 1000: {:?}",
        async_test(db_with_1000, "Test3").await
    );
    let res_4 = format!(
        "Async with unbounded capacity: {:?}",
        async_test(db_standard, "Test4").await
    );

    println!("{res_1}\n{res_2}\n{res_3}\n{res_4}");
    Ok(())
}

Running the code, you should see the following:

  1. The first test runs one index and one query at a time, taking by far the longest time.
  2. The second test will run much faster. Despite only having a capacity of 1, it still runs concurrently and does not need to wait for the output of the previous query to send in its own.
  3. The third test with a capacity of 1000 will run even faster.
  4. The last test using the first unbounded database should run fastest of all. If running the test on a particularly slow computer, however, you may see a slowdown compared to the other two async tests if the computer’s memory capacity is reached during the test.
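The gap between the first test and the others comes down to waiting: a sequential loop pays the full round trip for every query, while concurrent queries overlap their waiting time. The following is a small sketch of that effect using only the standard library, assuming a fixed, made-up latency per "query" simulated with thread::sleep. The names fake_query, sequential and concurrent are hypothetical.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Stand-in for one query round trip with a fixed, hypothetical latency.
fn fake_query(latency: Duration) {
    thread::sleep(latency);
}

/// Waits for each "query" to finish before starting the next one.
fn sequential(n: usize, latency: Duration) -> Duration {
    let start = Instant::now();
    for _ in 0..n {
        fake_query(latency);
    }
    start.elapsed()
}

/// Starts all "queries" first, then waits for them together.
fn concurrent(n: usize, latency: Duration) -> Duration {
    let start = Instant::now();
    let handles: Vec<_> = (0..n)
        .map(|_| thread::spawn(move || fake_query(latency)))
        .collect();
    for h in handles {
        h.join().expect("worker thread panicked");
    }
    start.elapsed()
}
```

With 20 queries at 10 milliseconds each, sequential takes at least 200 milliseconds in total, while concurrent should finish in roughly the time of a single query plus the cost of spawning the threads.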

A sample of the output at the end:

Regular DB: 5.244320833s
Async with capacity 1: 1.523926416s
Async with capacity 1000: 1.441300833s
Async with unbounded capacity: 1.183820584s

Using a channel instead of a JoinHandle

Besides the classic method of using a JoinHandle for each thread or task to wait until all have completed their operation, a channel can also be used. A channel can be used in the following way:

  • Create a channel with a buffer of 1 (the minimum size), as the channel will not be used to actually send data,
  • Clone the sender and send it into each iteration of the for loop,
  • Drop the cloned sender once the database query is done,
  • Drop the original sender after the for loop,
  • Call .recv() on the receiver at the very end.

As tokio’s documentation shows, the .recv() method will cause the receiver to sleep for as long as any sender is still alive. Once the last sender has been dropped, the channel closes and .recv() returns None, which tells us that every task has completed.

This method returns None if the channel has been closed and there are no remaining messages in the channel’s buffer. This indicates that no further values can ever be received from this Receiver. The channel is closed when all senders have been dropped, or when close is called. If there are no messages in the channel’s buffer, but the channel has not yet been closed, this method will sleep until a message is sent or the channel is closed.
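The close-on-last-drop behaviour can be tried out without tokio. The sketch below swaps in std::sync::mpsc and OS threads, so recv() blocks the thread rather than sleeping a task, and it reports a closed channel as Err(RecvError) instead of None. The function name wait_via_channel_close and the counter are made up for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;

/// Spawns `n` workers, each holding a cloned sender that it never sends on.
/// Returns how many workers had finished by the time `recv` reported the
/// channel as closed. (Hypothetical helper using std instead of tokio.)
fn wait_via_channel_close(n: usize) -> usize {
    // Buffer of 1: the channel is only a completion signal, not a data pipe.
    let (tx, rx) = sync_channel::<()>(1);
    let finished = Arc::new(AtomicUsize::new(0));
    for _ in 0..n {
        let sender = tx.clone();
        let finished = Arc::clone(&finished);
        thread::spawn(move || {
            // The worker's "query" would run here.
            finished.fetch_add(1, Ordering::SeqCst);
            // Dropping the clone marks this worker as done.
            drop(sender);
        });
    }
    // Without this, the original sender would keep the channel open forever.
    drop(tx);
    // Blocks until every sender is gone, then returns Err (tokio returns None).
    assert!(rx.recv().is_err());
    finished.load(Ordering::SeqCst)
}
```

Calling wait_via_channel_close(50) should return 50, because recv() cannot observe the closed channel until every worker has dropped its sender.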

Here is what the async_test() function looks like using this method.

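use tokio::sync::mpsc;

async fn async_test(db: Arc<Surreal<Client>>, test_num: &'static str) -> std::time::Duration {
    let (tx, mut rx) = mpsc::channel::<()>(1);
    let start = std::time::Instant::now();
    for idx in 0..=MAX {
        let sender = tx.clone();
        let db = db.clone();
        tokio::spawn(async move {
            select_index(&db, idx, test_num).await;
            // Dropping the cloned sender signals that this task is done.
            drop(sender);
        });
    }
    // Drop the original sender so the channel can close once all tasks finish.
    drop(tx);
    // Returns None once every sender has been dropped.
    rx.recv().await;
    Instant::now() - start
}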
