SELECT statements are used to manipulate data in Arroyo. The general form of the statement is:

[WITH with_query [, ...]]
SELECT select_expr [, ...]
FROM from_item
[JOIN join_item [, ...]]
[WHERE condition]
[GROUP BY grouping_element [, ...]]

WITH clause

The WITH clause lets you give names to subqueries, which you can then reference in the rest of the query. The syntax for a WITH clause is:

WITH query_name AS (subquery) [,...]

For example, using the nexmark source, you can create datasets for bids and auctions and then join them:


WITH bids AS
    (SELECT bid.auction AS auction, bid.price AS price
        FROM nexmark WHERE bid IS NOT NULL),
auctions AS
    (SELECT auction.id AS id
        FROM nexmark WHERE auction IS NOT NULL)
SELECT * FROM bids
        JOIN auctions
            ON bids.auction = auctions.id;

SELECT clause

The SELECT clause is a comma-separated list of expressions, each with an optional alias.

Column names must be unique.

SELECT select_expr [, ...]
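
As a minimal sketch of a select list, the query below mixes plain columns with a computed expression and gives each one a distinct alias. It assumes the nexmark source from the WITH example is available as a table named nexmark; the alias names are illustrative only.

```sql
-- Assumes a nexmark source table, as in the WITH example above.
-- Each expression gets its own alias so the output column names stay unique.
SELECT
    bid.auction   AS auction_id,
    bid.price     AS price,
    bid.price * 2 AS doubled_price   -- computed expression with an alias
FROM nexmark
WHERE bid IS NOT NULL;
```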

FROM clause

The FROM clause specifies the primary source of data. It is either a table name or a subquery. A table name can refer to a saved source, a table created in the WITH clause, or a table created via CREATE TABLE and inserted into. Tables can be given aliases; if no alias is given, the table's name is used as the alias for things like joins.

FROM from_item
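
To illustrate the subquery form, here is a small sketch that selects from an aliased subquery over the nexmark source. The alias b is a hypothetical name chosen for the example.

```sql
-- The subquery acts as the from_item and is given the alias b.
SELECT b.auction, b.price
FROM (
    SELECT bid.auction AS auction, bid.price AS price
    FROM nexmark
    WHERE bid IS NOT NULL
) b;
```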

JOIN clause

The JOIN clause allows you to join multiple tables together. By default it will be an inner join, but you can also specify LEFT, RIGHT, or FULL joins. Joins must include an ON clause specifying the join condition.

SELECT * FROM bids JOIN auctions ON bids.auction = auctions.id;

Joins in Arroyo are flexible and, depending on exactly how they are structured, may produce either append or updating output. Append mode is the basic output mode for streaming and is used when the operator is able to emit only final results.

This will happen when the left and right inputs are non-updating and either the join is an inner join or the join is over an event time window.
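
As a hedged sketch of a join over an event time window, the query below joins two one-minute tumbling aggregates on both the auction key and the window. The CTE and column names (bid_counts, max_prices, num_bids, max_price) are made up for illustration, and the query assumes the nexmark source used earlier. Under the rule above, both inputs are non-updating, so the result should be append-only.

```sql
-- Two windowed aggregates over the same source, joined on key and window.
WITH bid_counts AS (
    SELECT bid.auction AS auction,
           tumble(interval '1 minute') AS window,
           count(*) AS num_bids
    FROM nexmark
    WHERE bid IS NOT NULL
    GROUP BY 1, 2
),
max_prices AS (
    SELECT bid.auction AS auction,
           tumble(interval '1 minute') AS window,
           max(bid.price) AS max_price
    FROM nexmark
    WHERE bid IS NOT NULL
    GROUP BY 1, 2
)
SELECT bid_counts.auction, bid_counts.num_bids, max_prices.max_price
FROM bid_counts
JOIN max_prices
    ON bid_counts.auction = max_prices.auction
   AND bid_counts.window = max_prices.window;
```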

Otherwise, the join is updating, and its output is a combination of appends (add a row), updates (change a row), and retractions (remove a row). Updating tables must be written to a sink that supports updates; currently these are Kafka with format = 'debezium_json' and the Console sink.
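
For contrast, a non-windowed LEFT JOIN is one case that falls outside the append conditions described above, so it would be expected to produce updating output. This sketch reuses the bids and auctions definitions from the WITH example.

```sql
-- LEFT join without an event time window: expected to be an updating join,
-- so its output needs a sink that supports updates.
WITH bids AS
    (SELECT bid.auction AS auction, bid.price AS price
        FROM nexmark WHERE bid IS NOT NULL),
auctions AS
    (SELECT auction.id AS id
        FROM nexmark WHERE auction IS NOT NULL)
SELECT bids.auction, bids.price, auctions.id
FROM bids
LEFT JOIN auctions
    ON bids.auction = auctions.id;
```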

WHERE clause

The WHERE clause allows you to filter the data with a boolean condition. This predicate is applied to the incoming rows, so it cannot include conditions on the resulting (output) columns.

WHERE condition
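
The sketch below shows the two sides of that restriction: a filter on an input column, and one way to filter on a computed column by wrapping the query in a subquery so that the computed column becomes an input of the outer query. The alias names and thresholds are illustrative assumptions.

```sql
-- Filtering on an input column works directly.
SELECT bid.auction AS auction, bid.price AS price
FROM nexmark
WHERE bid.price > 100;

-- To filter on a computed column, compute it in a subquery first;
-- the outer WHERE then sees doubled_price as an incoming column.
SELECT auction, doubled_price
FROM (
    SELECT bid.auction AS auction, bid.price * 2 AS doubled_price
    FROM nexmark
    WHERE bid IS NOT NULL
) t
WHERE doubled_price > 200;
```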

GROUP BY clause

The GROUP BY clause is used to compute aggregates over some set of fields. All GROUP BY queries implicitly include a time window; if the input does not already have a time window, one should be specified as one of the grouping fields.

For example,

SELECT
    count(*) AS bids,
    count(distinct auction_id) AS distinct_auctions,
    tumble(interval '1 minute') AS window
FROM BIDS GROUP BY 3
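
A window can also be combined with other grouping fields. As a sketch, assuming the nexmark source from the earlier examples, the following counts bids per auction in each one-minute tumbling window, grouping by both the key and the window:

```sql
-- Group by the auction key (column 1) and the tumbling window (column 2).
SELECT
    bid.auction AS auction,
    tumble(interval '1 minute') AS window,
    count(*) AS bids
FROM nexmark
WHERE bid IS NOT NULL
GROUP BY 1, 2;
```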

UNNEST operator

The UNNEST operator allows you to unnest arrays into multiple rows. This can be used as a normal scalar function with the following restrictions:

  • It may only appear in the SELECT clause
  • Only one array may be unnested per select statement

For example,

SELECT
    UNNEST(make_array(1, 2, 3)) as x
FROM BIDS;

which will produce the following output:

+---+
| x |
+---+
| 1 |
| 2 |
| 3 |
+---+