Joins
Arroyo SQL supports three kinds of joins over streaming and batch data sources: window joins, updating joins, and lookup joins.
Window joins
Window joins operate over bounded time-oriented windows (one of TUMBLE, HOP, or SESSION). Each subquery being joined must be computed over the exact same window, and all matching elements within that window (according to the type of join) will be returned.
For example, a join over TUMBLE(interval '1 minute') is computed once per minute and returns
all matching elements within that minute. Window completions are triggered by the arrival of watermarks,
as described in the watermarks documentation.
Window joins default to INNER joins, but you may also specify LEFT, RIGHT, or FULL before the JOIN
keyword.
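To make the watermark-driven behavior concrete, here is a minimal Python sketch of a tumbling-window join. It is a conceptual model, not Arroyo's implementation: the class TumblingWindowJoin, its methods, and the record shape (event time in seconds, key, value) are hypothetical, only INNER and LEFT are modeled, and records that arrive after their window has already been emitted are simply dropped.

```python
from collections import defaultdict


class TumblingWindowJoin:
    """Toy model of a tumbling-window join; not Arroyo's implementation.

    Each window buffers records from both sides and is joined only once the
    watermark passes the window's end. Only INNER and LEFT are modeled.
    """

    def __init__(self, width, how="inner"):
        if how not in ("inner", "left"):
            raise ValueError("this sketch models only inner and left joins")
        self.width = width
        self.how = how
        self.watermark = float("-inf")
        # window start -> {"left": [(key, value)], "right": [(key, value)]}
        self.buffers = defaultdict(lambda: {"left": [], "right": []})

    def add(self, side, ts, key, value):
        start = ts - ts % self.width  # start of the window containing ts
        if start + self.width <= self.watermark:
            return  # late record: its window has already been emitted
        self.buffers[start][side].append((key, value))

    def advance_watermark(self, watermark):
        """Join, emit, and discard every window that ends at or before watermark."""
        self.watermark = watermark
        out = []
        for start in sorted(self.buffers):  # sorted() copies keys, so pop is safe
            if start + self.width > watermark:
                continue  # window still open
            sides = self.buffers.pop(start)
            for lkey, lval in sides["left"]:
                matches = [rval for rkey, rval in sides["right"] if rkey == lkey]
                if matches:
                    out.extend((start, lkey, lval, rval) for rval in matches)
                elif self.how == "left":
                    out.append((start, lkey, lval, None))  # unmatched left row
        return out


join = TumblingWindowJoin(width=60, how="left")
join.add("left", 5, "u1", "view")
join.add("right", 30, "u1", "click")
join.add("left", 70, "u2", "view")
print(join.advance_watermark(60))   # [(0, 'u1', 'view', 'click')]
print(join.advance_watermark(120))  # [(60, 'u2', 'view', None)]
```

With 60-second windows, the matching rows at t=5 and t=30 are emitted together once the watermark reaches 60, while the unmatched left row at t=70 is only emitted (with None on the right) when the watermark reaches 120.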
A full example of a window join looks like this:
CREATE TABLE page_views (
    event_time TIMESTAMP,
    user_id TEXT,
    page_url TEXT
) WITH (...);
CREATE TABLE ad_clicks (
    event_time TIMESTAMP,
    user_id TEXT,
    ad_id TEXT
) WITH (...);
SELECT pv.window, page_views, ad_clicks
FROM (
    SELECT TUMBLE(INTERVAL '1 hour') AS window,
        COUNT(DISTINCT user_id) AS page_views
    FROM page_views
    GROUP BY 1
) pv
INNER JOIN (
    SELECT TUMBLE(INTERVAL '1 hour') AS window,
        COUNT(DISTINCT user_id) AS ad_clicks
    FROM ad_clicks
    GROUP BY 1
) ac
ON pv.window = ac.window;
Note that it is not currently possible to reaggregate the results of a windowed join.
Updating joins
Unlike window joins, updating joins do not require inputs to be time-windowed. These
behave like standard SQL joins, but are computed incrementally and produce updating output
(a stream of delete, append, and update results), which must be sent to an update-compatible
sink (for example, using the debezium_json format).
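The following Python sketch models how an incremental inner join can turn changes to a keyed table into a stream of changes. It is a simplified illustration, not Arroyo's implementation: UpdatingInnerJoin is hypothetical, event times are omitted, and the ("append" / "update" / "delete") tuples stand in for whatever change format an update-compatible sink such as debezium_json actually receives.

```python
class UpdatingInnerJoin:
    """Toy incremental inner join of a keyed table (users) with a stream (actions).

    Emits ("append", row), ("update", old_row, new_row), and ("delete", row).
    Hypothetical model for illustration; not Arroyo's implementation.
    """

    def __init__(self):
        self.users = {}    # user_id -> latest user_name
        self.actions = {}  # user_id -> every action seen so far

    def upsert_user(self, user_id, user_name):
        old = self.users.get(user_id)
        self.users[user_id] = user_name
        out = []
        for action in self.actions.get(user_id, []):
            new_row = (user_id, user_name, action)
            if old is None:
                out.append(("append", new_row))  # first time this key can join
            else:
                out.append(("update", (user_id, old, action), new_row))
        return out

    def delete_user(self, user_id):
        name = self.users.pop(user_id, None)
        if name is None:
            return []
        # retract every joined row that involved the deleted user
        return [("delete", (user_id, name, a)) for a in self.actions.get(user_id, [])]

    def add_action(self, user_id, action):
        self.actions.setdefault(user_id, []).append(action)
        if user_id in self.users:
            return [("append", (user_id, self.users[user_id], action))]
        return []  # no match yet; a later upsert_user may produce it


j = UpdatingInnerJoin()
print(j.add_action("u1", "login"))    # []
print(j.upsert_user("u1", "Ada"))     # [('append', ('u1', 'Ada', 'login'))]
print(j.upsert_user("u1", "Ada L."))  # [('update', ('u1', 'Ada', 'login'), ('u1', 'Ada L.', 'login'))]
print(j.delete_user("u1"))            # [('delete', ('u1', 'Ada L.', 'login'))]
```

Because both dictionaries keep every user and action ever seen, the state in this model grows without bound, which is the problem the TTL discussed after the example below addresses.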
An example updating join looks like this:
CREATE TABLE users (
    user_id TEXT PRIMARY KEY,
    user_name TEXT
) WITH (...);
CREATE TABLE user_actions (
    event_time TIMESTAMP,
    user_id TEXT,
    action TEXT
) WITH (...);
SELECT u.user_id, u.user_name, a.action, a.event_time
FROM users u
JOIN user_actions a
ON u.user_id = a.user_id;
As updating joins are not time-bounded, there is nothing that bounds the state
size; in principle they must remember all data the pipeline has seen. This is
not usually practical, so Arroyo by default uses a TTL of 1 day after which join
state is discarded. This TTL can be changed with SET updating_ttl; see the
documentation on updating SQL for more details.
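One way to picture the TTL is as join state whose entries are dropped once a full TTL has passed since they were last written. The Python sketch below assumes expiry is measured from each entry's most recent write using a caller-supplied clock; Arroyo's exact expiry rules may differ, and TtlState is a hypothetical name.

```python
class TtlState:
    """Toy join state whose entries expire ttl_seconds after their last write.

    Assumption: expiry is measured from each key's most recent write, using a
    clock value passed in by the caller. Not Arroyo's implementation.
    """

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.entries = {}  # key -> (value, time of last write)

    def put(self, key, value, now):
        self.entries[key] = (value, now)  # writing refreshes the entry's age

    def expire(self, now):
        """Discard entries whose last write is at least one TTL old; return how many."""
        stale = [k for k, (_, written) in self.entries.items() if now - written >= self.ttl]
        for k in stale:
            del self.entries[k]
        return len(stale)

    def get(self, key, now):
        self.expire(now)
        entry = self.entries.get(key)
        return None if entry is None else entry[0]


DAY = 24 * 60 * 60
state = TtlState(ttl_seconds=DAY)
state.put("u1", "Ada", now=0)
print(state.get("u1", now=DAY - 1))  # Ada
print(state.get("u1", now=DAY))      # None: the entry has expired
```

In this model, a record that arrives for u1 after the entry has expired no longer finds its match, which is the trade-off a TTL makes in exchange for bounded state.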
Lookup joins
Lookup joins allow you to enrich streams by referencing external or static data
stored in external systems (e.g., Redis, relational databases). The right side
of a lookup join is a special kind of table, called a TEMPORARY TABLE, which
is unmaterialized and backed by the external system. Each record that comes into
the join from the left (stream) side causes a query to the underlying system.
For example, you may have a set of detailed customer records stored in an RDBMS that you would like to use to enrich an analytics stream.
Currently, Redis is supported as a lookup connector.
An example lookup join looks like this:
CREATE TEMPORARY TABLE customers (
    -- For Redis lookup tables, it's required that there be a single
    -- METADATA FROM 'key' marked as PRIMARY KEY, as Redis only supports
    -- efficient lookups by key
    customer_id TEXT METADATA FROM 'key' PRIMARY KEY,
    name TEXT,
    plan TEXT
) WITH (
    connector = 'redis',
    address = 'redis://localhost:6379',
    format = 'json',
    'lookup.cache.max_bytes' = 1000000,
    'lookup.cache.ttl' = interval '5 seconds'
);
CREATE TABLE events (
    event_id TEXT,
    timestamp TIMESTAMP,
    customer_id TEXT,
    event_type TEXT
) WITH (
    connector = 'kafka',
    topic = 'events',
    type = 'source',
    format = 'json',
    bootstrap_servers = 'broker:9092'
);
SELECT e.event_id, e.timestamp, c.name, c.plan
FROM events e
LEFT JOIN customers c
-- you may use SQL expressions like concat to generate the exact key
-- format in Redis
ON concat('customer.', e.customer_id) = c.customer_id
WHERE c.plan = 'Premium';
The lookup.cache.max_bytes and lookup.cache.ttl are optional arguments that
control the behavior of the built-in cache, which avoids the need to query the
same keys over and over again.
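The two cache options can be pictured with the following Python sketch of a cache bounded by total size and by per-entry age. It is a hypothetical model of what lookup.cache.max_bytes and lookup.cache.ttl control, not Arroyo's implementation: LookupCache, the byte accounting (length of key plus value), and least-recently-used eviction are all assumptions.

```python
from collections import OrderedDict


class LookupCache:
    """Toy lookup cache bounded by total bytes, with a per-entry TTL.

    Hypothetical model: sizes are len(key) + len(value), and the least
    recently used entry is evicted when the byte budget is exceeded.
    """

    def __init__(self, fetch, max_bytes, ttl_seconds):
        self.fetch = fetch            # key -> value or None (the external query)
        self.max_bytes = max_bytes
        self.ttl = ttl_seconds
        self.entries = OrderedDict()  # key -> (value, fetched_at, size)
        self.used = 0
        self.misses = 0

    def get(self, key, now):
        entry = self.entries.get(key)
        if entry is not None and now - entry[1] < self.ttl:
            self.entries.move_to_end(key)  # mark as most recently used
            return entry[0]
        if entry is not None:
            self._remove(key)  # expired: forget it and query again
        self.misses += 1
        value = self.fetch(key)
        size = len(key) + (len(value) if value is not None else 0)
        if size <= self.max_bytes:  # entries larger than the budget are not cached
            while self.used + size > self.max_bytes:
                self._remove(next(iter(self.entries)))  # evict least recently used
            self.entries[key] = (value, now, size)
            self.used += size
        return value

    def _remove(self, key):
        _, _, size = self.entries.pop(key)
        self.used -= size


store = {"customer.42": '{"name": "Ada", "plan": "Premium"}'}
cache = LookupCache(store.get, max_bytes=1_000_000, ttl_seconds=5)
cache.get("customer.42", now=0)  # miss: queries the store
cache.get("customer.42", now=3)  # hit: served from the cache
cache.get("customer.42", now=5)  # TTL elapsed: queries the store again
print(cache.misses)  # 2
```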
Lookup joins can be either INNER (the default) or LEFT.
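As a rough model of the two join types, the Python sketch below issues one lookup per incoming event and differs only in what happens when the lookup finds nothing: INNER drops the event, while LEFT keeps it with None on the right side. lookup_join and its parameters are hypothetical; key_fn plays the role of the concat('customer.', ...) expression in the SQL example, and lookup stands in for a query against the external system, such as a Redis key read.

```python
def lookup_join(events, lookup, key_fn, how="inner"):
    """Toy lookup join: one external lookup per incoming (left-side) event.

    Returns (event, row) pairs; row is None for unmatched events under LEFT.
    Hypothetical sketch, not Arroyo's implementation.
    """
    if how not in ("inner", "left"):
        raise ValueError("lookup joins are either INNER or LEFT")
    out = []
    for event in events:
        row = lookup(key_fn(event))  # one query per incoming record
        if row is not None or how == "left":
            out.append((event, row))
    return out


customers = {"customer.1": {"name": "Ada", "plan": "Premium"}}
events = [{"event_id": "e1", "customer_id": "1"},
          {"event_id": "e2", "customer_id": "2"}]


def to_key(event):
    return "customer." + event["customer_id"]


print(len(lookup_join(events, customers.get, to_key)))              # 1
print(len(lookup_join(events, customers.get, to_key, how="left")))  # 2
```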