In addition to its SELECT capabilities, Arroyo supports several Data Definition Language (DDL) statements.

CREATE TABLE

Arroyo’s CREATE TABLE statements come in three flavors: CREATE TABLE AS, memory tables, and connection tables.

CREATE TABLE AS

This statement creates a table from the query it contains, for example:

CREATE TABLE customer_orders AS SELECT customer_id, order_id FROM orders;

The schema is inferred from the query, and other queries within the same context can select from the new table.

CREATE TABLE (In-Memory)

CREATE TABLE statements without any connection info are presumed to create in-memory tables. A memory table can be written to within the same query context and then read from. It may be the target of only one INSERT INTO statement, but can then be selected from multiple times. For example, you could create an orders table with a statement like:

CREATE TABLE orders (customer_id INT, order_id INT);

CREATE VIEW is simply an alias for creating a memory table.
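The single-writer, many-reader rule for memory tables can be modeled in a few lines of Python. This is a toy sketch, not Arroyo's implementation: the MemoryTable class and the query ids are hypothetical, and the model only tracks which queries write to and read from a table.

```python
from __future__ import annotations

# Toy model (not Arroyo's implementation) of the memory-table rule:
# at most one INSERT INTO may target the table, while any number of
# queries may SELECT from it. MemoryTable and the query ids are hypothetical.


class MemoryTable:
    def __init__(self, name: str, columns: list[str]) -> None:
        self.name = name
        self.columns = columns
        self.writer: str | None = None  # id of the single INSERT INTO query
        self.readers: list[str] = []    # ids of queries that SELECT from it

    def add_writer(self, query_id: str) -> None:
        # A second INSERT INTO targeting the same memory table is rejected.
        if self.writer is not None:
            raise ValueError(
                f"memory table {self.name!r} already written by {self.writer!r}"
            )
        self.writer = query_id

    def add_reader(self, query_id: str) -> None:
        # Reads are not limited.
        self.readers.append(query_id)


orders = MemoryTable("orders", ["customer_id", "order_id"])
orders.add_writer("insert_orders")
orders.add_reader("select_by_customer")
orders.add_reader("select_by_order")
try:
    orders.add_writer("insert_more_orders")
except ValueError as err:
    print("rejected:", err)
```

The design choice mirrored here is that the write side is exclusive while the read side is a plain list, so fan-out to several SELECTs costs nothing extra.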

CREATE TABLE (Connection)

Connection tables allow Arroyo to read from and write to external systems like Kafka clusters. Connection tables may be used as sources or sinks, depending on the type of connection. For details on all of the supported connectors, see the Connectors docs. Connection tables can be created via the Connections tab of the Web UI, or directly in SQL via the CREATE TABLE statement.

Connection tables are created via special CREATE TABLE statements that include a WITH clause. The WITH clause specifies the connector, the format that the data is encoded with, and various other options that are specific to the connector, as documented on the individual connector pages. The general form of the statement is:

CREATE TABLE <table name> (
  [<field name> <field type>,]...
) WITH (
  connector = '<connector name>',
  format = '<format name>',
  [format options]
  [connection options]
)

where connector is one of the connectors documented in the Connectors docs and format is one of:

  • json
  • debezium_json
  • avro
  • raw_string

For example, to create a Kafka source for the topic order_topic:

CREATE TABLE orders (
  customer_id INT,
  order_id INT,
  date_string TEXT,
  event_time TIMESTAMP GENERATED ALWAYS AS (CAST(date_string as TIMESTAMP)) STORED,
  watermark TIMESTAMP GENERATED ALWAYS AS (CAST(date_string as TIMESTAMP) - INTERVAL '15' SECOND) STORED
) WITH (
  connector = 'kafka',
  format = 'json',
  type = 'source',
  bootstrap_servers = 'localhost:9092',
  topic = 'order_topic',
  event_time_field = 'event_time',
  watermark_field = 'watermark'
);
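The two virtual fields in this example compute, for every record, an event time parsed from date_string and a watermark 15 seconds earlier. The Python sketch below mirrors that per-record arithmetic. It is an illustration, not Arroyo code, and it assumes date_string holds an ISO 8601 timestamp, with datetime.fromisoformat standing in for CAST(date_string as TIMESTAMP).

```python
from datetime import datetime, timedelta

# Mirrors the two virtual fields in the orders example (illustration only):
# event_time parses date_string, and watermark subtracts a fixed
# 15-second delay from the same parsed value.
WATERMARK_DELAY = timedelta(seconds=15)


def virtual_fields(record: dict) -> dict:
    """Return a copy of the record with event_time and watermark added."""
    # Stand-in for CAST(date_string AS TIMESTAMP); assumes ISO 8601 input.
    event_time = datetime.fromisoformat(record["date_string"])
    return {
        **record,
        "event_time": event_time,
        "watermark": event_time - WATERMARK_DELAY,
    }


row = virtual_fields(
    {"customer_id": 1, "order_id": 100, "date_string": "2024-01-01T12:00:30"}
)
print(row["event_time"], row["watermark"])  # 2024-01-01 12:00:30 2024-01-01 12:00:15
```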

Schema inference

When creating a connection table, you can specify the schema explicitly by listing fields in the CREATE TABLE statement, or you can let Arroyo infer the schema from how it’s used. This is mostly relevant for sinks, where the schema can be inferred from the query that writes to the table.

For example, a Kafka sink could be created like this:

CREATE TABLE results WITH (
  connector = 'kafka',
  format = 'json',
  type = 'sink',
  bootstrap_servers = 'localhost:9092',
  topic = 'results'
);

and written to like this:

INSERT INTO results
SELECT customer_id as customer_id, count(*) as count
FROM orders
GROUP BY customer_id, TUMBLE(INTERVAL '1' HOUR);

That will result in records like this being written to the results topic:

{"customer_id": 1, "count": 10}
{"customer_id": 2, "count": 5}

Schema-inferred sinks can also be created in the Web UI by selecting the “infer schema” option when creating the table.

Note that when relying on schema inference, the column names are determined by the query, so you will generally want to alias them with AS to ensure they are what you expect.

Options

Connection tables support a number of options that configure their behavior. Options are given in the WITH clause as an unquoted key and a single- or double-quoted value. The following options are supported by all connectors; individual connectors also define their own options, which are listed in the Connectors docs.

  • connector (required): The name of the connector to use.
  • format (required if the connector does not have a built-in format): The format of the data to be deserialized.
  • event_time_field (optional): The name of the field to use as the event time. If not specified, the event time will be set by the source.
  • watermark_field (optional): The name of the field to use as the watermark. If not specified, the watermark will be 1 second before the event time. This will usually be the event_time_field minus some constant interval, e.g. event_time - interval '10 seconds'.
  • idle_micros (optional): The number of microseconds to wait before considering a source idle. Defaults to 30000000 (30 seconds). Set to -1 to disable source idleness.
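Two of these defaults involve simple time arithmetic: without watermark_field the watermark trails the event time by 1 second, and a source that produces nothing for idle_micros (30,000,000 microseconds, i.e. 30 seconds, by default) is considered idle unless idle_micros is -1. The Python sketch below models only that arithmetic. The function names are hypothetical, and treating exactly idle_micros of silence as idle (>=) is an assumption.

```python
from datetime import datetime, timedelta

# Illustrative model of two option defaults (not Arroyo's code):
#  - without watermark_field, the watermark trails the event time by 1 second;
#  - a source with no new records for idle_micros is idle; -1 disables this.
DEFAULT_WATERMARK_LAG = timedelta(seconds=1)
DEFAULT_IDLE_MICROS = 30_000_000  # 30 seconds


def default_watermark(event_time: datetime) -> datetime:
    return event_time - DEFAULT_WATERMARK_LAG


def is_idle(last_record_at: datetime, now: datetime,
            idle_micros: int = DEFAULT_IDLE_MICROS) -> bool:
    if idle_micros == -1:
        return False  # source idleness disabled
    # Assumption: exactly idle_micros of silence already counts as idle.
    return now - last_record_at >= timedelta(microseconds=idle_micros)


t0 = datetime(2024, 1, 1, 12, 0, 0)
print(default_watermark(t0))                                     # 2024-01-01 11:59:59
print(is_idle(t0, t0 + timedelta(seconds=45)))                   # True
print(is_idle(t0, t0 + timedelta(seconds=45), idle_micros=-1))   # False
```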

Virtual Fields

Virtual fields can be created within the CREATE TABLE statement using the GENERATED ALWAYS AS (expression) STORED syntax, where expression must be a valid Arroyo SQL expression that depends only on non-virtual fields within the table. Virtual fields are primarily used to specify event time and watermark fields, as most other use cases can be handled in the SELECT statement.
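The restriction that virtual fields may depend only on non-virtual fields amounts to a dependency check over column names. The hypothetical validator below (not part of Arroyo) takes the set of regular columns and, for each virtual field, the set of columns its expression reads. Taken literally, the rule is consistent with the Kafka source example, whose watermark recomputes CAST(date_string as TIMESTAMP) instead of referring to event_time.

```python
from __future__ import annotations

# Hypothetical validator (not part of Arroyo) for the virtual-field rule:
# a generated column's expression may read only non-virtual columns.


def check_virtual_fields(physical: set[str], virtual: dict[str, set[str]]) -> None:
    """`virtual` maps each generated column to the columns its expression reads."""
    virtual_names = set(virtual)
    for name, deps in virtual.items():
        unknown = deps - physical - virtual_names
        if unknown:
            raise ValueError(f"{name}: unknown column(s) {sorted(unknown)}")
        on_virtual = deps & virtual_names
        if on_virtual:
            raise ValueError(f"{name}: depends on virtual field(s) {sorted(on_virtual)}")


# The orders source: both generated columns read only date_string.
check_virtual_fields(
    {"customer_id", "order_id", "date_string"},
    {"event_time": {"date_string"}, "watermark": {"date_string"}},
)

# Under the rule as stated, a watermark defined from event_time is rejected.
try:
    check_virtual_fields(
        {"customer_id", "order_id", "date_string"},
        {"event_time": {"date_string"}, "watermark": {"event_time"}},
    )
except ValueError as err:
    print(err)
```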

INSERT INTO

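Arroyo supports INSERT INTO statements for both memory and connection tables. In line with standard SQL, the insertion happens column-wise: the selected columns are matched to the table's columns by position, and Arroyo attempts to coerce each value to the target column's SQL type. For example, if you have a memory table orders with columns customer_id and order_id, you could insert into it with a statement like:

INSERT INTO orders SELECT customer, "order" FROM source_table;

(ORDER is a reserved word in SQL, so a column named order must be double-quoted.)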

If the target is a connection table, the INSERT INTO acts as a sink that writes to the external system; if it is a memory table, the inserted rows can then be read by other queries in the same context.
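The positional matching and coercion described above can be sketched in Python. This is a toy model, not Arroyo's code: insert_rows is hypothetical, and Python casts such as int() stand in for SQL type coercion, so the set of conversions Arroyo actually accepts may differ.

```python
from __future__ import annotations

from typing import Any, Callable

# Toy model of INSERT INTO ... SELECT (not Arroyo's code): the i-th selected
# value goes to the i-th target column, regardless of the source column's
# name, and is coerced to that column's type. Python casts stand in for SQL
# coercion; which conversions Arroyo accepts may differ.


def insert_rows(
    target_columns: list[tuple[str, Callable[[Any], Any]]],
    rows: list[tuple],
) -> list[dict]:
    inserted = []
    for row in rows:
        if len(row) != len(target_columns):
            raise ValueError(f"expected {len(target_columns)} values, got {len(row)}")
        # zip pairs each value with its target column by position.
        inserted.append(
            {name: cast(value) for (name, cast), value in zip(target_columns, row)}
        )
    return inserted


# Target: the orders memory table (customer_id INT, order_id INT).
orders_columns = [("customer_id", int), ("order_id", int)]
# Source rows, e.g. from a SELECT whose columns have other names and types.
print(insert_rows(orders_columns, [("7", "1001"), (8, 1002.0)]))
```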