Connect Arroyo to a Confluent Kafka topic
Find your cluster's bootstrap server in the Confluent Cloud console (it will look something like
`pkc-p11xm.us-east-1.aws.confluent.cloud:9092`) and enter it in the “Bootstrap Servers” field.
Next, we need credentials that allow Arroyo to connect to your Kafka cluster and Confluent
Schema Registry. Click “API Keys” in the Confluent Cloud sidebar, then click “Add key” and
create a new set of credentials. On the last page you will see the Key and Secret; enter these
into the Arroyo connection dialog as the key and secret fields.
These fields also support environment-variable substitution via `{{ }}` template syntax, like
`{{KAFKA_KEY}}`, if you prefer not to store secrets directly in the connection profile.
| Field | Description | Required | Example |
|---|---|---|---|
| connection_profile | The name of the connection profile to use for this table | Yes | my_connection |
| topic | The name of the Kafka topic to read from or write to | Yes | orders_topic |
| type | The type of table (either `source` or `sink`) | Yes | source |
| source.offset | The offset to start reading from (either `earliest` or `latest`) | No | earliest |
| source.read_mode | The read mode to use; `read_committed` only reads committed Kafka messages, while `read_uncommitted` reads all messages | No | read_committed |
| source.group_id | For sources, sets the Kafka consumer group to use; note that using the same group for multiple pipelines will result in each pipeline seeing only a subset of the data | No | my-group |
| sink.commit_mode | The commit mode to use (either `exactly_once` or `at_least_once`) | No | exactly_once |
If you need to ensure that only committed messages are read (for example, when the topic is
written by transactional producers), set `source.read_mode` to `read_committed`.
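For tables defined in SQL, these options go in the `WITH` clause. Here is a minimal sketch of a
source table; the connector name, connection profile name, column names, and format are
illustrative assumptions rather than values from this guide:

```sql
-- Sketch of a source table using a previously created connection profile.
-- Profile name, columns, and format are hypothetical; the WITH options
-- correspond to the fields in the table above.
CREATE TABLE orders (
    id BIGINT,
    customer_id BIGINT,
    amount DOUBLE
) WITH (
    connector = 'confluent',               -- assumed connector name
    connection_profile = 'my_confluent',   -- hypothetical profile name
    topic = 'orders_topic',
    type = 'source',
    format = 'json',
    'source.offset' = 'earliest',
    'source.read_mode' = 'read_committed'
);
```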
When using JSON with the schema registry, the topic's JSON schema is converted to a SQL schema.
Fields that cannot be represented directly as SQL types are given the type JSON, and Arroyo will
store the raw JSON value in that column. The JSON value can then be processed via SQL JSON functions.
The schema is read from the schema registry at the time the table is created, which for tables created via the UI
is when the table is saved; for tables created via SQL it is when the pipeline is started. The schema is
cached for the lifetime of the table, so if the schema in the schema registry is updated or deleted, reads
from the table will continue to use the original schema.
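As a sketch of what processing such a column might look like, the query below assumes a
hypothetical `events` table with a raw JSON column named `properties`, and it assumes an
extraction function named `extract_json_string`; substitute whichever JSON functions your Arroyo
version actually provides:

```sql
-- Hypothetical: count events per region extracted from a raw JSON column.
-- Table, column, and function names are assumptions for illustration.
SELECT
    extract_json_string(properties, '$.customer.region') AS region,
    count(*) AS event_count
FROM events
GROUP BY extract_json_string(properties, '$.customer.region');
```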
For Avro data, the Avro schema is likewise converted to a SQL schema. Union types (other than a
union of a single type with `null`, which are converted to nullable columns) cannot be represented
directly in SQL; they are instead read into a JSON column, in which form they can be processed via
SQL JSON functions.
At table creation time (when the table is saved in the UI, or when the pipeline is started for
SQL-defined tables), Arroyo will read the schema from the schema registry and
convert it to a SQL schema. That schema is used as the “reader” schema for the lifetime of the table
and determines what fields are available. If the schema in the schema registry is updated or deleted,
reads from the table will continue to use the original schema.
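Because the columns are derived from the registry, such a table can often be declared without an
explicit column list. The sketch below assumes the `avro.confluent_schema_registry` format option
and a hypothetical profile name; check the formats documentation for the exact options your
version expects:

```sql
-- Hypothetical Avro source whose SQL schema is derived from the schema
-- registered for the topic at table-creation time.
CREATE TABLE orders_avro WITH (
    connector = 'confluent',
    connection_profile = 'my_confluent',
    topic = 'orders_topic',
    type = 'source',
    format = 'avro',
    'avro.confluent_schema_registry' = 'true'
);
```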
When using the schema registry, Arroyo will also look at the schema ID for each
message, and if it encounters a new schema ID, it will fetch the schema from the
schema registry and use it as the “writer” schema. Messages are decoded using the writer schema
and evolved to conform to the reader schema. This
ensures that we are able to read the data, so long as the schema in the schema
registry is compatible with the reader schema. Arroyo supports
all types of schema evolution,
with the caveat that fields marked required (NOT NULL in SQL) cannot be removed.
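To make this concrete, here is a hypothetical evolution (the field names are invented for
illustration). Suppose the table was created while the registry held this schema, which becomes
the reader schema:

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "amount", "type": "double"}
  ]
}
```

If a producer later registers a compatible version that adds an optional field:

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "amount", "type": "double"},
    {"name": "coupon_code", "type": ["null", "string"], "default": null}
  ]
}
```

Messages written with the new schema are decoded with it as the writer schema and evolved down to
the reader schema, so `coupon_code` is simply not visible to the existing table (recreating the
table would pick it up). Dropping `id` or `amount`, by contrast, would break reads, since those
fields map to NOT NULL columns in the reader schema.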