Arroyo provides sources and sinks to read and write from Confluent Cloud-hosted Kafka topics, with support for exactly-once semantics.

Setup

To connect to Confluent Cloud, you may create a connection either in the Web UI or directly via. SQL.

Create a Connection via UI

To create a connection via the Web UI, open the Arroyo UI and click “Connections” in the left sidebar, then “Create Connection.” On the connectors page, click “Create” under the Confluent Cloud connector.

On the next screen, you will configure the Connection Profile, which tells Arroyo how to connect and authenticate to your Confluent Cloud account. Either select an existing profile, or click “Create new” to create a new profile.

The Create Connection screen in Arroyo

Start by giving the connection profile a name, which will help you identify this connection. You may wish for this to be the same as the cluster name in Confluent Cloud.

For the rest of the fields, we’ll need some information from your Confluent Cloud account.

Connect to Confluent Cloud

In another tab, open the Confluent Cloud console, and navigate to the Cluster Settings page for the cluster you wish to connect:

Confluent Cloud cluster settings page

Find the Bootstrap server (like pkc-p11xm.us-east-1.aws.confluent.cloud:9092) and enter that in the “Bootstrap Servers” field.

Next we need credentials for Arroyo to connect to your Kafka and Confluent Schema Registry. Click API Keys in the Confluent Cloud sidebar. Click “Add key” and create a new set of credentials. On the last page, you will see the Key and Secret. Enter these into the Arroyo connection dialog as the key and secret fields.

if you prefer, you may manage keys and secrets using your own secret management infrastructure, like Hashicorp Vault. If using a separate secrets manager, you should inject the credentials into the pods running Arroyo services as environment variables. These variables can be referenced when setting up a connection using {{ }} template syntax, like {{KAFKA_KEY}}

Creating an API key in the Confluent Cloud UI

Give the new credentials a name, and click “Download and continue.”

Confluent Schema Registry

Optionally, you may also configure the connection to the Confluent Schema Registry if you wish to use it to fetch and produce schemas for your data.

In the Confluent Cloud UI, click Environments in the left sidebar, then select the environment with your cluster.

On the right side bar, you will see a panel called “Stream Governance API.” with an Endpoint URL.

Finding the schema registry endpoint in the Confluent Cloud UI

Enter that into the Arroyo Connection as the Endpoint, under “Confluent Schema Registry.”

Next, click the “Add Key” or “View & Manage” link under “Credentials” in the Stream Governance API panel.

That will take you to the credentials page, where you can create a new set of credentials.

Creating credentials for the schema registry

Enter the Key and Secret as the API Key and API Secret for Confluent Schema Registry, then click “Download and continue.”

Finally, in the Arroyo UI, click “Validate” to test the connection. If everything is set up correctly, you should see a success message and can click “Create” to save the connection.

Creating the table

Once you’ve told Arroyo how to connect to your Confluent Cloud account, you can create source and sink tables that use that configuration.

Next enter the topic you would like to use, and choose whether this will be a source or a sink.

As a source, you may choose the initial offset to read from (either earliest or latest), and whether this should be a transactional (exactly-once) source or not.

As a sink you may choose the commit mode, which determines whether the sink is transactional. Choosing exactly-once means that messages will be written exactly once to the sink, but you may experience higher latency waiting for the transaction to commit durably.

On the next screen, select the format and schema for your data. See the format docs for more details on the supported formats. If you are using Confluent Schema Registry, select that in the “schema type” drop-down.

Finally, give the table a name (which you will use to reference it in SQL) and click “Test Connection.” This will verify that your connection is properly set up and we are able to connect to your Confluent cluster successfully. Once everything has been validated, click “Create.”

Configuration

A Confluent Cloud table can have the following configuration:

