Arroyo provides Amazon Kinesis sources and sinks for reading from and writing to Kinesis streams. Kinesis’s real-time data streaming capabilities make it a natural fit for stream processing engines like Arroyo.

Configuring the Connection

Kinesis connections can be created via the Web UI, the API, or SQL.

A Kinesis connection has several required and optional fields:

| Field | Description | Default Value | Example |
|---|---|---|---|
| stream_name | The Kinesis stream name for this table | Must be specified | my_stream |
| source.offset | The offset to start reading from (either ‘earliest’ or ‘latest’) | latest | latest |
| sink.max_records_per_batch | The number of records to batch together before writing to Kinesis | 500 | 100 |
| sink.max_bytes_per_batch | The maximum size, in bytes, of a batch of records to write to Kinesis | 4500000 | 1000000 |
| sink.flush_interval_millis | The number of milliseconds to wait before flushing a batch of records to Kinesis | 1000 | 1000 |

Kinesis Sources

Kinesis sources can be created via the Web UI, the API, or directly in SQL. Kinesis sources implement exactly-once semantics by storing the last-read sequence number of each shard in Arroyo’s state, so that on recovery the source resumes reading from those stored positions. To read from Kinesis in SQL, use a CREATE TABLE statement like this:

CREATE TABLE orders (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kinesis',
  stream_name = 'my_stream',
  type = 'source',
  format = 'json',
  'source.offset' = 'latest'
);
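To process records already stored in the stream instead of only newly arriving ones, the same schema can be declared with `'source.offset' = 'earliest'`. The sketch below reuses `my_stream` from the example above; the table name `orders_replay` is hypothetical, and it assumes the records in the stream are JSON objects with the same two fields.

```sql
-- Hypothetical table name; reads my_stream starting from the earliest offset
CREATE TABLE orders_replay (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kinesis',
  stream_name = 'my_stream',
  type = 'source',
  format = 'json',
  'source.offset' = 'earliest'
);
```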

Kinesis Sinks

Kinesis sinks can be created via the Web UI, the API, or directly in SQL. To write to Kinesis from SQL, create a table with a CREATE TABLE statement like this:

CREATE TABLE orders_sink (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kinesis',
  stream_name = 'my_sink_stream',
  type = 'sink',
  format = 'json',
  'sink.max_records_per_batch' = '100',
  'sink.max_bytes_per_batch' = '5000000',
  'sink.flush_interval_millis' = '1000'
);
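Declaring `orders_sink` only defines where results go; records are written to it by an `INSERT INTO ... SELECT` statement. As a minimal sketch, assuming the `orders` source and `orders_sink` tables from the examples above are defined in the same pipeline, the following copies every order read from `my_stream` into `my_sink_stream`:

```sql
-- Copy each order read from the Kinesis source into the Kinesis sink
INSERT INTO orders_sink
SELECT customer_id, order_id
FROM orders;
```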