Arroyo provides sources and sinks for Apache Kafka for consistently reading and writing data from Kafka topics using exactly-once semantics. Kafka’s distributed log model makes it a great fit for stream processing applications like Arroyo.

Arroyo supports Kafka clusters that are self-hosted, using Amazon MSK1, and Confluent Cloud.

1 Note that MSK Serverless is not currently supported due to its reliance on non-standard IAM authentication

Configuring the Connection

Kafka connections can be created via the Web UI or via SQL.

A Kafka connection has several required and optional fields:

FieldDescriptionRequiredExample
bootstrap_serversA comma-separated list of Kafka servers to connect toYeskafka-server-1.cluster:9092,kafka-server-2.cluster:9092
auth.typeOne of ‘none’ or ‘sasl’Nosasl
auth.protocolThe SASL protocol to use (e.g., SASL_PLAINTEXT, SASL_SSL)NoSASL_PLAINTEXT
auth.mechanismThe SASL mechanism to use (e.g., SCRAM-SHA-256, SCRAM-SHA-512)NoSCRAM-SHA-256
auth.usernameThe username to use for SASL authenticationNouser
auth.passwordThe password to use for SASL authenticationNopassword
topicThe name of the Kafka topic to read from or write toYes
typeThe type of table (either ‘source’ or ‘sink’)Yessource
value.subjectConfigures the value subject read from the schema registry, if enabledNologs-value
source.offsetThe offset to start reading from (either ‘earliest’ or ‘latest’)Noearliest
source.read_modeThe read mode to use. ‘read_committed’ only reads committed Kafka messages, while ‘read_uncommitted’ will read all messages.Noread_committed
source.group_idFor sources, sets the Kafka consumer group to use; note that using the same group for multiple pipelines will result in each pipeline seeing only a subset of the dataNomy-group
sink.commit_modeThe commit mode to use (either ‘exactly_once’ or ‘at_least_once’)Noexactly_once
’sink.timestamp_field’The field to use as the timestamp for the Kafka messageNotimestamp
’sink.key_field’The field to use as the key for the Kafka messageNokey
schema_registry.endpointThe URL of the Confluent Schema Registry to useNohttps://schema-registry.cluster:8081
schema_registry.api_keyThe API Key to use for the Schema Registry. Will be the username via basic authNoABCDEFGHIJK01234
schema_registry.api_secretThe API Secret to use for the Schema Registry. Will be the password via basic authNoabcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
client_configsKey-value pairs of additional rdkafka configuration optionsNomessage.max.bytes=1000,queue.buffering.max.kbytes=1048576

Kafka Sources

Kafka sources can be created via the Web UI, the API, or directly in SQL. A Kafka source is defined by a topic name and a schema.

Schemas can be defined via json-schema, or automatically configured via Confluent Schema Registry.

Kakfa sources implement exactly-once semantics by storing the last-read offset in Arroyo’s state. Additionally, you can set source.read_mode to read_committed to ensure that only committed messages are read.

Kafka Sinks

Kafka sinks can be created via the Web UI, the API, or directly in SQL. A Kafka sink is defined by a topic name. Currently, Kafka sinks only support writing JSON data, with the structure determined by the schema of the data being written.

The Kafka sink supports both exactly once and at least once modes. At least once delivery will proactively write to the downstream Kafka topic as messages come in. This can potentially result in duplicate messages in the event of a pipeline failure.

Exactly once delivery writes to Kafka using its transaction API. Data is staged within each epoch of the checkpointing system, and then committed through a two-phase protocol once all data is staged. Currently the system does not cleanly recover from failures during the commit phase, as this requires Kafka to be better able to participate in a two-phase commit. See this issue for more details.

Producer metadata

Messages produced to Kafka include data—a sequence of bytes typically encoded in a format like JSON, Avro, or Protobuf—as well a metadata. The Arroyo Kafka connector supports setting metadata fields—currently timestamp and key—to support more powerful integrations. For example:

create table sink (
    timestamp TIMESTAMP NOT NULL,
    user TEXT,
    event TEXT
) with (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    'sink.timestamp_field' = 'timestamp',
    'sink.key_field' = 'user',
    type = 'sink',
    topic = 'events'
);

Kafka DDL

Kafka connection tables can be defined via SQL DDL and used as sources and sinks in SQL queries.

For example:

CREATE TABLE orders (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-server-1.cluster:9092,kafka-server-2.cluster:9092',
  topic = 'order_topic',
  type = 'source',
  'source.offset' = 'earliest',
  'source.read_mode' = 'read_committed'
);