Updating Tables
Semantics for updating tables in Arroyo SQL
Arroyo supports two semantics for streaming SQL, which we call Dataflow semantics and updating semantics. Executing a SQL query on an unbounded, streaming source poses a core problem: how do we know when to compute aggregates and joins, given that we will always see more data in the future?
In Dataflow semantics, which are introduced with the use of time-oriented windows like HOP and TUMBLE, we compute aggregates for a window once the watermark passes. This is a powerful model, as it allows the streaming system to signal completeness to its consumers, so they don't need to reason about when they can trust the results. However, the requirement that all computations be windowed can be limiting.
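For instance, a windowed aggregation under Dataflow semantics might look like the following sketch (the source table name, column alias, and window size are assumptions):

```sql
-- Hypothetical tumbling-window count; the result for each window is emitted
-- once the watermark passes the end of that window
SELECT count(*) AS rows
FROM events
GROUP BY tumble(interval '10 seconds');
```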
Updating semantics, on the other hand, allow for more flexibility in the types of queries that can be expressed, including most of normal batch SQL. They work by treating the input stream as a table that is constantly being updated, and the output as updates to a materialized view of the query.
When writing to a sink, the output is a stream of updates (in Debezium format), representing additions, updates, and deletes to the materialized view.
Reading from updating sources
Source connectors such as Kafka can specify the format as 'debezium_json' to read Debezium-formatted messages.
Messages with an op value of 'c' (create) and 'r' (read) are treated as an insert, an op value of 'd' (delete) is treated as a delete, and an op value of 'u' (update) is unrolled as a delete followed by an insert.
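A minimal sketch of such a source, assuming a Kafka topic named orders and illustrative columns and connection options (none of these names come from the tutorial):

```sql
-- Hypothetical Kafka source that reads Debezium-formatted change events
CREATE TABLE orders (
    id BIGINT,
    customer_id BIGINT,
    amount DOUBLE
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'orders',
    type = 'source',
    format = 'debezium_json'
);
```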
Manipulating updating data in SQL
Updating data can be SELECTed over and filtered; however, it currently cannot be joined or aggregated.
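For example, assuming the hypothetical orders source above, a filtering query over updating data might look like this sketch:

```sql
-- Filtering an updating source; the result is itself an updating stream
SELECT id, customer_id, amount
FROM orders
WHERE amount > 100;
```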
Writing to updating sinks
Updating queries can be written to sinks with the debezium_json format. This output can then be consumed by the Debezium sink connector to write to an RDBMS like MySQL or Postgres. For a complete example of this, see this tutorial.
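A minimal sketch of such a sink, with an assumed Kafka topic and a simple updating query feeding it (the table, topic, and column names are assumptions):

```sql
-- Hypothetical Kafka sink that emits Debezium-formatted updates
CREATE TABLE large_orders (
    id BIGINT,
    amount DOUBLE
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'large_orders',
    type = 'sink',
    format = 'debezium_json'
);

-- Write the (updating) filtered results into the sink
INSERT INTO large_orders
SELECT id, amount
FROM orders
WHERE amount > 100;
```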
TTLs
The base semantics of updating tables require that, for any event that comes in, we must be able to update the output accordingly. However, doing this with complete correctness would require storing data for all time, which is generally intractable in a streaming system without blowing up our state. Therefore, updating states have a time-to-live (TTL) associated with them. This TTL is the maximum amount of time we will store a key after we last saw an event for it. (A key might be something like a user id or a transaction id; generally it is the thing being grouped by in an aggregation or joined on.)
By default, the TTL is 1 day, but it can be configured with the SET updating_ttl command, which takes a SQL interval.
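For example, a statement like the following sketch (assuming a standard SQL interval literal) sets the TTL to 1 hour:

```sql
-- Assumed interval syntax: set the updating-table TTL to one hour
SET updating_ttl = INTERVAL '1 hour';
```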
Currently all queries in a pipeline share the same TTL. In the future, we may allow different TTLs for different queries.
Non-windowed aggregates
Aggregating data without a window will result in an updating output.
This will emit an insert the first time data is processed for a group, and subsequent data will retract the prior value and then insert the new value.
Aggregates are buffered in the operator and flushed periodically. By default, flushing happens every 1 second, but this can be overridden with the pipeline.update-aggregate-flush-interval config.
For instance, consider the following query (a hypothetical sketch; the source table name is an assumption):
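```sql
-- Hypothetical example: a non-windowed count over an append-only 'events' source
SELECT count(*) AS rows
FROM events;
```

It will produce output data like the following: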
| before | after | op |
|---|---|---|
| null | { "rows": 300 } | "c" |
| { "rows": 200 } | null | "d" |
| null | { "rows": 200 } | "c" |
| { "rows": 100 } | null | "d" |
| null | { "rows": 100 } | "c" |
For examples of manipulating updating data, see the Debezium tutorial.