Skip to content

MQTT

MQTT is a lightweight queueing protocol commonly used in the IoT industry. The Arroyo MQTT connector can use MQTT brokers as sources and sinks using configurable reliability.

MQTT connections can be created with the Web UI or SQL DDL.

Creating an MQTT connection

An MQTT connection has several required and optional fields:

FieldDescriptionRequiredExample
urlThe URL of the MQTT brokerYestcp://localhost:1883
usernameUsername for the MQTT brokerFor password authbob
passwordPassword for the MQTT brokerfor password authhunter2
tls.caPath to the CA fileFor TLS auth/mount/certs/ca.pem
tls.certPath to the client cert file authFor TLS auth/mount/certs/client.pem
tls.keyPath to the client key fileFor TLS auth/mount/certs/client.key
topicThe MQTT topic to read or write fromYesevents
qosOne of ‘AtMostOnce’, ‘AtleastOnce’, or ‘ExactlyOnce’NoAtMostOnce
typeEither ‘source’ or ‘sink’Yessource
sink.retainWhether to retain messages published to this topicNofalse
max_packet_sizeMaximum MQTT packet size in bytesNo10240

Arroyo connectors can split their configuration into two parts: the profile config, which covers the common details of how to connect and authenticate against a cluster, and the table config which is specific to a particular instance of the connector.

For MQTT, the profile represents a particular MQTT broker or cluster, while the table represents a particular topic.

MQTT profiles can be defined via the Web UI and shared across multiple tables. They can be used in SQL DDL by setting the connection_profile option in the WITH clause.

See the connection profile docs for more details.

The MQTT connector supports accessing the following metadata fields:

  • topic TEXT - the MQTT topic the message was consumed from

See the source metadata docs for more on how to use metadata fields.

create table mqtt_source (
id TEXT,
lat FLOAT,
lng FLOAT,
type INT,
topic TEXT METADATA FROM 'topic'
) with (
connector = 'mqtt',
url = 'tcp://localhost:1833',
type = 'source',
format = 'json',
topic = 'events'
);
create table mqtt_sink with (
connector = 'mqtt',
url = 'tcp://localhost:1833',
type = 'sink',
format = 'json',
topic = 'results',
'sink.retain' = true
);