Debezium is a CDC (change data capture) tool that is widely used to dynamically respond to changes in databases like Postgres and MySQL. In this tutorial we’ll show how to use Debezium as a source for Arroyo.

We will read from the checkpoints table in Arroyo’s Postgres database, which stores information on checkpoint progress and history for Arroyo pipelines.

Setting up Arroyo

For this tutorial, we will run Arroyo locally. To install Arroyo, see the getting started instructions.

By default, Arroyo uses a SQLite database when run locally. We’ll have it use Postgres instead. Ensure that Postgres is installed and running, then set up the Arroyo database by running:

$ psql postgres -c "CREATE USER arroyo WITH PASSWORD 'arroyo' SUPERUSER;"
$ createdb arroyo
$ arroyo migrate

Configure Postgres

Next, we’ll configure Postgres so that Debezium can read its changes. First, enable logical decoding by adding the following to postgresql.conf:

wal_level = logical

The location of this file varies depending on the operating system and method of installation. On macOS with Homebrew, it will be something like /opt/homebrew/var/postgresql@15/postgresql.conf, while on Ubuntu it will be something like /etc/postgresql/14/main/postgresql.conf.

Then restart Postgres to apply the change.
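
The exact restart command depends on how Postgres was installed. The two commands below are typical for the setups mentioned above; the Homebrew formula name and the systemd service name are assumptions, so adjust them to match your installation and run only the one that applies.

```shell
# macOS with Homebrew (assumes the postgresql@15 formula)
brew services restart postgresql@15

# Ubuntu with systemd (assumes the default "postgresql" service name)
sudo systemctl restart postgresql
```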

Next, set REPLICA IDENTITY to FULL on the checkpoints table so that update and delete events include the complete previous row.
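
A minimal way to do this from the shell, assuming the table lives in the public schema of the arroyo database created earlier and that your shell user can connect to it as before. The second command is an optional check that the wal_level setting took effect after the restart.

```shell
# Log the full previous row for every update and delete on checkpoints
psql arroyo -c "ALTER TABLE public.checkpoints REPLICA IDENTITY FULL;"

# Should print "logical" once Postgres has been restarted
psql arroyo -c "SHOW wal_level;"
```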

For this demo we can use the same arroyo user that we created for the Arroyo services, but in production it’s recommended to create a separate user for Debezium.

Run Debezium and Kafka

Debezium requires Zookeeper, Kafka and some way to run Kafka Connect. There are many ways to do this, but for simplicity we recommend a single docker-compose file:

docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - 9094:9094
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:9092,INTERNAL://kafka:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  debezium:
    image: debezium/connect:2.3
    depends_on:
      - kafka
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: kafka:9094
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_configs
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_statuses

Write that to a file (e.g. docker-compose.yml) and run it with docker-compose up.

Confirm that the three containers are running by checking docker ps and tailing their logs.
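
One way to run these checks, assuming the compose file is in the current directory and the service names are unchanged. The last command queries the Kafka Connect REST API on the port published by the debezium service; it should return a JSON list of connectors, which is empty until one has been created.

```shell
# List running containers; zookeeper, kafka, and debezium should all appear
docker ps

# Show recent log output from the Kafka Connect (debezium) service
docker-compose logs --tail=50 debezium

# Ask Kafka Connect which connectors exist
curl -s localhost:8083/connectors
```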

Create Debezium Connector

We can use curl to create a Debezium connector through the Kafka Connect REST API. First, create a file named connector-config.json with the following contents:

connector-config.json
{
    "name": "arroyo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "host.docker.internal",
        "database.port": "5432",
        "database.user": "arroyo",
        "database.password": "arroyo",
        "database.dbname": "arroyo",
        "database.server.name": "arroyo",
        "tombstones.on.delete": "false",
        "table.include.list": "public.checkpoints",
        "topic.prefix": "arroyo",
        "plugin.name": "pgoutput",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}

Then submit it to Kafka Connect:

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors -d @connector-config.json

Debezium should now emit a message to the Kafka topic arroyo.public.checkpoints for each change to the checkpoints table.
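
To check that the connector is healthy and that events are flowing, something like the following should work. It assumes the connector name arroyo-connector from the config above and that the kafka-console-consumer tool is on the path inside the kafka container, as it normally is in Confluent’s images. The consumer waits until a message arrives, so it may block if no change has been captured yet.

```shell
# Connector and task state; both should report RUNNING
curl -s localhost:8083/connectors/arroyo-connector/status

# Print one message from the change topic, then exit
docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:9094 \
  --topic arroyo.public.checkpoints \
  --from-beginning --max-messages 1
```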

Run a job to generate checkpoints

This tutorial will involve two Arroyo pipelines: one to generate checkpoints data, and one that consumes the database records generated by the first’s checkpoint activity.

First, start Arroyo with Postgres as its config store:

$ ARROYO__DATABASE__TYPE=postgres arroyo cluster

We’ll start by creating the pipeline that will produce checkpoints, which will be the source of our Debezium data.

Open the Arroyo Pipelines UI at http://localhost:5115/pipelines/new, and create a new pipeline with the following SQL:

CREATE TABLE impulse (
    counter BIGINT UNSIGNED NOT NULL,
    subtask_index BIGINT UNSIGNED NOT NULL
)
WITH (
    connector = 'impulse',
    event_rate = '100'
);

select count(*)
from impulse
group by hop(interval '2 seconds', interval '10 seconds');

This creates an impulse source that generates 100 events per second, and then counts them in 10-second sliding windows that advance every 2 seconds. The details of the query are not relevant to this tutorial.

Click “Launch,” give it a name, and click “Start.”

On the job detail page, there’s a “Checkpoints” tab; clicking into that will show the data that we’ll be reading from the database.

Consume checkpointing data

Next, we’ll create a second pipeline that consumes the checkpoint data via Debezium:

CREATE TABLE checkpoints (
    id BIGINT PRIMARY KEY,
    organization_id TEXT,
    job_id TEXT,
    state_backend TEXT,
    epoch INT,
    min_epoch INT,
    start_time TIMESTAMP NOT NULL,
    finish_time TIMESTAMP,
    state TEXT,
    operators TEXT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    type = 'source',
    topic = 'arroyo.public.checkpoints',
    format = 'debezium_json',
    'json.timestamp_format' = 'RFC3339'
);

SELECT * FROM checkpoints;

This simply dumps out the data as three columns, "before", "after", and "op", in line with the Debezium schema. However, we can also write more complex queries over the source. For instance, consider the following query:

SELECT id, epoch, min_epoch, state FROM checkpoints where finish_time is null;

This will produce an updating dataset that shows all non-finished checkpoints as the state evolves.
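
To make the Debezium envelope concrete, the sketch below writes a hand-made update event to a file and pulls out its op field. The field values are invented for illustration and were not captured from a real Arroyo database; real messages also carry extra fields such as source and ts_ms, and their value encodings may differ. The grep and sed extraction is only a convenience for this flat sample; a JSON-aware tool is the more robust choice.

```shell
# Hypothetical Debezium update event for one checkpoint row; all values are made up
cat > sample-event.json <<'EOF'
{
  "before": {"id": 1, "epoch": 3, "min_epoch": 1, "finish_time": null},
  "after": {"id": 1, "epoch": 3, "min_epoch": 1, "finish_time": "2023-01-01T00:00:10Z"},
  "op": "u"
}
EOF

# Extract the operation code: "c" = create, "u" = update, "d" = delete
op=$(grep -o '"op": *"[a-z]"' sample-event.json | sed 's/.*"\([a-z]\)"$/\1/')
echo "op=$op"
```

Because finish_time is null in "before" and set in "after", an event like this one would remove the row from the result of the finish_time is null query.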

Writing Debezium outputs

In addition to reading Debezium streams as update tables, Arroyo also supports writing Debezium messages to Kafka as a sink. See the connector docs for more details.

Have more questions? File an issue on GitHub or join our Discord!