Parsing with UDFs
Use user-defined functions to parse custom formats
Arroyo has built-in support for a number of data formats, including JSON, Avro, and Parquet. But what if you need to support reading and writing data in a custom format?
User-defined functions (UDFs) to the rescue! UDFs allow you to write custom code (currently in Rust) that can be used in your SQL queries. This can extend Arroyo in powerful ways, implementing complex logic, new types of aggregations, and more.
In this tutorial we’ll walk through how to use UDFs to parse a custom format, in this case the Common Log Format, which is used by many webservers to log requests.
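The Common Log Format (hereafter CLF) is a simple, line-oriented textual format. Each line represents a single request and consists of seven fields: the client host, the RFC 1413 identity of the client (ident), the authenticated user (authuser), the request time in square brackets, the request line in double quotes, the HTTP status code, and the size of the response in bytes. Missing values are written as -.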
For example:
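To make the layout concrete, the following std-only Rust sketch assembles a line from its seven fields. The ClfRecord type is hypothetical (it is not part of Arroyo or StreamGen), and the field values are made up for illustration.

```rust
use std::fmt;

/// The seven CLF fields (hypothetical type, for illustration only).
struct ClfRecord<'a> {
    host: &'a str,      // client IP address or hostname
    ident: &'a str,     // RFC 1413 identity, usually "-"
    authuser: &'a str,  // authenticated user ID, or "-"
    date: &'a str,      // e.g. 10/Oct/2000:13:55:36 -0700
    request: &'a str,   // the HTTP request line
    status: u16,        // HTTP status code
    bytes: Option<u64>, // response body size; logged as "-" when absent
}

impl fmt::Display for ClfRecord<'_> {
    // Writes: host ident authuser [date] "request" status bytes
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{} {} {} [{}] \"{}\" {} ",
            self.host, self.ident, self.authuser, self.date, self.request, self.status
        )?;
        match self.bytes {
            Some(n) => write!(f, "{}", n),
            None => write!(f, "-"),
        }
    }
}

fn main() {
    let rec = ClfRecord {
        host: "127.0.0.1",
        ident: "-",
        authuser: "frank",
        date: "10/Oct/2000:13:55:36 -0700",
        request: "GET /apache_pb.gif HTTP/1.0",
        status: 200,
        bytes: Some(2326),
    };
    println!("{}", rec);
}
```

Running it prints 127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326. Note that the date and request fields contain spaces themselves, which is why a parser cannot simply split the line on whitespace.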
Set up
Before you begin, you will need a running Arroyo cluster. You can follow the instructions in the getting started guide.
We will also be using Arroyo’s StreamGen tool to generate some sample data (you can also use your own data via Kafka or one of the other sources if you prefer).
If you have Cargo installed, you can install StreamGen with:
Or you can download a pre-built binary from the releases page.
Once everything is set up, you should be able to run:
Generate some data
Let’s start by generating some sample data. We’ll use StreamGen to generate a stream of CLF records, and send them to Arroyo via Server-Sent Events.
This will start a server on port 9563 that will respond to SSE requests with a stream of CLF records. You can test it out by running:
which should produce a stream of output like:
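The records arrive over Server-Sent Events, a line-oriented text protocol: each event is a group of field: value lines ended by a blank line, and the payload travels in data fields (several data lines in one event are joined with newlines). The following std-only Rust sketch extracts those payloads from a chunk of stream text. The function name and the sample stream are hypothetical, it ignores the event, id and retry fields and comment lines, and it only approximates what any SSE client, including Arroyo's source, has to do.

```rust
/// Collects the payload of every complete event in a chunk of SSE text.
/// Simplified: only `data` fields are handled; `event`, `id`, `retry`
/// and comment lines (starting with ':') are ignored.
fn sse_data_payloads(stream: &str) -> Vec<String> {
    let mut events = Vec::new();
    let mut data_lines: Vec<&str> = Vec::new();
    for line in stream.lines() {
        if line.is_empty() {
            // A blank line ends the current event.
            if !data_lines.is_empty() {
                events.push(data_lines.join("\n"));
                data_lines.clear();
            }
        } else if let Some(value) = line.strip_prefix("data:") {
            // One space after the colon is a separator, not part of the value.
            data_lines.push(value.strip_prefix(' ').unwrap_or(value));
        }
    }
    // An event that is not yet terminated by a blank line is not emitted.
    events
}

fn main() {
    // Hypothetical stream carrying two CLF records as separate events.
    let stream = "data: 10.0.0.1 - - [10/Oct/2000:13:55:36 -0700] \"GET / HTTP/1.0\" 200 512\n\n\
                  data: 10.0.0.2 - - [10/Oct/2000:13:55:37 -0700] \"GET /login HTTP/1.0\" 401 128\n\n";
    for payload in sse_data_payloads(stream) {
        println!("{}", payload);
    }
}
```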
Create the source
Now that we have a stream of data, we’ll set up an Arroyo source to read it.
Open up Arroyo at http://localhost:5115, navigate to the “Pipelines” tab, then click “Create Pipeline.” This will open up the pipeline editor.
We’ll start by defining our source, with a simple select * query.
The host.docker.internal hostname is a special hostname that Docker for Mac sets up to allow containers to access services running on the host machine. If you’re running Arroyo directly on your machine, or in Docker on Linux with host networking, you should use localhost instead.
Click “Start Preview” to run the pipeline. You should see a stream of records flowing through the pipeline.
If you see an error in the console that says “Error while reading from EventSource”, that means Arroyo was not able to connect to the SSE server. Make sure the server is running and that you have the correct hostname and port.
Writing the UDF
Now that we have the data available, we can parse it into a more useful form using a UDF.
Currently, UDFs can only output a single column of a SQL data type. However, we’d like to extract multiple fields from the CLF record. The easiest way to do this today is to convert the record to JSON, then access it later using SQL JSON functions.
To do this, we’ll write a UDF that takes a string as input, parses it as a CLF record, and outputs a JSON object with the parsed fields.
For the actual parsing, we’ll rely on the Rust regex crate.
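The UDF in this tutorial relies on the regex and serde_json crates. As a dependency-free sketch of the same contract (a &str in, an Option<String> of JSON out, with None for lines that do not parse), the following uses only the Rust standard library. The JSON key names, the hand-written json_str escaper standing in for serde_json, and the choice to record a missing byte count as 0 are all assumptions for illustration, not necessarily what the real UDF emits.

```rust
/// Minimal JSON string encoder (a std-only stand-in for serde_json).
fn json_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Hypothetical, dependency-free analogue of the parse_log UDF:
/// returns None (so the row is dropped) when the line is not valid CLF.
fn parse_log(line: &str) -> Option<String> {
    // Layout: host ident authuser [timestamp] "request" status bytes
    let (prefix, rest) = line.split_once(" [")?;
    let mut head = prefix.splitn(3, ' ');
    let (host, ident, user) = (head.next()?, head.next()?, head.next()?);
    let (timestamp, rest) = rest.split_once("] \"")?;
    let (request, rest) = rest.rsplit_once("\" ")?;
    let (status, bytes) = rest.split_once(' ')?;
    let status: u16 = status.parse().ok()?;
    // CLF logs "-" when no body was sent; this sketch records that as 0.
    let bytes: u64 = if bytes == "-" { 0 } else { bytes.parse().ok()? };
    Some(format!(
        "{{\"host\":{},\"ident\":{},\"user\":{},\"timestamp\":{},\"request\":{},\"status\":{},\"bytes\":{}}}",
        json_str(host), json_str(ident), json_str(user), json_str(timestamp),
        json_str(request), status, bytes
    ))
}

fn main() {
    let line = r#"127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326"#;
    println!("{:?}", parse_log(line));
    println!("{:?}", parse_log("garbage")); // None: the row would be filtered out
}
```

Each ? returns None as soon as one step fails, which is the same filtering behavior the Option<String> return type gives the real UDF.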
To create a new UDF, click the “UDFs” tab in the pipeline editor, then click “New.”
Then paste the following UDF code into the editor:
A few things to note here, if you’re not familiar with Rust and Arroyo UDFs:
- The special dependencies comment at the top is used to specify the Rust crates (libraries) that are used by the UDF, in the same format as a Cargo.toml file.
- UDFs are Rust functions that take some number of arguments and return a value. The arguments and return value must be supported SQL types. In this case, we’re taking a &str (string reference) as input and returning an Option<String>, which allows us to filter out invalid records that fail parsing. If you instead wanted to fail processing on invalid records, you could return a String and use unwrap() instead of ? to handle errors.
- The name of the UDF (how it will be referenced in SQL) is the name of the function, in this case parse_log. When you edit the name, the UI automatically updates to reflect the new name.
- The OnceLock is used to ensure that the regex is only compiled once and then reused on subsequent calls. This is very important for performance, as compiling the regex is much more expensive than running it; this one optimization can improve performance by 10x or more.
- We use unwrap(), the panicking form of error handling in Rust, in various places in the UDF. However, in each of those places we know that the operation cannot fail. For example, if the regex successfully matched, then the capture group must exist, so we can safely call unwrap() on it. Similarly, the regex has already validated that the status and bytes fields are valid integers, so we can safely call unwrap() on the string-to-integer conversions.
- The json! macro from the serde_json crate is a very convenient way to construct JSON in Rust, allowing you to use a syntax that looks very similar to the output JSON.
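The compile-once pattern described in the OnceLock note can be illustrated with the standard library alone (OnceLock has been stable since Rust 1.70). In this sketch a hypothetical build_month_table function stands in for the expensive Regex::new call, and an atomic counter records how many times it actually runs.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

// Counts how many times the expensive initializer actually runs.
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for costly one-time setup such as Regex::new.
fn build_month_table() -> HashMap<&'static str, u32> {
    INIT_CALLS.fetch_add(1, Ordering::SeqCst);
    const MONTHS: [&str; 12] = [
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    ];
    MONTHS
        .iter()
        .enumerate()
        .map(|(i, &m)| (m, i as u32 + 1))
        .collect()
}

/// Maps a CLF month abbreviation to 1..=12, building the table on first use only.
fn month_number(abbrev: &str) -> Option<u32> {
    // The tutorial's UDF holds its compiled regex in a OnceLock the same way.
    static TABLE: OnceLock<HashMap<&'static str, u32>> = OnceLock::new();
    TABLE.get_or_init(build_month_table).get(abbrev).copied()
}

fn main() {
    for m in ["Oct", "Jan", "Foo"] {
        println!("{} -> {:?}", m, month_number(m));
    }
    println!("initializer ran {} time(s)", INIT_CALLS.load(Ordering::SeqCst));
}
```

However many times month_number is called, the counter stays at 1: the first call pays the setup cost and every later call reuses the stored value, which is the property that makes caching the compiled regex pay off.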
Once you’ve entered the UDF, click “Check” to validate it. Any errors will be displayed in the “Errors” tab below.
You may notice that the first time you run Check, it takes a while. This is because we don’t ship a full Rust compiler in the default Docker container; instead, we dynamically install it when the first UDF is checked or built.
Using the UDF
Now that we have the UDF defined, we can use it in our pipeline.
Return to the Query tab. We have a few options for how to use the UDF. We could use it in a SELECT statement, for example:
But there’s one other thing we need to take into account when consuming data in Arroyo: time. Arroyo is built around event-time processing, which means that records can be processed according to the time they occur, rather than the time they are processed by Arroyo. This is important for handling out-of-order and late data.
In this case, we want to use the timestamp from the log record as the event time. To do this, we will rely on Arroyo’s ability to override event time as part of the CREATE TABLE statement via virtual columns.
Here we’ve added two new columns to the table:
- parsed contains our parsed log record, as a JSON string.
- event_time contains the timestamp from the log record; we use the extract_json_string function to extract the timestamp from the JSON string.
Currently Arroyo does not support virtual columns that depend on other virtual columns, so we have to repeat the parse_log call in the event_time column.
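Event time has to be an absolute instant, while a CLF timestamp such as 10/Oct/2000:13:55:36 -0700 is a local time plus a UTC offset. As a std-only Rust sketch (the helper names are hypothetical, and this is not how Arroyo itself converts timestamps), the following turns such a value into Unix epoch seconds, using the well-known days-from-civil calculation for the calendar date and then subtracting the offset.

```rust
/// Days since 1970-01-01 for a proleptic Gregorian date (days-from-civil).
fn days_from_civil(year: i64, month: i64, day: i64) -> i64 {
    let y = if month <= 2 { year - 1 } else { year }; // treat years as starting in March
    let era = y.div_euclid(400);
    let yoe = y.rem_euclid(400);                      // year of era, 0..=399
    let mp = (month + 9) % 12;                        // March = 0 .. February = 11
    let doy = (153 * mp + 2) / 5 + day - 1;           // day of year, 0..=365
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;  // day of era, 0..=146096
    era * 146_097 + doe - 719_468                     // shift epoch to 1970-01-01
}

/// Maps "Jan".."Dec" to 1..=12.
fn month_from_abbrev(m: &str) -> Option<i64> {
    const MONTHS: [&str; 12] = [
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    ];
    MONTHS.iter().position(|&x| x == m).map(|i| i as i64 + 1)
}

/// Hypothetical helper: converts a CLF timestamp such as
/// "10/Oct/2000:13:55:36 -0700" to seconds since the Unix epoch (UTC).
fn clf_to_epoch_seconds(ts: &str) -> Option<i64> {
    let (datetime, offset) = ts.split_once(' ')?;
    let mut parts = datetime.split(|c: char| c == '/' || c == ':');
    let day: i64 = parts.next()?.parse().ok()?;
    let month = month_from_abbrev(parts.next()?)?;
    let year: i64 = parts.next()?.parse().ok()?;
    let hour: i64 = parts.next()?.parse().ok()?;
    let minute: i64 = parts.next()?.parse().ok()?;
    let second: i64 = parts.next()?.parse().ok()?;

    // The offset is [+-]HHMM and local time = UTC + offset, so UTC = local - offset.
    let sign: i64 = match offset.get(..1)? {
        "+" => 1,
        "-" => -1,
        _ => return None,
    };
    let off_h: i64 = offset.get(1..3)?.parse().ok()?;
    let off_m: i64 = offset.get(3..5)?.parse().ok()?;
    let offset_secs = sign * (off_h * 3600 + off_m * 60);

    let local = days_from_civil(year, month, day) * 86_400 + hour * 3600 + minute * 60 + second;
    Some(local - offset_secs)
}

fn main() {
    let ts = "10/Oct/2000:13:55:36 -0700";
    println!("{} -> {:?}", ts, clf_to_epoch_seconds(ts));
}
```

For the example timestamp the result is 971211336, i.e. 20:55:36 UTC on 10 October 2000. Two records logged by servers in different time zones therefore compare correctly once converted.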
Let’s stop some attacks
Now that we have the data in a more useful form, we can start to do some analysis on it. One thing we might want to do is detect when a particular IP address is making a lot of requests that return 401 (Unauthorized) errors; i.e., when someone is trying to break into our server.
We’ll start by finding those users, which we’ll define as any IP address that has made more than 10 requests in the last 5 minutes that returned a 401 error.
This query operates by grouping the records by host and a 5-minute window, then counting the number of records in each group. We then filter the results to only include groups with more than 10 records using a SELECT over the virtual table.
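To make the semantics of that query explicit, here is a plain Rust sketch of the same logic over an in-memory batch: keep only 401 responses, bucket each by host and 5-minute window, count, and keep buckets with more than 10 requests. It assumes tumbling windows aligned to the Unix epoch (the actual query may use a different window function), and the LogEvent type is hypothetical. Arroyo computes this incrementally over an unbounded stream, which this batch version does not attempt.

```rust
use std::collections::HashMap;

/// A parsed request (hypothetical type): client host, event time, status.
struct LogEvent<'a> {
    host: &'a str,
    event_time: i64, // seconds since the Unix epoch
    status: u16,
}

const WINDOW_SECS: i64 = 5 * 60;
const THRESHOLD: usize = 10;

/// Returns (host, window_start, count) for every host with more than
/// THRESHOLD 401 responses inside a single 5-minute tumbling window.
fn suspicious_hosts<'a>(events: &[LogEvent<'a>]) -> Vec<(&'a str, i64, usize)> {
    let mut counts: HashMap<(&'a str, i64), usize> = HashMap::new();
    for e in events.iter().filter(|e| e.status == 401) {
        // Each event belongs to exactly one window: the one starting at the
        // largest multiple of WINDOW_SECS that is not after its event time.
        let window_start = e.event_time.div_euclid(WINDOW_SECS) * WINDOW_SECS;
        *counts.entry((e.host, window_start)).or_insert(0) += 1;
    }
    let mut result: Vec<_> = counts
        .into_iter()
        .filter(|&(_, n)| n > THRESHOLD)
        .map(|((host, start), n)| (host, start, n))
        .collect();
    result.sort(); // deterministic order for display
    result
}

fn main() {
    // 11 unauthorized requests from one host in the same window, plus noise.
    let mut events: Vec<LogEvent> = (0..11)
        .map(|i| LogEvent { host: "10.0.0.66", event_time: 1_000_200 + i, status: 401 })
        .collect();
    events.push(LogEvent { host: "10.0.0.7", event_time: 1_000_210, status: 401 });
    events.push(LogEvent { host: "10.0.0.7", event_time: 1_000_220, status: 200 });
    for (host, start, n) in suspicious_hosts(&events) {
        println!("{}: {} unauthorized requests in window starting at {}", host, n, start);
    }
}
```

One consequence of tumbling windows is visible here: a burst that straddles a window boundary is split between two buckets and may stay under the threshold in both, which is a reason a sliding (hopping) window is sometimes preferred for this kind of detection.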
Doing something about it
Now that we’ve identified the hosts that are making a lot of unauthorized requests, we can do something about it. For example, we could use Arroyo’s Webhook sink to tell our firewall to block the offending IP address.
To try this out, you can use a service like RequestBin to create a temporary webhook URL that will show you the requests that are being sent.
Note that when running in preview mode, Arroyo will not actually send the requests to the webhook and will instead log them to the console. To send the webhooks for real, you’ll need to start the pipeline with the “Launch” button.
Conclusion
In this tutorial we’ve seen how to use UDFs to parse a custom data format, and then use that data to detect and block malicious users. This is just one example of the power of UDFs, which can also be used to implement complex logic, new types of aggregations, and more.
We’d love to hear what you’re using UDFs for! If you have any questions or feedback, please reach out on Discord.