These release notes are also available on the Arroyo blog.

Arroyo 0.9.0 introduces async UDFs, which allow users to use databases, services, and models from within their pipelines. It also brings support for joining update tables, more control over bad data handling, a redesigned connection profile editor, and more.

For this release, we are thrilled to welcome two new contributors to the Arroyo project:

Thanks to all our contributors for this release:

Async UDFs

User-defined functions (UDFs) and user-defined aggregate functions (UDAFs) allow you to extend Arroyo with custom logic. New in Arroyo 0.9 is support for what we call async UDFs.

Existing (sync) UDFs are expected to implement simple, computational logic. Common use cases include parsing custom formats, implementing functions that are not natively supported, or implementing custom business logic that would be difficult to express in SQL. Because they are run synchronously with processing, if they take too long to run they can block the entire pipeline.

This isn’t a hypothetical concern. A UDF that takes 10ms to run will be limited to processing just 100 events per second per subtask!

And there are many use cases where you want to run logic that is not a simple, stateless computation. You may need to do point lookups in a database to enrich events, make an API call to another service, or even perform inference against an AI model.

Async UDFs allow you to do all of these things, without blocking the pipeline. Async UDFs are defined as a Rust async fn, supporting non-blocking IO. Then within the Arroyo runtime, many instances of the UDF can be run in parallel, with a configurable concurrency limit.

What does this look like? Let’s take an example of a UDF that enriches web events with GeoIP data by calling a GeoIP service. First, we define the UDF:

/*
[dependencies]
reqwest = { version = "0.11.23", features = ["json"] }
serde_json = "1"

[udfs]
async_max_concurrency = 1000
*/

pub async fn get_city(ip: String) -> Option<String> {
  let body: serde_json::Value =
    reqwest::get(
      format!("http://geoip-service:8000/{ip}"))
        .await
        .ok()?
        .json()
        .await
        .ok()?;

    body.pointer("/names/en").and_then(|t|
      t.as_str()
    ).map(|t| t.to_string())
}

Then we can use this UDF in a query, for example this one that finds the most common cities in the last 15 minutes:

create view cities as
select get_city(logs.ip) as city
from logs;

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY window
        ORDER BY count DESC) as row_num
    FROM (SELECT count(*) as count,
        city,
        hop(interval '5 seconds', interval '15 minutes') as window
            FROM cities
            WHERE city IS NOT NULL
            group by city, window)) WHERE row_num <= 5;

Async UDFs support several configuration options that control their behavior, including the maximum number of concurrent requests, timeouts, and whether event order should be preserved.

They also support defining a Context struct which is passed in to each invocation of the UDF. This allows you to do setup for expensive operations (like setting up a Database connection pool) just once, and share the result with all invocations.

For example, we can create a client for Postgres like this:

pub struct Context {
    client: RwLock<Option<Client>>,
}

impl Context {
    pub fn new() -> Self {
        Self {
            client: RwLock::new(None),
        }
    }
}

#[async_trait]
impl arroyo_types::UdfContext for Context {
    async fn init(&self) {
        let conn_str = "host=localhost user=dbuser password=dbpassword dbname=my_db";

        let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();

        let mut c = self.client.write().await;
        *c = Some(client);

        tokio::spawn(async move {
            if let Err(e) = connection.await {
                println!("connection error: {}", e);
            }
        });
    }
}

See the docs for full details.

Joining Update tables

Arroyo has two semantics for streaming SQL, one based on event-time watermarks and another that we call “Update Tables,” which allow for incremental computation of most analytical SQL constructs. However, previously there was a restriction on joins, which were only supported for the watermark semantic.

Now that restriction is gone, and update tables can be joined. Hurray!

For example, this query will count the number of page views by user, and when a transaction comes in joins that accumulated count:

CREATE VIEW page_view_counts as
SELECT count(*) as count, user_id as user_id
FROM page_views
GROUP BY user_id;


SELECT T.user_id, amount, P.count as page_views
FROM transactions T
LEFT JOIN page_view_counts P on T.user_id = P.user_id;

Bad Data handling

It happens to the best of us. You’ve carefully built out your data architecture, perform rigorous code reviews, and diligently monitor your rollouts.

And yet it happens—somehow, invalid data got into your Kafka topic. Now in Arroyo 0.9, you can now configure the source to drop bad data instead of causing a processing failure.

This behavior can be configured via the Web UI or with the bad_data option when creating a source in SQL. Two options are currently available:

  • fail (default, and behavior in Arroyo 0.8) causes the pipeline to restart when bad data is encountered

  • drop causes the data to be dropped, while logging an error message to the console and incrementing the arroyo_worker_deserialization_errors metric

  • Add control for deserialization error behavior by @jbeisen in #443

  • Add bad data option to create connection form by @jbeisen in #452

Environment variable substitution

Some connectors need to be configured with authentication details or other secret data. Today in Arroyo those secrets are stored in the configuration database (postgres), which is not very secure.

Now in Arroyo 0.9, we’re introducing environment-variable substitution for sensitive configuration fields:

This feature allows you to use double curly braces ({{ }}) to reference environment variables, which will get substituted in at run time. For example, if you have an environment variable called WEBHOOK_SECRET you can now include this as a header in a webhook sink like

Authentication: Basic {{ WEBHOOK_SECRET }}

Env variable substitution can be used for both connections created via the Web UI and those created directly in SQL, like:

CREATE TABLE slack (
    value TEXT
) WITH (
    connector = 'webhook',
    endpoint = 'https://hooks.slack.com/services/{{ SLACK_KEY }}',
    method = 'POST',
    headers = 'Content-Type:application/json',
    format = 'json'
);
  • Support environment variable substitution by @jbeisen in #433

Confluent Cloud connector

Confluent is the company founded by the creators of Apache Kafka. They provide a cloud-native distribution of Kafka and serverless Kafka platform. While Arroyo has always supported reading and writing from Kafka, with Arroyo 0.9 we’re making it even easier to integrate with your Confluent Cloud topics with the new Confluent Cloud connector.

See the complete integration guide to get started.

Connection Profile UI

Connection Profiles encapsulate common configuration that is shared across multiple connection tables. For example, a single Kafka cluster may have many topics that you would like to consumer or produce to from Arroyo.

We’ve upgraded the process of creating and managing these profiles. It’s now possible to view and delete existing connection profiles, and the whole UI has gotten a little spiffier.

We’re also now validating that the profile is good (like ensuring we can talk to Kafka with the provided address and credentials), so you don’t discover you mistyped something at the end of the connection creation process.

  • Redesign cluster profile UI and add validation by @mwylde in #454
  • Add autocomplete when creating connections and Kafka topic autocomplete by @mwylde in #458

Fixes

Improvements

Full Changelog: https://github.com/ArroyoSystems/arroyo/compare/v0.8.1…v0.9.0