Arroyo’s UDFs can be asynchronous. This is useful for UDFs that need to make network requests or perform other asynchronous operations, and therefore should not block processing. Since asynchronous UDFs are executed in parallel, we can maintain high throughput even with longer-running UDFs.

Like all Rust async functions, async UDFs must not block the thread they run on: blocking prevents other tasks from running and can stall the pipeline. Instead, they should use asynchronous libraries (awaiting with .await rather than blocking) or move blocking work onto a separate thread with spawn_blocking.

To define an asynchronous UDF, use the async keyword in the function definition.
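
As a minimal sketch, an async UDF is an ordinary public Rust function marked async. The function name and body below are hypothetical; they stand in for a real network or database call, which would be awaited inside the function.

```rust
// Hypothetical async UDF: the name and logic are illustrative only.
// A real async UDF would typically await a network or database call here.
pub async fn shout(input: String) -> String {
    input.to_uppercase()
}
```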

Options

Async UDFs have a few configuration options that can be defined in a special comment at the top of the UDF, similar to dependencies, under a [udfs] header.

| Option | Default | Description |
| --- | --- | --- |
| async_results_ordered | true | Whether to return results in the order they were requested (ordered) or as they become available (unordered). |
| async_max_concurrency | 100 | The maximum number of concurrent executions of this UDF. |
| async_timeout_seconds | 10 | The maximum number of seconds to wait for a result before timing out. Arroyo currently retries if async_results_ordered is false, and panics if it is true. |
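
For instance, a UDF whose results do not depend on input order might relax ordering and raise the concurrency limit. The sketch below assumes the comment format described above; the option values and the tag_event function are illustrative assumptions, not recommendations.

```rust
/*
[udfs]
async_results_ordered = false
async_max_concurrency = 200
async_timeout_seconds = 30
*/

// Hypothetical UDF body; a real one would await an external service.
pub async fn tag_event(id: i64) -> String {
    format!("event-{}", id)
}
```

According to the table above, with async_results_ordered set to false a timed-out call is retried rather than causing a panic.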

Context

Async UDFs can optionally define a Context struct that will be passed to each execution of the UDF. This can be used to store resources that should be shared across executions of the UDF.

The Context struct must implement a pre-defined trait, types::UdfContext. The trait is defined as follows:

#[async_trait]
pub trait UdfContext: Sync {
    async fn init(&self) {}
    async fn close(&self) {}
}

In addition to the above trait, the Context struct must also define a new function (with no arguments). The new and init functions will be called once when the job starts, and the close function will be called once when the job ends.

If a Context is defined, a context variable of type Arc<Context> will be passed as the first argument to the UDF.
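
The pieces above can be sketched without any external dependencies. This is only an illustration: the types module below is a local stand-in for Arroyo's types::UdfContext, written with native async trait methods so that the snippet compiles on its own. In a real UDF you would omit that module and implement the provided trait with #[async_trait]. The cache and the cached_name function are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for Arroyo's `types::UdfContext` so this sketch is self-contained.
// In a real UDF, remove this module and use the trait Arroyo provides.
mod types {
    #[allow(async_fn_in_trait)]
    pub trait UdfContext: Sync {
        async fn init(&self) {}
        async fn close(&self) {}
    }
}

// Hypothetical context holding a cache shared by all executions of the UDF.
pub struct Context {
    cache: Mutex<HashMap<i64, String>>,
}

impl Context {
    // Required constructor: takes no arguments.
    pub fn new() -> Self {
        Self {
            cache: Mutex::new(HashMap::new()),
        }
    }
}

impl types::UdfContext for Context {
    // Called once when the job starts; here it pre-populates the cache.
    async fn init(&self) {
        self.cache.lock().unwrap().insert(1, "alice".to_string());
    }
}

// The context arrives as the first argument, wrapped in an Arc.
pub async fn cached_name(context: Arc<Context>, id: i64) -> String {
    // No .await occurs while the std Mutex guard is held.
    let cache = context.cache.lock().unwrap();
    cache
        .get(&id)
        .cloned()
        .unwrap_or_else(|| format!("unknown-{}", id))
}
```

A std Mutex is used here only because the lock is never held across an .await; a context that must hold a lock while awaiting would need an async-aware lock such as tokio's RwLock.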

Example

This example defines an async UDF that’s used to fetch a user’s name from a database.

/*
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7"
async-trait = "0.1.68"

[udfs]
async_results_ordered = true
async_max_concurrency = 10
async_timeout_seconds = 5
*/

use tokio_postgres::{Client, NoTls};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct Context {
    client: RwLock<Option<Client>>,
}

impl Context {
    pub fn new() -> Self {
        Self {
            client: RwLock::new(None),
        }
    }
}

#[async_trait]
impl types::UdfContext for Context {
    async fn init(&self) {
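        // Connection settings are hard-coded here for illustration.
        let conn_str = "host=localhost user=arroyo password=arroyo dbname=my_db";

        let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();

        // Store the client so that every execution of the UDF can share it.
        let mut c = self.client.write().await;
        *c = Some(client);

        // The connection object drives the actual communication with the
        // database, so it must run on its own task.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {}", e);
            }
        });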
    }
}

pub async fn user_name_from_id(context: Arc<Context>, id: i64) -> String {
    let client = context.client.read().await;
    let rows = client
        .as_ref()
        .unwrap()
        .query("SELECT name FROM users WHERE id = $1", &[&id])
        .await
        .unwrap();

    // Fall back to a placeholder name when no matching user exists.
    match rows.first() {
        Some(row) => row.get(0),
        None => format!("Unknown-{}", id),
    }
}