Async Rust UDFs
Async User-defined functions (UDFs)
In addition to traditional scalar UDFs, Arroyo supports async UDFs, which can make use of async Rust functions. Currently, Python UDFs do not support async functions.
Sync UDFs are expected to implement simple, computational logic. Common use cases include parsing custom formats, implementing functions that are not natively supported, or implementing custom business logic that would be difficult to express in SQL. Because they are run synchronously with processing, if they take too long to run they can block the entire pipeline.
However, many use cases call for logic that is not a simple, stateless computation. You may need to do point lookups in a database to enrich events, make an API call to another service, or even perform inference against an AI model.
Async UDFs allow you to do all of these things, without blocking the pipeline. Async UDFs are defined as a Rust async fn, supporting non-blocking IO. Then within the Arroyo runtime, many instances of the UDF can be run in parallel, with a configurable concurrency limit.
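
One way to picture the concurrency limit is as a counting semaphore: each invocation takes a permit before it starts and returns it when it finishes, so no more than the configured number are in flight at once. The sketch below illustrates that idea only and is not Arroyo's implementation. It uses OS threads and the standard library in place of async tasks so that it has no dependencies, and the names InFlightLimit and max_observed_in_flight are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// A counting semaphore built from a Mutex and a Condvar (hypothetical helper).
struct InFlightLimit {
    permits: Mutex<usize>,
    available: Condvar,
}

impl InFlightLimit {
    fn new(limit: usize) -> Self {
        Self { permits: Mutex::new(limit), available: Condvar::new() }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiting caller.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `calls` simulated lookups with at most `limit` in flight and
/// returns the highest concurrency that was observed.
fn max_observed_in_flight(calls: usize, limit: usize) -> usize {
    let gate = Arc::new(InFlightLimit::new(limit));
    let current = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..calls)
        .map(|_| {
            let (gate, current, peak) = (gate.clone(), current.clone(), peak.clone());
            thread::spawn(move || {
                gate.acquire();
                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // Stand-in for a slow external lookup.
                thread::sleep(Duration::from_millis(5));
                current.fetch_sub(1, Ordering::SeqCst);
                gate.release();
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

Calling max_observed_in_flight(20, 4) starts 20 simulated lookups, and the returned peak never exceeds 4. In an async runtime the waiting would be non-blocking, but the bound on in-flight work is the same idea.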
Note that async UDFs, like all Rust async functions, should not block or do CPU-intensive work within the async runtime, as that will block execution of other instances of the UDF. Instead, they should use async libraries (calling .await instead of blocking) or use spawn_blocking.
Options
Async UDFs can be configured via options in the #[udf(...)] macro.
Option | Default | Description |
---|---|---|
unordered | - | Results are returned as they are available, regardless of the ordering of the original data; this provides the highest throughput and lowest latency. |
ordered | - | Results are returned in the order of the original events. Records produced by the async UDF are held back until all earlier records have been emitted. |
allowed_in_flight | 1000 | The maximum number of concurrent executions of this UDF. |
timeout | 5s | The amount of time to wait for an async UDF invocation to complete. If an invocation exceeds this time, the pipeline will restart. |
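
The difference between ordered and unordered output can be sketched with a small reorder buffer. This is an illustration of the behavior described in the table, not Arroyo's actual code, and the names ReorderBuffer and emit_ordered are hypothetical. Each input is assumed to carry a sequence number starting at 0. Results arrive in completion order, and a result is held back until every earlier sequence number has been emitted.

```rust
use std::collections::BTreeMap;

/// Buffers out-of-order completions and releases them in input order.
struct ReorderBuffer<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> ReorderBuffer<T> {
    fn new() -> Self {
        Self { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Records the result for input number `seq` and returns every result
    /// that is now safe to emit, in input order.
    fn complete(&mut self, seq: u64, result: T) -> Vec<T> {
        self.pending.insert(seq, result);
        let mut ready = Vec::new();
        // Drain the contiguous run of results starting at the next expected input.
        while let Some(value) = self.pending.remove(&self.next_seq) {
            ready.push(value);
            self.next_seq += 1;
        }
        ready
    }
}

/// Takes results in completion order and returns them in the order an
/// `ordered` UDF would emit them. An `unordered` UDF would emit them
/// exactly as they arrive.
fn emit_ordered(completions: Vec<(u64, String)>) -> Vec<String> {
    let mut buffer = ReorderBuffer::new();
    completions
        .into_iter()
        .flat_map(|(seq, value)| buffer.complete(seq, value))
        .collect()
}
```

If input 2 finishes first, nothing is emitted until inputs 0 and 1 have finished. That waiting is why unordered mode can offer lower latency when invocation times vary.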
Example
This example defines an async UDF that fetches a user’s name from a database.
```rust
/*
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7"
*/

use arroyo_udf_plugin::udf;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_postgres::{Client, NoTls};

// Shared client, initialized lazily on the first UDF invocation.
static CLIENT: OnceCell<Arc<Client>> = OnceCell::const_new();

async fn get_client() -> Arc<Client> {
    CLIENT
        .get_or_init(|| async {
            let conn_str = "host=localhost user=arroyo password=arroyo dbname=my_db";
            let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();
            // The connection object drives the socket IO, so it runs on its own task.
            tokio::spawn(async move {
                if let Err(e) = connection.await {
                    println!("connection error: {}", e);
                }
            });
            Arc::new(client)
        })
        .await
        .clone()
}

#[udf(ordered, timeout="100ms", allowed_in_flight=100)]
async fn user_name_from_id(id: i64) -> Option<String> {
    let client = get_client().await;
    // query_opt returns None when no row matches the given id.
    let row = client
        .query_opt("SELECT name FROM users WHERE id = $1", &[&id])
        .await
        .unwrap();
    row.map(|row| row.get(0))
}
```