MQTT is a lightweight queueing protocol commonly used in the IoT industry. The Arroyo MQTT connector can use MQTT brokers as sources and sinks using configurable reliability.

Configuring the Connection

MQTT connections can be created with the Web UI or SQL DDL.

An MQTT connection has several required and optional fields:

FieldDescriptionRequiredExample
urlThe URL of the MQTT brokerYestcp://localhost:1883
usernameUsername for the MQTT brokerFor password authbob
passwordPassword for the MQTT brokerfor password authhunter2
tls.caPath to the CA fileFor TLS auth/mount/certs/ca.pem
tls.certPath to the client cert file authFor TLS auth/mount/certs/client.pem
tls.keyPath to the client key fileFor TLS auth/mount/certs/client.key
topicThe MQTT topic to read or write fromYesevents
qosOne of ‘AtMostOnce’, ‘AtleastOnce’, or ‘ExactlyOnce’NoAtMostOnce
typeEither ‘source’ or ‘sink’Yessource
sink.retainWhether to retain messages published to this topicNofalse

Connection Profiles

Arroyo connectors can split their configuration into two parts: the profile config, which covers the common details of how to connect and authenticate against a cluster, and the table config which is specific to a particular instance of the connector.

For MQTT, the profile represents a particular MQTT broker or cluster, while the table represents a particular topic.

MQTT profiles can be defined via the Web UI and shared across multiple tables. They can be used in SQL DDL by setting the connection_profile option in the WITH clause.

See the connection profile docs for more details.

Source metadata

The MQTT connector supports accessing the following metadata fields:

  • topic - the MQTT topic the message was consumed from

See the source metadata docs for more on how to use metadata fields.

Examples

MQTT Source

create table mqtt_source (
    id TEXT,
    lat FLOAT,
    lng FLOAT,
    type INT,
    topic TEXT GENERATED ALWAYS AS (metadata('topic')) STORED
) with (
    connector = 'mqtt',
    url = 'tcp://localhost:1833',
    type = 'source',
    format = 'json',
    topic = 'events'
);

MQTT Sink

create table mqtt_sink with (
    connector = 'mqtt',
    url = 'tcp://localhost:1833',
    type = 'sink',
    format = 'json',
    topic = 'results',
    'sink.retain' = 'true'
);