Arroyo provides sources and sinks for Apache Kafka that read from and write to Kafka topics with exactly-once semantics. Kafka’s distributed log model makes it a good fit for stream processing applications like Arroyo.
Arroyo supports self-hosted Kafka clusters, Amazon MSK [1], and Confluent Cloud.
[1] Note that MSK Serverless is not currently supported because it relies on non-standard IAM authentication.
Configuring the Connection
Kafka connections can be created via the Web UI or via SQL.
A Kafka connection has several required and optional fields:
| Field | Description | Required |
|---|---|---|
| bootstrap_servers | A comma-separated list of Kafka servers to connect to | Yes |
| auth.type | One of ‘none’ or ‘sasl’ | No |
| auth.protocol | The SASL protocol to use (e.g., | No |
| auth.mechanism | The SASL mechanism to use (e.g., | No |
| auth.username | The username to use for SASL authentication | No |
| auth.password | The password to use for SASL authentication | No |
| topic | The name of the Kafka topic to read from or write to | Yes |
| type | The type of table (either ‘source’ or ‘sink’) | Yes |
| source.offset | The offset to start reading from (either ‘earliest’ or ‘latest’) | No |
| source.read_mode | The read mode to use. ‘read_committed’ reads only committed Kafka messages, while ‘read_uncommitted’ reads all messages. | No |
| source.group_id | For sources, sets the Kafka consumer group to use. Note that using the same group for multiple pipelines will result in each pipeline seeing only a subset of the data. | No |
| sink.commit_mode | The commit mode to use (either ‘exactly_once’ or ‘at_least_once’) | No |
| schema_registry.endpoint | The URL of the Confluent Schema Registry to use | No |
| schema_registry.api_key | The API key to use for the Schema Registry. Sent as the username via basic auth. | No |
| schema_registry.api_secret | The API secret to use for the Schema Registry. Sent as the password via basic auth. | No |
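As a rough illustration of how the authentication fields might be combined, the sketch below defines a source that connects over SASL. The table name, broker address, credentials, and consumer group are hypothetical. The values `SASL_SSL` and `SCRAM-SHA-256` are standard Kafka protocol and mechanism names and are assumed here, since the table above does not list the accepted values; check them against your cluster and Arroyo version. Quoting the dotted option names follows the DDL example later in this document.

```sql
-- Sketch only: all names, hosts, and credentials are placeholders.
CREATE TABLE secure_orders (
    customer_id INT,
    order_id INT
) WITH (
    connector = 'kafka',
    format = 'json',
    bootstrap_servers = 'broker-1.example.com:9092',
    topic = 'order_topic',
    type = 'source',
    'auth.type' = 'sasl',
    'auth.protocol' = 'SASL_SSL',        -- assumed value, not confirmed by the table above
    'auth.mechanism' = 'SCRAM-SHA-256',  -- assumed value, not confirmed by the table above
    'auth.username' = 'example-user',
    'auth.password' = 'example-password',
    -- Use a group ID unique to this pipeline so it sees all of the data.
    'source.group_id' = 'orders-pipeline'
);
```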
Kafka sources can be created via the Web UI, the API, or directly in SQL. A Kafka source is defined by a topic name and a schema.
Schemas can be defined via JSON Schema, or automatically configured via Confluent Schema Registry.
Kafka sources implement exactly-once semantics by storing the last-read offset in Arroyo’s state. Additionally, you can set source.read_mode to ‘read_committed’ to ensure that only committed messages are read.
Kafka sinks can be created via the Web UI, the API, or directly in SQL. A Kafka sink is defined by a topic name. Currently, Kafka sinks only support writing JSON data, with the structure determined by the schema of the data being written.
The Kafka sink supports both exactly-once and at-least-once modes. At-least-once delivery writes to the downstream Kafka topic as soon as messages arrive, which can result in duplicate messages if the pipeline fails.
Exactly-once delivery writes to Kafka using its transaction API. Data is staged within each epoch of the checkpointing system and then committed through a two-phase protocol once all data is staged. Currently, the system does not cleanly recover from failures during the commit phase, because doing so requires Kafka to be better able to participate in a two-phase commit. See this issue for more details.
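A sink that opts into transactional delivery might be declared as in the following sketch. The table name, columns, topic, and broker addresses are hypothetical; the option names come from the field table above. Changing the commit mode to 'at_least_once' would trade possible duplicates after a failure for writes that are not held back until a checkpoint commits.

```sql
-- Sketch only: table, topic, and broker names are placeholders.
CREATE TABLE order_totals (
    customer_id INT,
    order_count BIGINT
) WITH (
    connector = 'kafka',
    format = 'json',  -- sinks currently write JSON only
    bootstrap_servers = 'kafka-server-1.cluster:9092,kafka-server-2.cluster:9092',
    topic = 'order_totals_topic',
    type = 'sink',
    -- Stage writes in a Kafka transaction and commit with each checkpoint.
    'sink.commit_mode' = 'exactly_once'
);
```

Downstream consumers generally need to read with a committed-only isolation level to benefit from the transactional writes.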
Kafka connection tables can be defined via SQL DDL and used as sources and sinks in SQL queries.
CREATE TABLE orders (
    customer_id INT,
    order_id INT
) WITH (
    connector = 'kafka',
    format = 'json',
    bootstrap_servers = 'kafka-server-1.cluster:9092,kafka-server-2.cluster:9092',
    topic = 'order_topic',
    type = 'source',
    'source.offset' = 'earliest',
    'source.read_mode' = 'read_committed'
);
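To show a source and a sink used together, the sketch below reads from the `orders` table defined above and writes a filtered stream to a second topic. The sink table `vip_orders`, its topic, and the filter threshold are hypothetical and chosen only for illustration.

```sql
-- Sketch only: the sink table, topic, and filter are placeholders.
CREATE TABLE vip_orders (
    customer_id INT,
    order_id INT
) WITH (
    connector = 'kafka',
    format = 'json',
    bootstrap_servers = 'kafka-server-1.cluster:9092,kafka-server-2.cluster:9092',
    topic = 'vip_order_topic',
    type = 'sink',
    'sink.commit_mode' = 'at_least_once'
);

-- Continuously copy matching rows from the Kafka source to the Kafka sink.
INSERT INTO vip_orders
SELECT customer_id, order_id
FROM orders
WHERE customer_id < 1000;
```

Because the sink columns match the selected columns by position and type, the JSON written to the sink topic takes its structure from the sink table's schema.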