Arroyo supports two semantics for streaming SQL, which we call Dataflow semantics and updating semantics. There is a core problem in executing a SQL query over an unbounded, streaming source: how do we know when to compute aggregates and joins, given that more data will always arrive in the future?

In Dataflow semantics, which are introduced by the use of time-oriented windows like HOP and TUMBLE, we compute aggregates for a window once the watermark passes the end of that window. This is a powerful model, as it allows the streaming system to signal completeness to its consumers, so they don't need to reason about when they can trust the results. However, the requirement that all computations be windowed can be limiting.
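
For instance, a tumbling-window count looks like the following (a sketch; the events table is hypothetical):

-- Count events per one-minute tumbling window; results for each window are
-- emitted once the watermark passes the end of that window.
SELECT count(*) AS event_count
FROM events
GROUP BY TUMBLE(INTERVAL '1 minute');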

Updating semantics, on the other hand, allow for more flexibility in the types of queries that can be expressed, including most of normal batch SQL. It works by treating the input stream as a table that is constantly being updated, and the output as updates on a materialized view of the query.

When writing to a sink, the output is a stream of updates (in Debezium format), representing additions, updates, and deletes to the materialized view.
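
A single update record emitted by the pipeline looks roughly like this (an illustrative sketch; the exact envelope depends on the format configuration, but the before/after/op structure matches the example output shown later in this doc):

{"before": {"rows": 100}, "after": {"rows": 200}, "op": "u"}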

Reading from updating sources

Source connectors such as Kafka can specify the format as 'debezium_json' to read Debezium-formatted messages.

Updating sources need at least one primary key, which tells Arroyo which rows are logically the same. The primary key is specified in the DDL, like this:

CREATE TABLE debezium_source (
    id INT PRIMARY KEY,
    customer_name TEXT,
    price FLOAT,
    order_date TIMESTAMP,
    status TEXT
) WITH (
    connector = 'kafka',
    format = 'debezium_json',
    type = 'source',
    ...
);

Manipulating updating data in SQL

Updating data can be SELECTed over and filtered; however, it currently cannot be joined or aggregated.
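
For example, this query selects and filters over the debezium_source table defined above (the 'shipped' status value is hypothetical):

-- Each change to a matching row in the source is reflected in the output
SELECT id, customer_name, price
FROM debezium_source
WHERE status = 'shipped';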

Writing to updating sinks

Updating queries can be written to sinks with the debezium_json format. This output can then be consumed by the Debezium sink connector to write to an RDBMS like MySQL or Postgres.
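
A sink is declared with the same DDL pattern as the source above, using type = 'sink' (a sketch; the sink name, columns, the orders table, and the aggregation query are hypothetical, and the remaining connection options are elided as in the source example):

CREATE TABLE order_counts_sink (
    status TEXT,
    order_count BIGINT
) WITH (
    connector = 'kafka',
    format = 'debezium_json',
    type = 'sink',
    ...
);

-- A non-windowed aggregate produces updating output, which is written
-- to the sink as a stream of Debezium-formatted changes
INSERT INTO order_counts_sink
SELECT status, count(*) AS order_count
FROM orders
GROUP BY status;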

For a complete example of this, see this tutorial.

TTLs

The base semantics of updating tables require that, for any event that comes in, we must be able to update the output accordingly. However, doing this with complete correctness would require storing data for all time, which is generally intractable in a streaming system without blowing up our state. Therefore, updating states have a time-to-live (TTL) associated with them. This TTL is the maximum amount of time we will store a key after we last saw an event for it. (A key might be something like a user id or a transaction id; generally it is the thing being grouped by in an aggregation or joined on.)

By default, the TTL is 1 day, but it can be configured with the SET updating_ttl command, which takes a SQL interval. For example, to set the TTL to 1 hour:

SET updating_ttl = '1 hour';

Currently all queries in a pipeline share the same TTL. In the future, we may allow different TTLs for different queries.

Non-windowed aggregates

Aggregating data without a window will result in an updating output. The first time data is processed for a group, an insert is emitted; subsequent data for that group retracts the prior value and then inserts the new one. Aggregates are buffered in the operator and flushed periodically; by default flushing happens every 1 second, but this can be overridden with the pipeline.update-aggregate-flush-interval config option.

For instance, the following query

CREATE TABLE impulse WITH (
    connector = 'impulse',
    event_rate = '100'
);

SELECT count(*) AS rows
FROM impulse
HAVING rows < 500;

will produce output data like the following:

| before        | after         | op  |
|---------------|---------------|-----|
| null          | {"rows": 100} | "c" |
| {"rows": 100} | {"rows": 200} | "u" |
| {"rows": 200} | {"rows": 300} | "u" |
| {"rows": 300} | {"rows": 400} | "u" |
| {"rows": 400} | null          | "d" |

For examples of manipulating updating data, see the Debezium Terminal.