Arroyo supports two semantics for streaming SQL, which we call Dataflow semantics and updating semantics. Both address the core problem of executing a SQL query on an unbounded, streaming source: how do we know when to compute aggregates and joins, given that we will always see more data in the future?

In Dataflow semantics, which are introduced by the use of time-oriented windows like HOP and TUMBLE, we compute aggregates for a window once the watermark passes it. This is a powerful model, as it allows the streaming system to signal completeness to its consumers, so they don't need to reason about when they can trust the results. However, the requirement that all computations are windowed can be limiting.
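
For contrast, a windowed aggregate under Dataflow semantics might look like the following sketch; the events source table is hypothetical, and it assumes a tumble window function that takes an interval:

SELECT count(*) AS event_count
FROM events
GROUP BY tumble(interval '1 minute');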

Updating semantics, on the other hand, allow for more flexibility in the types of queries that can be expressed, including most of normal batch SQL. They work by treating the input stream as a table that is constantly being updated, and the output as updates to a materialized view of the query.

When writing to a sink, the output is a stream of updates (in Debezium format), representing additions, updates, and deletes to the materialized view.

Reading from updating sources

Source connectors such as Kafka can specify the format as 'debezium_json' to read Debezium-formatted messages. Messages with an op value of 'c' (create) or 'r' (read) are treated as inserts, an op value of 'd' (delete) is treated as a delete, and an op value of 'u' (update) is unrolled into a delete followed by an insert.
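
As a sketch, a Kafka source reading Debezium-formatted change events might be declared like this; the table name, columns, topic, and bootstrap_servers value are illustrative:

CREATE TABLE orders (
    id INT,
    amount FLOAT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'orders',
    type = 'source',
    format = 'debezium_json'
);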

Manipulating updating data in SQL

Updating data can be SELECTed over and filtered; however, it currently cannot be joined or aggregated.
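
For example, a query like the following (over the hypothetical orders source above) selects and filters updating data without joining or aggregating it:

SELECT id, amount
FROM orders
WHERE amount > 100;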

Writing to updating sinks

Updating queries can be written to sinks with the debezium_json format. This output can then be consumed by the Debezium sink connector to write to an RDBMS like MySQL or Postgres.
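
As a rough sketch, writing an updating (non-windowed) aggregate to a Kafka sink in Debezium format might look like the following; the sink table, topic, option values, and the append-only events source are illustrative:

CREATE TABLE order_counts (
    rows BIGINT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'order_counts',
    type = 'sink',
    format = 'debezium_json'
);

INSERT INTO order_counts
SELECT count(*) as rows
FROM events;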

For a complete example of this, see this tutorial.

TTLs

The base semantics of updating tables require that, for any event that comes in, we must be able to update the state of the output. Doing this with complete correctness, however, would require storing data for all time, which is generally intractable in a streaming system without blowing up our state. Therefore, updating states have a time-to-live (TTL) associated with them. The TTL is the maximum amount of time we will store a key after we last saw an event for it. (A key might be something like a user id or a transaction id; generally it is whatever is being grouped by in an aggregation or joined on.)

By default, the TTL is 1 day, but it can be configured with the SET updating_ttl command, which takes a SQL interval. For example, to set the TTL to 1 hour:

SET updating_ttl = '1 hour';

Currently all queries in a pipeline share the same TTL. In the future, we may allow different TTLs for different queries.

Non-windowed aggregates

Aggregating data without a window results in an updating output. The first time data is processed for a group, an insert is emitted; subsequent data for that group retracts the prior value and then inserts the new one. Aggregates are buffered in the operator and flushed periodically. By default, flushing happens every 1 second, but this can be overridden with the pipeline.update-aggregate-flush-interval config option.

For instance, the following query

CREATE TABLE impulse WITH (
    connector = 'impulse',
    event_rate = '100'
);
SELECT count(*) as rows
FROM impulse;

will produce output data like the following:

before            after              op
null              { "rows": 300 }    "c"
{ "rows": 200 }   null               "d"
null              { "rows": 200 }    "c"
{ "rows": 100 }   null               "d"
null              { "rows": 100 }    "c"

For examples of manipulating updating data, see the Debezium Terminal.