Ingesting from Postgres tables with Debezium
Debezium is a CDC (change data capture) tool that is widely used to dynamically respond to changes in databases like Postgres and MySQL. In this tutorial we’ll show how to use Debezium as a source for Arroyo.
We will read from the `checkpoints` table in the Arroyo Postgres database, which stores checkpoint progress and history for Arroyo pipelines.
Setting up Arroyo
First, follow the developer setup guide in order to run Arroyo locally. This setup will give us a persistent Postgres database we can use for our source.
Configure Postgres
Next, we’ll need to configure Postgres in order to enable Debezium to read the changes.
First, we’ll need to enable logical decoding by adding the following to `postgresql.conf`:
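The setting in question is `wal_level`; Debezium's Postgres connector relies on logical decoding, which requires:

```ini
# postgresql.conf -- enable logical decoding so Debezium can stream changes
wal_level = logical
```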
The location of this file varies depending on operating system and method of installation. On macOS with Homebrew, it will be something like `/opt/homebrew/var/postgresql@15/postgresql.conf`, while on Ubuntu it will be something like `/etc/postgresql/14/main/postgresql.conf`.
Then restart Postgres to apply the changes:
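How you restart depends on how Postgres was installed; for the two setups mentioned above, for example:

```shell
# macOS (Homebrew)
brew services restart postgresql@15

# Ubuntu
sudo systemctl restart postgresql
```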
Next, for the `checkpoints` table, we need to set the `REPLICA IDENTITY` to `FULL`:
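With `FULL`, Postgres writes the entire previous row to the WAL on updates and deletes, so Debezium can populate the `before` field of its change events. Run the following against the Arroyo database:

```sql
ALTER TABLE checkpoints REPLICA IDENTITY FULL;
```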
For this demo we can use the same `arroyo` user that we created for the Arroyo services, but in production it’s recommended to create a separate user for Debezium.
Run Debezium and Kafka
Debezium requires Zookeeper, Kafka and some way to run Kafka Connect. There are many ways to do this, but for simplicity we recommend a single docker-compose file:
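A minimal sketch of such a file, using the Debezium project's images (the `2.1` tags and port mappings here are assumptions; pin versions as appropriate for your setup):

```yaml
# docker-compose.yml -- Zookeeper, Kafka, and Kafka Connect with Debezium
version: "3"
services:
  zookeeper:
    image: debezium/zookeeper:2.1
    ports:
      - "2181:2181"
  kafka:
    image: debezium/kafka:2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  connect:
    image: debezium/connect:2.1
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=connect_configs
      - OFFSET_STORAGE_TOPIC=connect_offsets
      - STATUS_STORAGE_TOPIC=connect_statuses
```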
Write that to a file (e.g. `docker-compose.yml`) and run it with `docker-compose up`. Confirm that the three containers are running via `docker ps` and by tailing their logs.
Create Debezium Connector
We can use `curl` to create a Debezium connector using the Kafka Connect REST API. First, create a file `connector-config.json` with the following contents:
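A sketch of the connector config, assuming Debezium 2.x property names and the `arroyo` database and user from the developer setup (the connector name, password, and `host.docker.internal` hostname are assumptions for a local Docker setup):

```json
{
  "name": "arroyo-checkpoints-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "host.docker.internal",
    "database.port": "5432",
    "database.user": "arroyo",
    "database.password": "arroyo",
    "database.dbname": "arroyo",
    "topic.prefix": "arroyo",
    "table.include.list": "public.checkpoints"
  }
}
```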
Then, submit it to the Kafka Connect REST API:
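Assuming Kafka Connect is listening on its default port, 8083:

```shell
curl -X POST -H "Content-Type: application/json" \
  --data @connector-config.json \
  http://localhost:8083/connectors
```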
Debezium should now emit a message to a topic named `arroyo.public.checkpoints` for each change to the `checkpoints` table.
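For reference, the core of a Debezium change event is an envelope with `before`, `after`, and `op` fields. Here is a trimmed, illustrative sketch of an update event (the row contents are made up for illustration):

```python
import json

# A trimmed, illustrative Debezium change event for an UPDATE on the
# checkpoints table. Because REPLICA IDENTITY is FULL, real events carry
# the complete old row in "before" as well as the new row in "after".
event = json.loads("""
{
  "before": {"id": 1, "state": "inprogress"},
  "after":  {"id": 1, "state": "finished"},
  "op": "u"
}
""")

# "op" is "c" for insert, "u" for update, "d" for delete, "r" for snapshot read
print(event["op"])              # u
print(event["after"]["state"])  # finished
```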
Run a job to generate checkpoints
This tutorial will involve two Arroyo pipelines: one to generate checkpoints data, and one that consumes the database records generated by the first’s checkpoint activity.
We’ll start by creating the pipeline that will produce checkpoints—which will form the input for our debezium data.
Open the Arroyo Pipelines UI at http://localhost:5115/pipelines/new, and create a new pipeline with the following SQL:
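A query along these lines (a sketch; the `impulse` connector options shown are assumptions and may differ across Arroyo versions):

```sql
-- An impulse source that emits 100 synthetic events per second
CREATE TABLE impulse (
  counter BIGINT,
  subtask_index BIGINT
) WITH (
  connector = 'impulse',
  event_rate = '100'
);

-- Count events in a 10-second sliding window, advancing every 2 seconds
SELECT count(*) AS events
FROM impulse
GROUP BY hop(interval '2 seconds', interval '10 seconds');
```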
This creates an impulse source that generates 100 events per second, and then counts them in a 10-second sliding window, although the details of the query are not relevant to this tutorial.
Click “Launch,” give it a name, and click “Start.”
On the job detail page, there’s a “Checkpoints” tab; clicking into that will show the data that we’ll be reading from the database.
Consume checkpointing data
Next we’ll create a second pipeline that consumes the checkpointing data via Debezium.
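A sketch of such a pipeline, reading the raw Debezium envelope as JSON (the connector options shown are assumptions and may differ across Arroyo versions):

```sql
CREATE TABLE checkpoints (
  before JSON,
  after JSON,
  op TEXT
) WITH (
  connector = 'kafka',
  bootstrap_servers = 'localhost:9092',
  topic = 'arroyo.public.checkpoints',
  type = 'source',
  format = 'json'
);

SELECT before, after, op FROM checkpoints;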
This will merely dump out the data as three columns, `before`, `after`, and `op`, in line with the Debezium schema. However, we can also write more complex queries over the source. For instance, consider the following query:
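A sketch, reading the topic as an update table via `format = 'debezium_json'` (the column names on `checkpoints` and the `state` values are assumptions about the schema):

```sql
CREATE TABLE checkpoints (
  id BIGINT,
  epoch INT,
  state TEXT
) WITH (
  connector = 'kafka',
  bootstrap_servers = 'localhost:9092',
  topic = 'arroyo.public.checkpoints',
  type = 'source',
  format = 'debezium_json'
);

-- All checkpoints that have not yet finished, kept up to date as rows change
SELECT id, epoch, state
FROM checkpoints
WHERE state != 'finished';
```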
This will produce an updating dataset that shows all non-finished checkpoints as the state evolves.
Writing Debezium outputs
In addition to reading Debezium streams as update tables, Arroyo also supports writing Debezium messages to Kafka as a sink. See the connector docs for more details.
Have more questions? File an issue on GitHub or join our Discord!