Connect Arroyo to a Kafka topic by first creating a topic called `orders` with default settings. Then create a second topic with the name `results`.
Next we're going to create a Kafka Connect source to generate data. Click the “Connect” tab in the control center and select the “connect-default” cluster. Write this JSON to a file called `/tmp/datagen.json`:
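A minimal config for the Datagen connector looks something like the following; the connector name and `max.interval` are illustrative, and we assume the `kafka-connect-datagen` plugin is installed. The `orders` quickstart generates order records, and the JSON Schema converter registers the value schema with the schema registry at `http://localhost:8081`:

```json
{
  "name": "datagen-orders",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "orders",
    "quickstart": "orders",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "max.interval": 500,
    "tasks.max": "1"
  }
}
```

You can upload this file as a new connector from the Connect UI, or, if you prefer the command line, register it against the Kafka Connect REST API (assuming its default port of 8083):

```bash
curl -X POST -H "Content-Type: application/json" \
  --data @/tmp/datagen.json \
  http://localhost:8083/connectors
```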
Once the connector is running, navigate to the `orders` topic on the Topics page and you should see data being generated.
Back in Arroyo, create a new Kafka connection named `local-confluent`, with Authentication set to None. For the Schema Registry option, choose Confluent Schema Registry, with the endpoint set to `http://localhost:8081` (or `http://host.docker.internal:8081`, or `http://<GATEWAY IP>:8081`, similarly to how you configured the bootstrap servers above).
Next, create a source from this connection, reading from the `orders` topic and starting from the latest offset. For the data format, choose JSON with Confluent Schema Registry enabled, which corresponds to setting `'json.confluent_schema_registry' = 'true'` in the with clause. Name the source `orders`, then test that it's valid.
When you click “Test Connection,” Arroyo will attempt to connect to Kafka with the configured bootstrap servers, validate that the topic exists, and try reading a message from the topic. If everything is good, click “Create” to create the connection.
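Under the hood, this UI flow defines a connection table. A rough sketch of the equivalent DDL, assuming a broker at `localhost:9092` (the column schema is pulled from the schema registry during source creation, so it's omitted here):

```sql
-- Illustrative equivalent of the source created through the UI
CREATE TABLE orders WITH (
    connector = 'kafka',
    type = 'source',
    format = 'json',
    'json.confluent_schema_registry' = 'true',
    bootstrap_servers = 'localhost:9092',
    topic = 'orders',
    'source.offset' = 'latest'
);
```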
Once it's created, you should see the new `orders` source. By clicking on the source, we can see the schema of the source table.
We can start with a simple query that selects all the orders from the `orders` table to see what the data looks like.
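In its simplest form, that's just a select-all over the source:

```sql
-- Preview the raw stream coming off the orders topic
SELECT * FROM orders;
```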
Sources and sinks are defined via `CREATE TABLE` statements that have a `connector` with option, which specifies which connector should be used; in this case Kafka. Other required options are `type`, which specifies whether the table is a source or a sink, and `format`, which specifies the format of the data.
Then, each connector has a set of options that may be set; for Kafka, we need to set the `bootstrap_servers` option to tell it how to connect, and the `topic` to write to. See all of the options for the Kafka connector in the connectors docs.
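Putting those options together, a sink writing to the `results` topic can look something like this; the broker address here is an assumption, and should match whatever you configured for the connection:

```sql
-- A Kafka sink writing JSON to the results topic; the table's schema
-- is inferred from the query that inserts into it
CREATE TABLE results WITH (
    connector = 'kafka',
    type = 'sink',
    format = 'json',
    bootstrap_servers = 'localhost:9092',
    topic = 'results'
);
```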
Now we're ready to send the results of our query to the Kafka sink we created, which we do via an `INSERT INTO` statement, giving us the entire pipeline.
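A sketch of the full query, assuming the select-all from above and the same illustrative broker address:

```sql
CREATE TABLE results WITH (
    connector = 'kafka',
    type = 'sink',
    format = 'json',
    bootstrap_servers = 'localhost:9092',
    topic = 'results'
);

-- Write every order from the source into the results sink
INSERT INTO results
SELECT * FROM orders;
```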