RabbitMQ
Consume from RabbitMQ streams
RabbitMQ is a message broker that supports a wide variety of event-processing patterns and protocols. The Arroyo RabbitMQ connector reads from RabbitMQ's native Streams protocol, which provides replay and persistence and so supports at-least-once processing semantics.
(To use RabbitMQ’s MQTT support, see the MQTT connector.)
Configuring the source
RabbitMQ Stream connections can be created via the Web UI or using SQL DDL.
A RabbitMQ Stream source supports the following fields (only host, stream, and type are required):
Field | Description | Required | Example |
---|---|---|---|
host | RabbitMQ host address | Yes | localhost |
port | RabbitMQ port | No | 5552 |
username | RabbitMQ username | No | guest |
password | RabbitMQ password | No | guest |
virtual_host | RabbitMQ virtual host | No | / |
tls_config.enabled | Enable TLS for the connection | No | true |
tls_config.trust_certificates | Trust all certificates | No | true |
tls_config.root_certificates_path | Path to root certificates | No | /path/to/certs |
tls_config.client_certificates_path | Path to client certificates | No | /path/to/client/certs |
tls_config.client_keys_path | Path to client keys | No | /path/to/keys |
load_balancer_mode | Enable load balancer mode for the connection | No | true |
stream | The RabbitMQ stream to read from | Yes | my-stream |
type | Must be ‘source’ | Yes | source |
source.offset | Starting offset for the source. One of ‘first’, ‘next’, or ‘last’ | No | last |
See the RabbitMQ Streams documentation for more details on these configurations.
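As a rough illustration of how the fields in the table map onto SQL DDL, the sketch below defines a source table that reads a stream from its beginning. Several things here are assumptions rather than facts taken from this page: the connector name `rabbitmq`, the table name `events` and its columns, the JSON format, and the choice to write every option value as a quoted string. Check them against your Arroyo version before relying on them.

```sql
-- Minimal sketch of a RabbitMQ Stream source table.
-- Assumed, not confirmed by this page: connector name 'rabbitmq',
-- the 'events' table and its columns, and format = 'json'.
CREATE TABLE events (
    id BIGINT,
    message TEXT
) WITH (
    connector = 'rabbitmq',
    host = 'localhost',          -- required
    port = '5552',               -- optional; example value from the table
    stream = 'my-stream',        -- required: the stream to read from
    type = 'source',             -- required: must be 'source'
    'source.offset' = 'first',   -- one of 'first', 'next', or 'last'
    format = 'json'
);

-- Once defined, the table can be queried like any other source.
SELECT id, message FROM events;
```

Starting from `first` replays everything retained in the stream, while `last` and `next` begin near the end of the stream; see the RabbitMQ Streams documentation for the exact offset semantics.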
Connection Profiles
Arroyo connectors split their configuration into two parts: the profile configuration for common connection details (like authentication and server settings), and the table configuration for specifics of the data being consumed.
For RabbitMQ Streams, the profile represents the RabbitMQ server configuration, while the table represents a particular stream and its offset. Profiles can be shared across multiple tables and defined via the Web UI or SQL DDL using the connection_profile option.
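To make the profile/table split concrete, here is a hedged sketch of two tables sharing one profile. It assumes a profile named `local_rabbitmq` (a hypothetical name) has already been created, for example in the Web UI, and that it holds the host, port, and credentials. The connector name `rabbitmq`, the table names, the stream names, the columns, and the JSON format are likewise illustrative assumptions.

```sql
-- Both tables reuse the server settings stored in the (hypothetical)
-- 'local_rabbitmq' profile; only stream-specific options are repeated.
CREATE TABLE orders (
    order_id BIGINT,
    customer TEXT
) WITH (
    connector = 'rabbitmq',                  -- assumed connector name
    connection_profile = 'local_rabbitmq',   -- hypothetical profile name
    stream = 'orders',
    type = 'source',
    'source.offset' = 'first',
    format = 'json'
);

CREATE TABLE refunds (
    order_id BIGINT,
    reason TEXT
) WITH (
    connector = 'rabbitmq',
    connection_profile = 'local_rabbitmq',
    stream = 'refunds',
    type = 'source',
    'source.offset' = 'last',
    format = 'json'
);
```

Keeping the server details in the profile means a change of host or credentials is made in one place instead of in every table definition.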