Debezium
Debezium is a CDC (change data capture) tool that is widely used to dynamically respond to changes in databases like Postgres and MySQL. In this tutorial we’ll show how to use Debezium as a source for Arroyo.
We will read off of the checkpoints table in the Arroyo postgres database,
which stores information on checkpoint progress and history for Arroyo
pipelines.
Setting up Arroyo
For this tutorial, we will run Arroyo locally. To install Arroyo, see the getting started instructions.
By default, Arroyo uses a SQLite database when run locally. We’ll instead have it use Postgres. Ensure that Postgres is installed and running, then set up the Arroyo database by running:
$ psql postgres -c "CREATE USER arroyo WITH PASSWORD 'arroyo' SUPERUSER;"
$ createdb arroyo
$ arroyo migrate

Configure Postgres
Next, we’ll need to configure Postgres in order to enable Debezium to read the changes.
First, we’ll need to enable logical decoding by adding the following to postgresql.conf:
wal_level = logical

The location of this file varies depending on operating system and method of installation.
On Mac OS with Homebrew, it will be something like /opt/homebrew/var/postgresql@15/postgresql.conf,
while on Ubuntu something like /etc/postgresql/14/main/postgresql.conf.
Then restart Postgres to apply the changes:
# Linux (systemd)
$ sudo systemctl restart postgresql
# macOS (Homebrew)
$ brew services restart postgresql

Next, for the checkpoints table, we need to set REPLICA IDENTITY to FULL so that update and delete events carry the complete previous row:
# Linux
$ sudo -u postgres psql -d arroyo \
    -c "ALTER TABLE checkpoints REPLICA IDENTITY FULL;"
# macOS
$ psql -d arroyo \
    -c "ALTER TABLE checkpoints REPLICA IDENTITY FULL;"

For this demo we can use the same arroyo user that we created for the Arroyo services,
but in production it’s recommended to create a separate user for Debezium.
Run Debezium and Kafka
Debezium requires Zookeeper, Kafka and some way to run Kafka Connect. There are many ways to do this, but for simplicity we recommend a single docker-compose file:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - 9094:9094
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:9092,INTERNAL://kafka:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  debezium:
    image: debezium/connect:2.3
    depends_on:
      - kafka
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: kafka:9094
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_configs
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_statuses

Write that to a file (e.g. docker-compose.yml) and run it with docker-compose up.
Confirm that the three Docker containers are running by checking docker ps and tailing their logs.
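Beyond docker ps, it can help to wait until Kafka Connect actually answers HTTP requests before creating a connector, since the container may be listed as running while the worker is still starting. The following is a minimal sketch using only the Python standard library. It assumes the REST API is reachable on localhost:8083, as mapped in the compose file above, and that the root endpoint returns a JSON object with a version field. The function names connect_is_up and wait_for_connect are hypothetical helpers, not part of Debezium or Arroyo.

```python
import http.client
import json
import time
import urllib.request


def connect_is_up(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if the Kafka Connect REST API answers at base_url."""
    try:
        with urllib.request.urlopen(base_url + "/", timeout=timeout) as resp:
            info = json.load(resp)
    except (OSError, ValueError, http.client.HTTPException):
        # Connection refused, timeout, HTTP error, or a non-JSON reply:
        # treat all of these as "not ready yet".
        return False
    return isinstance(info, dict) and "version" in info


def wait_for_connect(base_url: str = "http://localhost:8083",
                     attempts: int = 30, delay: float = 2.0) -> bool:
    """Poll the REST API until it responds or the attempts run out."""
    for _ in range(attempts):
        if connect_is_up(base_url):
            return True
        time.sleep(delay)
    return False
```

Calling wait_for_connect() with the defaults polls for roughly a minute. A False result usually means the debezium container is still starting or could not reach Kafka, and its logs are the next place to look.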
Create Debezium Connector
We can use curl to create a Debezium connector through the Kafka Connect REST API that the Debezium container exposes.
First, create a file named connector-config.json with the following contents:
{
  "name": "arroyo-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "host.docker.internal",
    "database.port": "5432",
    "database.user": "arroyo",
    "database.password": "arroyo",
    "database.dbname": "arroyo",
    "database.server.name": "arroyo",
    "tombstones.on.delete": "false",
    "table.include.list": "public.checkpoints",
    "topic.prefix": "arroyo",
    "plugin.name": "pgoutput",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Then, submit it to Debezium via:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    localhost:8083/connectors -d @connector-config.json

This should now emit Debezium messages for each change to the checkpoints
table to a topic named arroyo.public.checkpoints.
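If no messages appear on the topic, the connector's status endpoint is usually the quickest place to look. The sketch below reads GET /connectors/<name>/status from the Kafka Connect REST API and summarizes it. It assumes the connector name arroyo-connector and the localhost:8083 port mapping used above. The helper names fetch_status, is_healthy, and failed_traces are hypothetical.

```python
import json
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def is_healthy(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


def failed_traces(status: dict) -> list:
    """Collect stack traces from failed tasks; they usually name the cause."""
    return [
        task.get("trace", "")
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]
```

With the stack running, is_healthy(fetch_status("arroyo-connector")) should return True. If it does not, the traces from failed_traces typically point at the cause, for example Postgres being unreachable from the container or wal_level not being set to logical.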
Run a job to generate checkpoints
This tutorial will involve two Arroyo pipelines: one to generate checkpoints data, and one that consumes the database records generated by the first’s checkpoint activity.
First, we need to run Arroyo with Postgres as its config store:
$ ARROYO__DATABASE__TYPE=postgres arroyo cluster

We’ll start by creating the pipeline that will produce checkpoints, which will form the input for our Debezium data.
Open the Arroyo Pipelines UI at http://localhost:5115/pipelines/new, and create a new pipeline with the following SQL:
CREATE TABLE impulse (
    counter BIGINT UNSIGNED NOT NULL,
    subtask_index BIGINT UNSIGNED NOT NULL
) WITH (
    connector = 'impulse',
    event_rate = '100'
);

SELECT count(*)
FROM impulse
GROUP BY hop(interval '2 seconds', interval '10 seconds');

This creates an impulse source that generates 100 events per second and counts them in a 10-second sliding window that advances every 2 seconds, although the details of the query are not relevant to this tutorial.
Click “Launch,” give it a name, and click “Start.”
On the job detail page, there’s a “Checkpoints” tab; clicking into that will show the data that we’ll be reading from the database.
Consume checkpointing data
Next we’ll create a second pipeline that consumes the checkpointing data via Debezium.
CREATE TABLE checkpoints (
    id BIGINT PRIMARY KEY,
    organization_id TEXT,
    job_id TEXT,
    state_backend TEXT,
    epoch INT,
    min_epoch INT,
    start_time TIMESTAMP NOT NULL,
    finish_time TIMESTAMP,
    state TEXT,
    operators TEXT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    type = 'source',
    topic = 'arroyo.public.checkpoints',
    format = 'debezium_json',
    'json.timestamp_format' = 'RFC3339'
);

SELECT * FROM checkpoints;

This will merely dump out the data as three columns, "before", "after", and
"op", in line with the Debezium schema. However, we can also write more
complex queries over the source. For instance, consider the following query.
SELECT id, epoch, min_epoch, state
FROM checkpoints
WHERE finish_time IS NULL;

This will produce an updating dataset that shows all non-finished checkpoints as the state evolves.
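To build intuition for what an updating dataset means here, the following sketch replays a few hand-written Debezium change events against an in-memory copy of the table and re-evaluates the filter after each one. It is a conceptual illustration only, not a description of how Arroyo implements updating tables. The events are simplified (real messages carry every column plus source metadata), the state values and timestamp are made up, and the helper names are hypothetical. The op codes are Debezium's: c for create, u for update, d for delete, and r for rows read during the initial snapshot.

```python
def apply_change(rows: dict, event: dict) -> None:
    """Apply one Debezium change event to an in-memory copy of the table."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Creates, snapshot reads, and updates all carry the new row in "after".
        new_row = event["after"]
        rows[new_row["id"]] = new_row
    elif op == "d":
        # Deletes only carry the old row in "before".
        rows.pop(event["before"]["id"], None)


def unfinished(rows: dict) -> list:
    """Mirror of the query above: rows whose finish_time is NULL."""
    return [
        (r["id"], r["epoch"], r["min_epoch"], r["state"])
        for r in rows.values()
        if r["finish_time"] is None
    ]


def sample_row(row_id, epoch, state, finish_time=None):
    """Build a simplified checkpoints row (illustrative values only)."""
    return {"id": row_id, "epoch": epoch, "min_epoch": 0,
            "state": state, "finish_time": finish_time}


events = [
    {"op": "c", "before": None, "after": sample_row(1, 1, "inprogress")},
    {"op": "u", "before": sample_row(1, 1, "inprogress"),
     "after": sample_row(1, 1, "ready", 1700000000000000)},
    {"op": "c", "before": None, "after": sample_row(2, 2, "inprogress")},
    {"op": "d", "before": sample_row(2, 2, "inprogress"), "after": None},
]

rows = {}
for event in events:
    apply_change(rows, event)
    print(event["op"], unfinished(rows))
# c [(1, 1, 0, 'inprogress')]
# u []
# c [(2, 2, 0, 'inprogress')]
# d []
```

Checkpoint 1 enters the result when it is created and leaves it once the update sets finish_time, which is the behavior the query result shows as checkpoints complete. The delete case relies on the "before" image, which is why the table was set to REPLICA IDENTITY FULL earlier.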
Writing Debezium outputs
In addition to reading Debezium streams as update tables, Arroyo also supports writing Debezium messages to Kafka as a sink. See the connector docs for more details.
Have more questions? File an issue on GitHub or join our Discord!