CREATE TABLE calls in SQL and must be configured with format = 'debezium_json'. Otherwise, the options are the same as for other Kafka sources.
Postgres Sources
Setting up Debezium
To read from Postgres via Debezium, you will first need to set up Debezium to read from your Postgres changelogs and publish them to Kafka. See the Debezium Postgres docs for more details on how to set this up. An example Debezium config (connector-config.json) could look like this:
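A sketch of such a connector config — the connection details, database name, table list, and topic prefix here are placeholders to adjust for your environment:

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "plugin.name": "pgoutput",
    "topic.prefix": "debezium",
    "table.include.list": "public.table_name"
  }
}
```

With a topic prefix of debezium, change events for public.table_name will be published to the topic debezium.public.table_name.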
Arroyo requires access to the full Postgres transaction log, so you must set the following option on your table:
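A sketch of that setting, assuming the source table is public.table_name:

```sql
-- REPLICA IDENTITY FULL makes Postgres record the complete old row in the
-- WAL on updates and deletes, so change events carry full before/after images
ALTER TABLE public.table_name REPLICA IDENTITY FULL;
```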
Once the connector is running, it will publish to a Kafka topic called debezium.public.table_name, which will contain the changes to the public.table_name table in Postgres.
To verify that this is working, you can use the kafka-console-consumer tool in your Kafka installation to read from the topic:
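For example (the broker address is a placeholder, and some Kafka distributions ship the script as kafka-console-consumer.sh):

```bash
# Read the Debezium change events from the beginning of the topic
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic debezium.public.table_name \
  --from-beginning
```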
Creating a Debezium table in Arroyo
Once you have a Debezium topic set up, you can create a table in Arroyo that will read from that topic. This is a normal Kafka source table, but you must set the format to debezium_json and the topic to the Debezium topic you created above.
For example, the following creates a table called debezium_table that will read from the topic debezium.public.table_name and will have the schema id int, name string, age int:
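A sketch of that statement, assuming a Kafka broker at localhost:9092 (the broker address and exact type names are placeholders):

```sql
-- Kafka source reading Debezium-formatted change events
CREATE TABLE debezium_table (
    id INT,
    name TEXT,   -- the string column from the schema above
    age INT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'debezium.public.table_name',
    type = 'source',
    format = 'debezium_json'
);
```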
You can then query this table like any other table in Arroyo:
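For example, a simple filtering query over the columns defined above:

```sql
-- Select a subset of the change stream (illustrative only)
SELECT id, name, age
FROM debezium_table
WHERE age > 21;
```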

Postgres Sinks
Setting up a Postgres sink
Arroyo can write query results to Postgres via the Debezium connector for JDBC. There are several steps to setting up a Postgres sink.

Create topic

Create a Kafka topic to write to. This can be done via the kafka-topics tool in your Kafka installation:
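For example (the topic name postgres_sink, partition count, and broker address are placeholders; some distributions name the script kafka-topics.sh):

```bash
# Create the topic that Arroyo will write Debezium-formatted records to
kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic postgres_sink \
  --partitions 1 \
  --replication-factor 1
```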
Create table

Create the destination table in Postgres, e.g. via psql:
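A sketch of a possible destination table; the table name and columns are placeholders and should match the schema of the query results you plan to write:

```sql
-- Hypothetical destination table; adjust columns to match your query output
CREATE TABLE results (
    name TEXT PRIMARY KEY,
    total BIGINT
);
```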
Configure Debezium
Configure the Debezium sink connector to write to the Postgres table (connector.json):
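A sketch of such a config, reusing the placeholder topic and table names from the previous steps; the connection details, insert.mode, and primary-key settings are assumptions to adapt to your setup. Note the value.converter settings, which are explained below:

```json
{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "topics": "postgres_sink",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "primary.key.mode": "record_value",
    "primary.key.fields": "name",
    "table.name.format": "results",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}
```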
Writing to a Postgres sink
Now that we've set up Debezium, we can write to the sink table from Arroyo.

The Debezium connector for JDBC requires that your data has a schema associated with it. There are several mechanisms for providing a schema (including via Avro and the Confluent Schema Registry), but as of now Arroyo only supports the embedded-JSON schema format, which is why the value.converter is set to org.apache.kafka.connect.json.JsonConverter and value.converter.schemas.enable is set to true.

On the Arroyo side, you must set the format to debezium_json to indicate that the data is in this format, and set 'json.include_schema' = true to include the schema in the JSON data.
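For example, a sketch of a sink table and a query writing to it, reusing the placeholder names from the earlier steps (broker address, topic, and column names are assumptions):

```sql
-- Debezium-formatted Kafka sink; json.include_schema embeds the schema in
-- each record so the JDBC sink connector can map it to the Postgres table
CREATE TABLE postgres_sink (
    name TEXT,
    total BIGINT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'postgres_sink',
    type = 'sink',
    format = 'debezium_json',
    'json.include_schema' = 'true'
);

-- Write an updating aggregation into the sink
INSERT INTO postgres_sink
SELECT name, count(*) AS total
FROM debezium_table
GROUP BY name;
```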