Arroyo provides sources and sinks for Apache Kafka, supporting reads from and writes to Kafka topics with exactly-once semantics. Kafka's distributed log model makes it a great fit for stream processing applications like Arroyo.

Arroyo supports Kafka clusters that are self-hosted, running on Amazon MSK[1], or hosted on Confluent Cloud.

[1] Note that MSK Serverless is not currently supported due to its reliance on non-standard IAM authentication.

Configuring the Connection

Kafka connections can be created via the Web UI or via SQL.

[Image: Kafka connection creation flow in the Web UI]

A Kafka connection has several required and optional fields:

| Field | Description | Required | Example |
|-------|-------------|----------|---------|
| bootstrap_servers | A comma-separated list of Kafka servers to connect to | Yes | kafka-server-1.cluster:9092,kafka-server-2.cluster:9092 |
| auth.type | One of 'none' or 'sasl' | No | sasl |
| auth.protocol | The SASL protocol to use (e.g., SASL_PLAINTEXT, SASL_SSL) | No | SASL_PLAINTEXT |
| auth.mechanism | The SASL mechanism to use (e.g., SCRAM-SHA-256, SCRAM-SHA-512) | No | SCRAM-SHA-256 |
| auth.username | The username to use for SASL authentication | No | user |
| auth.password | The password to use for SASL authentication | No | password |
| topic | The name of the Kafka topic to read from or write to | Yes | |
| type | The type of table (either 'source' or 'sink') | Yes | source |
| source.offset | The offset to start reading from (either 'earliest' or 'latest') | No | earliest |
| source.read_mode | The read mode to use; 'read_committed' only reads committed Kafka messages, while 'read_uncommitted' reads all messages | No | read_committed |
| source.group_id | For sources, sets the Kafka consumer group to use; note that using the same group for multiple pipelines will result in each pipeline seeing only a subset of the data | No | my-group |
| sink.commit_mode | The commit mode to use (either 'exactly_once' or 'at_least_once') | No | exactly_once |
| schema_registry.endpoint | The URL of the Confluent Schema Registry to use | No | https://schema-registry.cluster:8081 |
| schema_registry.api_key | The API key to use for the Schema Registry; used as the username for basic auth | No | ABCDEFGHIJK01234 |
| schema_registry.api_secret | The API secret to use for the Schema Registry; used as the password for basic auth | No | abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 |
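
For example, a connection to a SASL-authenticated cluster can be configured directly in a SQL table definition. The following is a sketch only; the servers, credentials, topic, and columns are placeholders, and dotted option names are quoted as in the DDL example later on this page:

CREATE TABLE events (
  id INT,
  message TEXT
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-server-1.cluster:9092,kafka-server-2.cluster:9092',
  topic = 'events_topic',
  type = 'source',
  'auth.type' = 'sasl',
  'auth.protocol' = 'SASL_SSL',
  'auth.mechanism' = 'SCRAM-SHA-256',
  'auth.username' = 'user',
  'auth.password' = 'password'
);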

Kafka Sources

Kafka sources can be created via the Web UI, the API, or directly in SQL. A Kafka source is defined by a topic name and a schema.

Schemas can be defined via JSON Schema, or configured automatically via Confluent Schema Registry.
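
As a sketch of the Schema Registry configuration (the endpoint and credentials below are placeholders, format = 'json' is assumed, and depending on your setup the column list may instead come from the registered schema):

CREATE TABLE registry_events (
  id INT,
  message TEXT
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-server-1.cluster:9092',
  topic = 'events_topic',
  type = 'source',
  'schema_registry.endpoint' = 'https://schema-registry.cluster:8081',
  'schema_registry.api_key' = 'ABCDEFGHIJK01234',
  'schema_registry.api_secret' = 'example-api-secret'
);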

Kafka sources implement exactly-once semantics by storing the last-read offset in Arroyo's state. Additionally, you can set source.read_mode to read_committed to ensure that only committed messages are read.

Kafka Sinks

Kafka sinks can be created via the Web UI, the API, or directly in SQL. A Kafka sink is defined by a topic name. Currently, Kafka sinks only support writing JSON data, with the structure determined by the schema of the data being written.

The Kafka sink supports both exactly-once and at-least-once modes. At-least-once delivery proactively writes to the downstream Kafka topic as messages arrive, which can result in duplicate messages in the event of a pipeline failure.

Exactly-once delivery writes to Kafka using its transaction API. Data is staged within each epoch of the checkpointing system, and then committed through a two-phase protocol once all data is staged. Currently, the system does not cleanly recover from failures during the commit phase, as this requires Kafka to be better able to participate in a two-phase commit. See this issue for more details.
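
The commit mode is configured on the sink table itself. Below is a minimal sketch, assuming a hypothetical orders_sink table and output topic (both placeholders):

CREATE TABLE orders_sink (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-server-1.cluster:9092,kafka-server-2.cluster:9092',
  topic = 'orders_out_topic',
  type = 'sink',
  'sink.commit_mode' = 'exactly_once'
);

Setting 'sink.commit_mode' = 'at_least_once' instead writes records as they arrive, without the transactional staging described above.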

Kafka DDL

Kafka connection tables can be defined via SQL DDL and used as sources and sinks in SQL queries.

For example:

CREATE TABLE orders (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-server-1.cluster:9092,kafka-server-2.cluster:9092',
  topic = 'order_topic',
  type = 'source',
  'source.offset' = 'earliest',
  'source.read_mode' = 'read_committed'
);
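
Once declared, the table can be used like any other relation. For instance, records from the orders source could be written to a Kafka sink (such as the hypothetical orders_sink table sketched in the Kafka Sinks section above) with an INSERT INTO query:

INSERT INTO orders_sink
SELECT customer_id, order_id
FROM orders;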