DDL Statements
Creating tables and views in Arroyo SQL
In addition to SELECT queries, Arroyo supports several Data Definition Language (DDL) statements.
CREATE TABLE
Arroyo’s CREATE TABLE statements come in three flavors: CREATE TABLE AS, memory tables, and connection tables.
CREATE TABLE AS
This statement creates a table from the query it contains, for example:
CREATE TABLE customer_orders AS SELECT customer_id, order_id FROM orders;
The schema is inferred from the query, and other queries within the same context can select from the new table.
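As a sketch of how a derived table can be reused, the statements below assume an `orders` source table with `customer_id` and `order_id` columns already exists. The `filtered_orders` name and the filter are hypothetical, chosen only for illustration.

```sql
-- Derive a new table; its schema (customer_id, order_id) is inferred from the query.
CREATE TABLE filtered_orders AS
SELECT customer_id, order_id
FROM orders
WHERE customer_id IS NOT NULL;

-- A later query in the same context reads from the derived table
-- as if it were any other table.
SELECT customer_id, count(*) AS order_count
FROM filtered_orders
GROUP BY customer_id, TUMBLE(INTERVAL '1' HOUR);
```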
CREATE TABLE (In-Memory)
CREATE TABLE statements without any connection info are presumed to create in-memory tables. A memory table can be written to within the same query context and then read from. It may be the target of only one INSERT INTO statement, but it can then be selected from multiple times. For example, you could create an orders table with a statement like:
CREATE TABLE orders (customer_id INT, order_id INT);
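A minimal sketch of this write-once, read-many pattern follows. It assumes a source table `raw_orders` and two sink tables `customer_counts` and `order_feed` have already been defined; all three names are hypothetical.

```sql
-- Memory table: no WITH clause, so no connection info.
CREATE TABLE orders (customer_id INT, order_id INT);

-- The single INSERT INTO allowed for this memory table.
INSERT INTO orders
SELECT customer_id, order_id FROM raw_orders;

-- The memory table can then be read by more than one query.
INSERT INTO customer_counts
SELECT customer_id, count(*) AS order_count
FROM orders
GROUP BY customer_id, TUMBLE(INTERVAL '1' HOUR);

INSERT INTO order_feed
SELECT order_id FROM orders;
```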
CREATE VIEW is simply an alias for creating a memory table.
CREATE TABLE (Connection)
Connection tables allow Arroyo to read from and write to external systems like Kafka clusters. Connection tables may be used as sources or sinks, depending on the type of connection. For details on all of the supported connectors, see the Connectors docs. Connection tables can be created via the Connections tab of the Web UI, or directly in SQL via the CREATE TABLE statement.
Connection tables are created via special CREATE TABLE statements that include a WITH clause. The WITH clause specifies the connector, the format that the data is encoded with, and various other options that are specific to the connector, as documented on the individual connector pages. The general form of the statement is:
CREATE TABLE <table name> (
[<field name> <field type>,]...
) WITH (
connector = '<connector name>',
format = '<format name>',
[format options]
[connection options]
)
where connector is one of the connectors documented on the connector pages and format is one of:
json
debezium_json
avro
raw_string
For example, to create a Kafka source for the topic order_topic:
CREATE TABLE orders (
customer_id INT,
order_id INT,
date_string TEXT,
event_time TIMESTAMP GENERATED ALWAYS AS (CAST(date_string as TIMESTAMP)) STORED,
watermark TIMESTAMP GENERATED ALWAYS AS (CAST(date_string as TIMESTAMP) - INTERVAL '15' SECOND) STORED
) WITH (
connector = 'kafka',
format = 'json',
type = 'source',
bootstrap_servers = 'localhost:9092',
topic = 'order_topic',
event_time_field = 'event_time',
watermark_field = 'watermark'
);
Schema inference
When creating a connection table, you can specify the schema explicitly by listing fields in the CREATE TABLE statement, or you can let Arroyo infer the schema from how it’s used. This is mostly relevant for sinks, where the schema can be inferred from the query that writes to the table.
For example, a Kafka sink could be created like this:
CREATE TABLE results WITH (
connector = 'kafka',
format = 'json',
type = 'sink',
bootstrap_servers = 'localhost:9092',
topic = 'results'
);
and written to like this:
INSERT INTO results
SELECT customer_id as customer_id, count(*) as count
FROM orders
GROUP BY customer_id, TUMBLE(INTERVAL '1' HOUR);
That will result in records like these being written to the results topic:
{"customer_id": 1, "count": 10}
{"customer_id": 2, "count": 5}
Schema-inferred sinks can also be created in the Web UI by selecting the “infer schema” option when creating the table.
Note that when relying on schema inference, the column names are determined by the query, so you will generally want to alias them with AS to ensure they are what you expect.
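To illustrate, here is a sketch that writes to the schema-inferred `results` sink defined earlier, aliasing every computed column so the output keys are predictable. The `order_count` and `max_order_id` aliases are illustrative choices, not names Arroyo requires.

```sql
-- Every output column is aliased, so the inferred sink schema uses these names.
INSERT INTO results
SELECT
  customer_id AS customer_id,
  count(*) AS order_count,
  max(order_id) AS max_order_id
FROM orders
GROUP BY customer_id, TUMBLE(INTERVAL '1' HOUR);
```

Following the earlier example, records written to the topic would be expected to carry the keys customer_id, order_count, and max_order_id.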
Options
Connection tables allow you to configure a number of options that specify and modify their behavior. Options are set in the WITH clause, each as an unquoted key and a single- or double-quoted value. The following options are supported across all connectors; individual connectors also have their own options. To see all of the supported options, refer to the Connectors docs.
| Option | Required | Description |
|---|---|---|
| connector | yes | The name of the connector to use. |
| format | if the connector does not have a built-in format | The format the data is encoded in. |
| event_time_field | no | The name of the field to use as the event time. If not specified, the event time will be set by the source. |
| watermark_field | no | The name of the field to use as the watermark. If not specified, the watermark will be 1 second before the event time. This will usually be the event_time_field minus some constant interval, e.g. event_time - interval '10 seconds'. |
| idle_micros | no | The number of microseconds to wait before considering a source idle. Defaults to 30000000 (30 seconds). Set to -1 to disable source idleness. |
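As a sketch combining the common options, the Kafka source below sets `idle_micros` to 60,000,000 microseconds (60 × 1,000,000 µs = 60 seconds, double the 30-second default), with the value quoted as the options syntax requires. The `payments` topic and its fields are hypothetical.

```sql
CREATE TABLE payments (
  payment_id INT,
  paid_at TEXT,
  event_time TIMESTAMP GENERATED ALWAYS AS (CAST(paid_at AS TIMESTAMP)) STORED
) WITH (
  connector = 'kafka',              -- required
  format = 'json',                  -- required unless the connector has a built-in format
  type = 'source',
  bootstrap_servers = 'localhost:9092',
  topic = 'payments',
  event_time_field = 'event_time',  -- use the parsed timestamp as the event time
  -- watermark_field is omitted, so the watermark defaults to
  -- 1 second before the event time
  idle_micros = '60000000'          -- 60 s without data before the source counts as idle
);
```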
Virtual Fields
Virtual fields can be created within the CREATE TABLE statement using the GENERATED ALWAYS AS (expression) STORED syntax, where expression must be a valid Arroyo SQL expression that depends only on non-virtual fields in the table. Virtual fields are primarily used to define event time and watermark fields, since most other use cases can be handled in the SELECT statement.
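Because a virtual field may only depend on non-virtual fields, a watermark cannot be defined in terms of another virtual event-time field. The sketch below, using a hypothetical `clicks` topic and columns, repeats the underlying expression instead, as the orders example above does.

```sql
CREATE TABLE clicks (
  user_id INT,
  click_time TEXT,
  -- Virtual field derived from the non-virtual click_time column.
  event_time TIMESTAMP GENERATED ALWAYS AS (CAST(click_time AS TIMESTAMP)) STORED,
  -- Must not reference event_time (also virtual), so the CAST is repeated.
  watermark TIMESTAMP GENERATED ALWAYS AS (CAST(click_time AS TIMESTAMP) - INTERVAL '5' SECOND) STORED
) WITH (
  connector = 'kafka',
  format = 'json',
  type = 'source',
  bootstrap_servers = 'localhost:9092',
  topic = 'clicks',
  event_time_field = 'event_time',
  watermark_field = 'watermark'
);
```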
INSERT INTO
Arroyo supports INSERT INTO statements for both memory and connection tables. In line with standard SQL, the selected columns are matched to the table's columns in order, and Arroyo attempts to coerce each value to the corresponding SQL type. For example, if you have a memory table orders with columns customer_id and order_id, you could insert into it with a statement like:
INSERT INTO orders SELECT customer, "order" FROM source_table;
If the table is a connection table, the INSERT INTO acts as a sink; otherwise, the rows are written to a memory table that can then be read from.
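For the connection-table case, here is a sketch of a Kafka sink with an explicitly declared schema. It assumes a hypothetical `source_table` with columns `buyer` and `order_number`; following the column-wise rule described above, those columns would fill customer_id and order_id in order.

```sql
CREATE TABLE order_events (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kafka',
  format = 'json',
  type = 'sink',
  bootstrap_servers = 'localhost:9092',
  topic = 'order_events'
);

-- order_events is a connection table, so this INSERT INTO becomes a sink.
-- buyer -> customer_id, order_number -> order_id (matched by position).
INSERT INTO order_events
SELECT buyer, order_number FROM source_table;
```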