Arroyo has built-in support for a number of data formats, including JSON, Avro, and Parquet. But what if you need to support reading and writing data in a custom format?

User-defined functions (UDFs) to the rescue! UDFs allow you to write custom code (currently in Rust) that can be used in your SQL queries. This can extend Arroyo in powerful ways, implementing complex logic, new types of aggregations, and more.

In this tutorial we’ll walk through how to use UDFs to parse a custom format, in this case the Common Log Format, which is used by many webservers to log requests.

The Common Log Format (hereafter CLF) is a simple, line-oriented textual format. Each line represents a single request, and has the following format:

host ident authuser [date] "request" status bytes

For example:

127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326

Set up

Before you begin, you will need a running Arroyo cluster. You can follow the instructions in the getting started guide.

We will also be using Arroyo’s StreamGen tool to generate some sample data (you can also use your own data via Kafka or one of the other sources if you prefer).

You can install StreamGen with if you have Cargo installed:

Cargo
$ cargo install streamgen

Or you can download a pre-built binary from the releases page.

Once everything is set up you, should be able to run

$ streamgen --help
Usage: streamgen [OPTIONS] <SPEC>

Arguments:
  <SPEC>  Type of data to generator [possible values: common-log, impulse]

Options:
  -o, --output <OUTPUT>  Controls where the generated data is sent [possible values: stdout, sse]
  -r, --rate <RATE>      Rate of generation in records per second
  -l, --limit <LIMIT>    Max number of records to generate
      --port <PORT>      Port for SSE server
  -h, --help             Print help
  -V, --version          Print version

Generate some data

Let’s start by generating some sample data. We’ll use StreamGen to generate a stream of CLF records, and send them to Arroyo via Server-Sent Events.

$ streamgen --format string common-log sse --port 9563

This will start a server on port 9563 that will respond to SSE requests with a stream of CLF records. You can test it out by running:

$ curl -N http://localhost:9563/sse

which should produce a stream of output like:

id: 9236
data: 45.169.149.76 - clara_quidem [30/Nov/2023:13:51:25 -0800] "GET /var/day.xls" 200 4746 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko"

id: 9237
data: 196.87.121.200 - elliot_dolor [30/Nov/2023:13:51:26 -0800] "GET /" 400 4531 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.517 Safari/537.36"

id: 9238
data: 44.133.114.73 - zoie_quibusdam [30/Nov/2023:13:51:26 -0800] "GET /sbin/jackson.mp3" 401 6277 "-" "Mozilla/5.0 (iPad; CPU OS 5_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko ) Version/5.1 Mobile/9B176 Safari/7534.48.3"

Create the source

Now that we have a stream of data, we’ll set up an Arroyo source to read it.

Open up Arroyo at http://localhost:5115, to the “Pipelines” tab, then click “Create Pipeline.” navigate This will open up the pipeline editor.

We’ll start by defining our source, with a simple select * query.

create table logs (
    value TEXT
) with (
    connector = 'sse',
    endpoint = 'http://host.docker.internal:9563/sse',
    format = 'raw_string'
);

select * from logs;

The host.docker.internal hostname is a special hostname that Docker for Mac sets up to allow containers to access services running on the host machine. If you’re running Arroyo directly on your machine or in Docker on Linux, you should use localhost instead.

Click “Start Preview” to run the pipeline. You should see a stream of records flowing through the pipeline.

If you see an error in the console that says Error while reading from EventSource, that means Arroyo was not able to connect to the SSE server. Make sure the server is running and that you have the correct hostname and port.

Writing the UDF

Now that we have the data available, we can parse it into a more useful form using a UDF.

Currently, UDFs can only output a single column of a SQL data type. However, we’d like to extract multiple fields from the CLF record. The easiest way to do this today is to convert the record to JSON, then access it later using SQL JSON functions.

To do this, we’ll write a UDF that takes a string as input, parses it as a CLF record, and outputs a JSON object with the parsed fields.

For the actual parsing, we’ll rely on the Rust regex crate.

To create a new UDF, click the “UDFs” tab in the pipeline editor, then click “New.”

Then paste the following UDF code into the editor:

/*
[dependencies]
regex = "1"
serde_json = "1"
chrono = "0.4"
*/

use arroyo_udf_plugin::udf;
use regex::Regex;
use serde_json::json;
use chrono::DateTime;
use std::str::FromStr;

