Arroyo SQL supports various types of joins over streaming and batch data sources.

Window joins

Window joins operate over bounded time-oriented windows (one of TUMBLE, HOP, or SESSION). Each subquery being joined must be computed over the exact same window, and all matching elements within that window (according to the type of join) will be returned.

For example, a join over TUMBLE(interval '1 minute') will be computed once a minute, and return all matching elements within that minute. Window completions are triggered by the arrival of watermarks, as described here.

Window joins default to INNER joins, but you may also specify LEFT, RIGHT, or FULL before the JOIN keyword.

A full example of a window join looks like this:

CREATE TABLE page_views (
  event_time TIMESTAMP,
  user_id TEXT,
  page_url TEXT
) WITH (
    ...
);

CREATE TABLE ad_clicks (
  event_time TIMESTAMP,
  user_id TEXT,
  ad_id TEXT
) WITH (
    ...
);

SELECT
  pv.window,
  page_views,
  ad_clicks
FROM (
  SELECT TUMBLE(INTERVAL '1 hour') AS window, COUNT(DISTINCT user_id) as page_views
  FROM page_views
  GROUP BY 1
) pv
INNER JOIN (
  SELECT TUMBLE(INTERVAL '1 hour') AS window, COUNT(DISTINCT user_id) as ad_clicks
  FROM ad_clicks
  GROUP BY 1
) ac
ON pv.window = ac.window;

Note that it is not currently possible to reaggregate the results of a windowed join.

Updating joins

Unlike window joins, updating joins do not require inputs to be time-windowed. These behave like standard SQL joins, but are computed incrementally and produce updating output (a stream of delete, append, and update results), which must be sent to an update-compatible sink (for example, using the debezium_json format).

An example updating join looks like this:

CREATE TABLE users (
  user_id TEXT PRIMARY KEY,
  user_name TEXT
) WITH (
  ...
);

CREATE TABLE user_actions (
  event_time TIMESTAMP,
  user_id TEXT,
  action TEXT
) WITH (
  ...
);


SELECT u.user_id, u.user_name, a.action, a.event_time
FROM users u
JOIN user_actions a
ON u.user_id = a.user_id;

As updating joins are not time-bounded, there is nothing that bounds the state size; in principle they must remember all data the pipeline has seen. This is not usually practical, so Arroyo by default uses a TTL of 1 day after which join state is discarded. This may be configured via SET updating_ttl. See the updating docs for more details.

Lookup joins

Lookup joins allow you to enrich streams by referencing external or static data stored in external systems (e.g., Redis, relational databases). The right side of a lookup join is a special kind of table, called a TEMPORARY TABLE, which is unmaterialized and backed by the external system. Each record that comes into the join from the left (stream) side causes a query to the underlying system.

For example, you may have a set of detailed customer records stored in an RDBMS that you would like to use to enrich an analytics stream.

Currently, Redis is supported as a lookup connector.

An example lookup join looks like this:

CREATE TEMPORARY TABLE customers (
    -- For Redis lookup tables, it's required that there be a single
    -- METADATA FROM 'key' marked as PRIMARY KEY, as Redis only supports
    -- efficient lookups by key
    customer_id TEXT METADATA FROM 'key' PRIMARY KEY,
    name TEXT,
    plan TEXT
) with (
    connector = 'redis',
    address = 'redis://localhost:6379',
    format = 'json',
    'lookup.cache.max_bytes' = 1000000,
    'lookup.cache.ttl' = interval '5 seconds'
);

CREATE TABLE events (
    event_id TEXT,
    timestamp TIMESTAMP,
    customer_id TEXT,
    event_type TEXT
) WITH (
    connector = 'kafka',
    topic = 'events',
    type = 'source',
    format = 'json',
    bootstrap_servers = 'broker:9092'
);

SELECT  e.event_id,  e.timestamp,  c.name, c.plan
FROM  events e
LEFT JOIN customers c
-- you may use SQL expressions like concat to generate the exact key
-- format in Redis
ON concat('customer.', e.customer_id) = c.customer_id
WHERE c.plan = 'Premium';

The lookup.cache.max_bytes and lookup.cache.ttl are optional arguments that control the behavior of the built-in cache, which avoids the need to query the same keys over and over again.

Lookup joins can be either INNER (the default) or LEFT.