FieldDescriptionRequiredExample
connection_profileThe name of the connection profile to use for this tableYesmy_connection
topicThe name of the Kafka topic to read from or write toYesorders_topic
typeThe type of table (either ‘source’ or ‘sink’)Yessource
source.offsetThe offset to start reading from (either ‘earliest’ or ‘latest’)Noearliest
source.read_modeThe read mode to use. ‘read_committed’ only reads committed Kafka messages, while ‘read_uncommitted’ will read all messages.Noread_committed
source.group_idFor sources, sets the Kafka consumer group to use; note that using the same group for multiple pipelines will result in each pipeline seeing only a subset of the dataNomy-group
sink.commit_modeThe commit mode to use (either ‘exactly_once’ or ‘at_least_once’)Noexactly_once

Confluent Sources

Confluent sources can be created via the Web UI, the API, or directly in SQL. A Confluent source is defined by a topic name and a schema.

Schemas can be defined via json-schema, or automatically configured via Confluent Schema Registry.

Confluent sources implement exactly-once semantics by storing the last-read offset in Arroyo’s state. Additionally, you can set source.read_mode to read_committed to ensure that only committed messages are read.

Confluent Sinks

Confluent Kafka sinks can be created via the Web UI, the API, or directly in SQL. A Confluent Kafka sink is defined by a topic name.

The sink supports both exactly once and at least once modes. At least once delivery will proactively write to the downstream Kafka topic as messages come in. This can potentially result in duplicate messages in the event of a pipeline failure.

Exactly once delivery writes to Kafka use its transaction API. Data is staged within each epoch of the checkpointing system, and then committed through a two-phase protocol once all data is staged.

Confluent DDL

Kafka connection tables can be defined via SQL DDL and used as sources and sinks in SQL queries.

For example:

CREATE TABLE orders (
  customer_id INT,
  order_id INT
) WITH (
  connector = 'confluent',
  connection_profile = 'my_cluster',
  format = 'avro',
  topic = 'order_topic',
  type = 'source',
  'source.offset' = 'latest',
  'source.read_mode' = 'read_committed'
);

Schema Registry integration

Arroyo supports the Confluent Schema Registry for both JSON and Avro schemas. To use the Schema Registry for a table, you must have configured the connection profile with the schema registry endpoint and credentials. Then, it will be available as an option when setting the schema for the table.

How Arroyo handles schemas depends on whether the table is a source or a sink and the format.

Source tables

For source tables, Arroyo will read the schema from the schema registry at the time the table is created. This JSON or Avro schema is converted into a SQL table schema, as if it had been defined directly in SQL.

JSON

Because SQL has a less flexible type system than JSON schema, not all JSON schema features are supported directly. For any fields that cannot be converted to a SQL type, Arroyo will create a column of type JSON and store the raw JSON value in that column. The JSON value can then be processed via SQL JSON functions.

The schema is read from the schema registry at the time the table is created, which for tables created via the UI is when the table is saved; for tables created via SQL it is when the pipeline is started. The schema is cached for the lifetime of the table, so if the schema in the schema registry is updated or deleted, reads from the table will continue to use the original schema.

Avro

For Avro, there are some features that cannot be converted to SQL types:

  • Unions (aside from unions that are a single type and null, which are converted to nullable columns)
  • Maps
  • Arrays

Fields with these types will be re-encoded as JSON and stored in a JSON column, in which form they can be processed via SQL JSON functions.

At table creation time (when the table is saved in the UI or when the pipeline is started via SQL for SQL-defined tables), Arroyo will read the schema from the schema registry and convert it to a SQL schema. That schema is used as the “reader” schema for the lifetime of the table and determines what fields are available. If the schema in the schema registry is updated or deleted, reads from the table will continue to use the original schema.

When using the schema registry, Arroyo will also look at the schema ID for each message, and if it encounters a new schema ID, it will fetch the schema from the schema registry and use it as the “writer” schema. Messages are then decoded using the writer schema and then evolved to conform to the reader schema. This ensures that we are able to read the data, so long as the schema in the schema registry is compatible with the reader schema. Arroyo supports all types of schema evolution, with the caveat that fields marked required—NOT NULL in SQL—cannot be removed.

Sink tables

For sink tables, Arroyo will write the schema to the schema registry at the time the pipeline is created. This schema is determined by the SQL schema of the table, and is converted to JSON or Avro as appropriate.

If there’s an existing schema in the schema registry, the new schema must be compatible for it to be written.