#[udf]
fn parse_log(s: &str) -> Option<String> {
    // use OnceLock to prevent re-compilation of the regexes on every request
    use std::sync::OnceLock;
    static REGEX: OnceLock<Regex> = OnceLock::new();
    let regex = REGEX.get_or_init(|| {
        Regex::new(r#"([0-9a-f\.:]+) ([\S\-]+) ([\S\-]+) \[([^\]]+)\] "(.*)" ([0-9]{3}) ([0-9]*).*"#).unwrap()
    });

    // try to parse the log, returning None if it fails
    let captures = regex.captures(s)?;

    // try parsing the timestamp using the chrono library
    let date = DateTime::parse_from_str(captures.get(4).unwrap().as_str(), "%d/%b/%Y:%H:%M:%S %z").ok()?;

    // pull out the fields
    Some(json!({
        "host": captures.get(1).unwrap().as_str().to_string(),
        "ident": captures.get(2).unwrap().as_str().to_string(),
        "authuser": captures.get(3).unwrap().as_str().to_string(),
        "date": date.to_rfc3339(),
        "request": captures.get(5).unwrap().as_str().to_string(),
        "status": u16::from_str(captures.get(6).unwrap().as_str()).unwrap(),
        "bytes": usize::from_str(captures.get(7).unwrap().as_str()).unwrap(),
    }).to_string())
}

A few things to note here, if you’re not familiar with Rust and Arroyo UDFs:

  1. The special dependencies comment at the top is used to specify the Rust crates (libraries) that are used by the UDF, specified in the same format as a Cargo.toml file.
  2. UDFs are Rust functions that take some number of arguments and return a value. The arguments and return value must be a supported SQL type. In this case, we’re taking a &str (string reference) as input and returning an Option<String>, which allows us to filter out invalid records that fail parsing. If you instead wanted to fail processing on invalid records, you could return a String instead and use unwrap() instead of ? to handle errors.
  3. The name of the UDF (how it will be referenced in SQL) is the name of the function, in this case parse_log. Once you edit the name, the UI will automatically update to reflect the new name.
  4. The OnceLock is used to ensure that the regex is only compiled once, and then reused on subsequent calls. This is very important for performance, as the cost of compiling the regex is much higher than the cost of running it. This one optimization can improve performance by 10x or more.
  5. We use unwrap()—which is the panicking form of error handling in Rust—in various places in the UDF. However, in each case we use it we know that the operation cannot fail. For example, we know that if the Regex successfully matched, then the capture group must exist, so we can safely call unwrap() on it. Similarly, the regex has already validated that the status and bytes fields are valid integers, so we can safely call unwrap() on the string-to-integer conversions.
  6. The json! macro from the serde_json library is a very convenient way to construct JSON in Rust, allowing you to use a syntax that looks very similar to the output JSON.

Once you’ve entered the UDF, click “Check” to validate it. Any errors will be displayed in the “Errors” tab below.

You may see that the first time you run Check it takes a while—this is because we don’t ship a full Rust compiler in the default Docker container. Instead, we dynamically install it when the first UDF is checked or built.

Using the UDF

Now that we have the UDF defined, we can use it in our pipeline.

Return to the Query tab. We have a few options for how to use the UDF. We could use it in a SELECT statement, for example:

select parse_log(value) from logs;

But there’s one other thing we need to take into account when consuming data in Arroyo: time. Arroyo is built around event-time processing, which means that records can be processed according to the time they occur, rather than the time they are processed by Arroyo. This is important for handling out-of-order and late data.

In this case, we want to use the timestamp from the log record as the event time. To do this, we will rely on Arroyo’s ability to override event time as part of the CREATE TABLE statement via virtual columns.

create table logs (
    value TEXT,
    parsed TEXT GENERATED ALWAYS AS (parse_log(value)) STORED,
    event_time TIMESTAMP GENERATED ALWAYS AS
        (CAST(extract_json_string(parse_log(value), '$.date') as TIMESTAMP)) STORED
) with (
    connector = 'sse',
    endpoint = 'http://host.docker.internal:9563/sse',
    format = 'raw_string',
    event_time_field = 'event_time'
);

Here we’ve added two new columns to the table:

  • parsed contains our parsed log record, as a JSON string.
  • event_time contains the timestamp from the log record; we use the extract_json_string function to extract the timestamp from the JSON string.

Currently Arroyo does not support virtual columns that depend on other virtual columns, so we have to repeat the parse_log call in the event_time column.

Let’s stop some attacks

Now that we have the data in a more useful form, we can start to do some analysis on it. One thing we might want to do is detect when a particular IP address is making a lot of requests that return 401 (Unauthorized) errors; i.e., when someone is trying to break into our server.

We’ll start by finding those users, which we’ll define as any IP address that has made more than 10 requests in the last 5 minutes that returned a 401 error.

CREATE TABLE hosts AS (
    SELECT
        extract_json_string(parsed, '$.host') as host,
        count(*) as attempts
    FROM logs
    WHERE
        extract_json(parsed, '$.status')[1] = 401
    GROUP BY host, hop(interval '5 seconds', interval '5 minutes'));

SELECT host, attempts
FROM hosts
WHERE attempts > 10;

This query operates by grouping the records by host and a 5-minute window, then counting the number of records in each group. We then filter the results to only include groups with more than 10 records using a SELECT over the virtual table.

Doing something about it

Now that we’ve identified the hosts that are making a lot of unauthorized requests, we can do something about it. For example, we could use Arroyo’s Webhook sink to tell our firewall to block the offending IP address.

CREATE TABLE block (
  host TEXT,
  attempts INT
) WITH (
  connector = 'webhook',
  endpoint = '<your webhook URL here>',
  format = 'json'
);

INSERT INTO block
SELECT host, attempts
FROM hosts
WHERE attempts > 10;

To try this out, you can use a service like RequestBin to create a temporary webhook URL that will show you the requests that are being sent.

Note that when running in preview mode, Arroyo will not actually send the requests to the webhook and will instead log them to the console. To send the webhooks for real, you’ll need to start the pipeline with the “Launch” button.

Conclusion

In this tutorial we’ve seen how to use UDFs to parse a custom data format, and then use that data to detect and block malicious users. This is just one example of the power of UDFs, which can also be used to implement complex logic, new types of aggregations, and more.

We’d love to hear what you’re using UDFs for! If you have any questions or feedback, please reach out on Discord.