Setting up Kafka
The easiest way to get a Kafka broker up and running is to use the Confluent Platform. Follow the installation instructions here to set up a local installation on your machine. Once you have the Confluent Platform installed, you can start the Kafka broker by running the following command in the confluent directory:
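The exact command depends on your Confluent Platform and CLI version; with a recent Confluent CLI it looks something like this:

```bash
# Start the local Confluent Platform services, including the Kafka broker,
# Schema Registry, Kafka Connect, and Control Center
./bin/confluent local services start
```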
Generating data
The Confluent Platform includes Kafka Connect, which can be used to generate data for testing using the datagen plugin. First, we need to install the plugin. Run the following command in the confluent directory:
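A command along these lines should work (the version tag here is an assumption; you can pin a specific release instead of latest):

```bash
# Install the kafka-connect-datagen plugin from Confluent Hub
./bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
```

If Kafka Connect was already running, you may need to restart it for the new plugin to be picked up.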
Next, open the Confluent Control Center and create a topic called `orders` with default settings. Then create a second topic with the name `results`.
Next we’re going to create a Kafka Connect source to generate data. Click the “Connect” tab in the Control Center and select the “connect-default” cluster.
Write this JSON to a file called /tmp/datagen.json:
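A datagen configuration along the following lines should work; the connector class and `orders` quickstart come from the datagen plugin, while the converter settings and message interval are assumptions you may need to adjust for your setup:

```json
{
  "name": "datagen-orders",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "orders",
    "quickstart": "orders",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "max.interval": 100,
    "tasks.max": "1"
  }
}
```

Create the connector from this file, for example by uploading it in the Connect UI or POSTing it to the Kafka Connect REST API.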
Once the connector is running, if you click on the `orders` topic on the Topics page, you should see data being generated.
Creating a Kafka connection
Now that we have a Kafka broker running and some data being generated, we can connect it to Arroyo by creating what’s called a “Connection”: an external system that Arroyo can read from (as a source) or write to (as a sink). There are two ways to do this: we can either create a connection in the Web UI, or directly in SQL using DDL statements. We’ll start by creating the source connection in the Web UI, which you can open at http://localhost:5115.

To create a Kafka connection, click the “Connections” tab in the Arroyo UI and then click “Create Connection.” Find Kafka in the list of connectors, and click “Create.” Next we will need to configure the connection information for the Kafka cluster. This is stored separately from the table configuration so that it can be shared across many different connections. At the bottom left, click “Create new”. How you configure the connection will depend on how you’re running Arroyo, but in each case you will use the following settings:

- Name: `local-confluent`
- Bootstrap Servers: the address of your Kafka broker. For a local Confluent Platform install this is typically `localhost:9092`; if Arroyo is running in Docker, use `host.docker.internal:9092` or `<GATEWAY IP>:9092` instead
- Authentication: None
- Schema Registry: Confluent Schema Registry
- Endpoint: http://localhost:8081, http://host.docker.internal:8081, or http://<GATEWAY IP>:8081 (depending on how you’re running Arroyo, similarly to how you configured the bootstrap servers above)
Configuring the source table
Once the cluster config is created, we will need to configure the source table. Select your newly created connection profile, then click “Continue”. At the “Configure table” stage, enter:

- Topic: `orders` (the source will read from the `orders` topic, starting from the latest offset)
- Data format: JSON
- Schema type: Confluent Schema Registry
When Kafka uses the schema registry, it uses a special encoding that’s not quite standard JSON. This means that it’s necessary to specify that the schema registry is being used. In the UI, this can be done by fetching the schema from the schema registry; in SQL it requires setting `'json.confluent_schema_registry' = 'true'` in the WITH clause (see the sketch at the end of this section). Fetch the schema for `orders`, then test that it’s valid.
When you click “Test Connection,” Arroyo will attempt to connect to Kafka with the configured bootstrap servers, validate that the topic exists, and try reading a message from the topic. If everything is good, click “Create” to create the connection.
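For reference, here is a sketch of what an equivalent source table could look like when declared directly in SQL. The column list and bootstrap servers are placeholders, and depending on your setup the schema registry endpoint may also need to be configured:

```sql
CREATE TABLE orders (
    -- placeholder columns; replace with the actual fields of your orders schema
    order_id BIGINT,
    item_id TEXT
) WITH (
    connector = 'kafka',
    type = 'source',
    format = 'json',
    bootstrap_servers = 'localhost:9092',
    topic = 'orders',
    -- tells Arroyo that values use the Confluent Schema Registry JSON encoding
    'json.confluent_schema_registry' = 'true'
);
```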
Creating the Pipeline
Now we’re ready to talk to Kafka! Go to the Pipelines tab, and click “Create Pipeline.” This brings us to the pipeline SQL editor. On the left, we can see all of the sources and sinks we’ve created, including the `orders` source. By clicking on the source, we can see the schema of the source table.
We can start with a simple query that will select all the orders from the `orders` table to see what the data looks like:
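```sql
-- Preview the raw order events coming in on the orders topic
SELECT * FROM orders;
```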

Inserting into a Kafka sink
There are two ways to create connections in Arroyo. Previously we created a Kafka source connection via the Web UI, and now we’re going to create a Kafka sink via SQL. Connections (sources and sinks) are modeled in SQL as tables, so we can create them via DDL statements. It will look something like this:
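```sql
-- A sketch of the Kafka sink table; bootstrap_servers assumes a local broker,
-- so adjust it to match your cluster. The sink's columns are taken from the
-- query that later inserts into it.
CREATE TABLE results WITH (
    connector = 'kafka',
    type = 'sink',
    format = 'json',
    bootstrap_servers = 'localhost:9092',
    topic = 'results'
);
```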
These are `CREATE TABLE` statements that have a `connector` option in their WITH clause, which specifies which connector should be used; in this case, Kafka. Other required options are `type`, which specifies whether the table is a source or a sink, and `format`, which specifies the format of the data.
Then, each connector has its own set of options that may be set; for Kafka, we need to set the `bootstrap_servers` option to tell it how to connect, and the `topic` to write to. See all of the options for the Kafka connector in the connectors docs.
Now we’re ready to send the results of our query to the Kafka sink we created, which we do via an INSERT INTO statement. Putting it all together, a minimal version of the entire query, which simply forwards every order to the sink, looks like this:
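```sql
-- Define the Kafka sink (bootstrap_servers assumes a local broker; adjust as needed)
CREATE TABLE results WITH (
    connector = 'kafka',
    type = 'sink',
    format = 'json',
    bootstrap_servers = 'localhost:9092',
    topic = 'results'
);

-- Write every order from the source into the results topic
INSERT INTO results
SELECT * FROM orders;
```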
Note that in preview mode, connection sinks are swapped out for a Web Sink so that you can see the output and ensure no test data is written to the actual sink. The real sink will be used when the pipeline is started.