RabbitMQ is a message broker that supports a wide variety of event-processing patterns and protocols. The Arroyo RabbitMQ connector supports reading from RabbitMQ via its native Streams protocol, whose replay and persistence capabilities allow at-least-once processing semantics.

(To use RabbitMQ’s MQTT support, see the MQTT connector.)

Configuring the source

RabbitMQ Stream connections can be created via the Web UI or using SQL DDL.

A RabbitMQ Stream source requires the following fields:

| Field | Description | Required | Example |
|---|---|---|---|
| host | RabbitMQ host address | Yes | localhost |
| port | RabbitMQ port | No | 5552 |
| username | RabbitMQ username | No | guest |
| password | RabbitMQ password | No | guest |
| virtual_host | RabbitMQ virtual host | No | / |
| tls_config.enabled | Enable TLS for the connection | No | true |
| tls_config.trust_certificates | Trust all certificates | No | true |
| tls_config.root_certificates_path | Path to root certificates | No | /path/to/certs |
| tls_config.client_certificates_path | Path to client certificates | No | /path/to/client/certs |
| tls_config.client_keys_path | Path to client keys | No | /path/to/keys |
| load_balancer_mode | Enable load balancer mode for the connection | No | true |
| stream | The RabbitMQ stream to read from | Yes | my-stream |
| type | Must be ‘source’ | Yes | source |
| source.offset | Starting offset for the source. One of ‘first’, ‘next’, or ‘last’ | No | last |

See the RabbitMQ Streams documentation for more details on these configurations.

Connection Profiles

Arroyo connectors split their configuration into two parts: the profile configuration for common connection details (like authentication and server settings), and the table configuration for specifics of the data being consumed.

For RabbitMQ Streams, the profile represents the RabbitMQ server configuration, while the table represents a particular stream and its offset. Profiles can be shared across multiple tables and defined via the Web UI or SQL DDL using the connection_profile option.
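As a rough sketch of how a shared profile might look in SQL, the two tables below read different streams from the same server by referencing one profile through the connection_profile option. The profile name rabbitmq_local, the table names, and the stream names are hypothetical, and the sketch assumes the profile has already been defined (for example in the Web UI) with the server's host and credentials.

```sql
-- Assumption: a connection profile named 'rabbitmq_local' (hypothetical)
-- already exists and holds the host, port, and credentials.
CREATE TABLE orders (
    id BIGINT NOT NULL,
    content TEXT NOT NULL
) WITH (
    type = 'source',
    connector = 'rabbitmq',
    connection_profile = 'rabbitmq_local',
    stream = 'orders',          -- hypothetical stream name
    format = 'json',
    source.offset = 'first'
);

-- A second table reuses the same profile and differs only in the
-- table-level settings: the stream and its starting offset.
CREATE TABLE payments (
    id BIGINT NOT NULL,
    content TEXT NOT NULL
) WITH (
    type = 'source',
    connector = 'rabbitmq',
    connection_profile = 'rabbitmq_local',
    stream = 'payments',        -- hypothetical stream name
    format = 'json',
    source.offset = 'last'
);
```

Keeping server details in the profile means a change of host or credentials is made once rather than in every table definition.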

Examples

RabbitMQ Stream Source

CREATE TABLE messages (
    id BIGINT NOT NULL,
    content TEXT NOT NULL,
    timestamp TIMESTAMP NOT NULL
) WITH (
    type = 'source',
    connector = 'rabbitmq',
    host = 'localhost',
    stream = 'my-stream',
    username = '{{ RABBITMQ_USER }}',
    password = '{{ RABBITMQ_PASSWORD }}',
    format = 'json',
    source.offset = 'last'
);
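
The optional fields from the table above can be combined in the same way. The following is a minimal sketch of a TLS-enabled source that replays the stream from its first offset; the table name and host are hypothetical, and writing the port and boolean options as quoted strings is an assumption that may need adjusting for your Arroyo version.

```sql
CREATE TABLE secure_messages (
    id BIGINT NOT NULL,
    content TEXT NOT NULL
) WITH (
    type = 'source',
    connector = 'rabbitmq',
    host = 'rabbitmq.example.com',   -- hypothetical host
    port = '5552',                   -- assumption: value quoted as a string
    username = '{{ RABBITMQ_USER }}',
    password = '{{ RABBITMQ_PASSWORD }}',
    virtual_host = '/',
    tls_config.enabled = 'true',     -- assumption: boolean quoted as a string
    tls_config.root_certificates_path = '/path/to/certs',
    stream = 'my-stream',
    format = 'json',
    source.offset = 'first'          -- read from the start of the stream
);
```

Starting from 'first' re-reads everything retained in the stream, whereas 'last' (used in the example above) skips the existing backlog.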