checkpoints table in the Arroyo Postgres database, which stores information on checkpoint progress and history for Arroyo pipelines.
Setting up Arroyo
For this tutorial, we will run Arroyo locally. To install Arroyo, see the getting started instructions. By default, Arroyo uses a sqlite database when run locally; we'll instead have it use Postgres. Ensure that Postgres is installed and running, then set up the Arroyo database, as sketched below.
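As a minimal sketch, assuming the standard Postgres client tools and the arroyo role and database names used later in this tutorial (consult the Arroyo getting-started docs for the exact setup commands):

```bash
# Create a role and database for Arroyo; the "arroyo" names are assumptions
# based on the defaults referenced elsewhere in this tutorial.
createuser -s arroyo
createdb -O arroyo arroyo
```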
Configure Postgres
Next, we'll need to configure Postgres so that Debezium can read its changes. First, we need to enable logical decoding by adding the following to postgresql.conf:
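At a minimum, the WAL level must be set to logical, which is what Debezium's Postgres connector relies on:

```
# postgresql.conf
wal_level = logical
```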
On macOS with a Homebrew install, this file is typically located at /opt/homebrew/var/postgresql@15/postgresql.conf, while on Ubuntu it will be somewhere like /etc/postgresql/14/main/postgresql.conf.
Then restart Postgres to apply the changes:
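The exact command depends on how Postgres was installed; for example:

```bash
# Homebrew on macOS
brew services restart postgresql@15

# Ubuntu (systemd)
sudo systemctl restart postgresql
```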
To ensure Debezium receives complete row data for changes to the checkpoints table, we also need to set its REPLICA IDENTITY to FULL:
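For example, in psql connected to the arroyo database:

```sql
-- With REPLICA IDENTITY FULL, Postgres logs the entire previous row for
-- updates and deletes, so Debezium can emit complete change records.
ALTER TABLE checkpoints REPLICA IDENTITY FULL;
```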
In this tutorial Debezium will connect as the arroyo user that we created for the Arroyo services, but in production it's recommended to create a separate user for Debezium.
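A sketch of what a dedicated user might look like; the name and password here are placeholders, and the exact privileges Debezium requires depend on how the connector is configured (see the Debezium Postgres documentation for the full list):

```sql
-- Hypothetical dedicated Debezium user; adjust privileges to your setup
CREATE USER debezium WITH REPLICATION PASSWORD 'changeme';
GRANT SELECT ON checkpoints TO debezium;
```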
Run Debezium and Kafka
Debezium requires Zookeeper, Kafka, and some way to run Kafka Connect. There are many ways to run these, but for simplicity we recommend a single docker-compose file:
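A minimal sketch based on the standard Debezium example images; the image versions, ports, and storage topic names here are assumptions, so adjust them to your environment:

```yaml
# docker-compose.yml
version: "3"
services:
  zookeeper:
    image: debezium/zookeeper:2.1
    ports:
      - "2181:2181"
  kafka:
    image: debezium/kafka:2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  connect:
    image: debezium/connect:2.1
    ports:
      - "8083:8083"
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=connect_configs
      - OFFSET_STORAGE_TOPIC=connect_offsets
      - STATUS_STORAGE_TOPIC=connect_statuses
```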
Save this file as docker-compose.yml and run it with docker-compose up.
Confirm that all three containers are running via docker ps and by tailing their logs.
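For example (the connect service name assumes the compose file sketched above):

```bash
docker ps
docker-compose logs -f connect
```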
Create Debezium Connector
We can use curl to create a Debezium connector via the Kafka Connect REST API exposed by the connect container. First, create a file called connector-config.json with the connector configuration:
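A sketch of the connector configuration, assuming Postgres is reachable from the connect container at host.docker.internal and using placeholder credentials for the arroyo user; adjust the connection settings to your environment:

```json
{
  "name": "arroyo-checkpoints-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "host.docker.internal",
    "database.port": "5432",
    "database.user": "arroyo",
    "database.password": "arroyo",
    "database.dbname": "arroyo",
    "topic.prefix": "arroyo",
    "table.include.list": "public.checkpoints"
  }
}
```

Then POST it to the Connect REST API:

```bash
curl -X POST -H "Content-Type: application/json" \
  --data @connector-config.json \
  http://localhost:8083/connectors
```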
This configures Debezium to stream changes from the checkpoints table to a topic named arroyo.public.checkpoints.
Run a job to generate checkpoints
This tutorial will involve two Arroyo pipelines: one to generate checkpoint data, and one that consumes the database records generated by the first pipeline's checkpoint activity. First, we need to run Arroyo configured to use Postgres as its config store, and then start a pipeline so that checkpoints are written; both steps are sketched below.
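A hedged sketch of pointing Arroyo at Postgres; the exact configuration keys depend on your Arroyo version, so treat the names below as assumptions and check the Arroyo configuration docs:

```toml
# arroyo.toml (key names are assumptions; see the Arroyo config docs)
[database]
type = "postgres"

[database.postgres]
host = "localhost"
port = 5432
database-name = "arroyo"
user = "arroyo"
password = ""
```

With Arroyo running against Postgres, any running pipeline will produce checkpoint records. For example, a simple pipeline over Arroyo's built-in Nexmark source is enough to generate checkpoint activity; the option names here follow Arroyo's example pipelines and may vary by version:

```sql
-- A trivial pipeline whose only purpose is to trigger checkpointing
CREATE TABLE nexmark WITH (
  connector = 'nexmark',
  event_rate = '10'
);

SELECT * FROM nexmark;
```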
Consume checkpointing data
Next we'll create a second pipeline that consumes the checkpointing data via Debezium, by defining a Kafka source in Arroyo over the topic that Debezium is writing to.
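A sketch of such a source, assuming the Kafka broker from the compose file above; the table and column definitions are illustrative, and the exact shape of the records depends on how the Kafka Connect JSON converter is configured (by default it wraps the payload in a schema envelope):

```sql
-- Illustrative Kafka source over the Debezium topic
CREATE TABLE checkpoint_changes (
  before JSON,
  after JSON,
  op TEXT
) WITH (
  connector = 'kafka',
  bootstrap_servers = 'localhost:9092',
  topic = 'arroyo.public.checkpoints',
  type = 'source',
  format = 'json'
);

SELECT op, before, after FROM checkpoint_changes;
```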
, "after"
, and
"op"
, in line with the Debezium schema. However, we can also write more
complex queries over the source. For instance, consider the following query.