NATS is a messaging and queueing system designed for simplicity and performance. Arroyo includes sources and sinks for Core NATS and for NATS JetStream, which layers persistence and delivery guarantees on top of Core NATS.

Configuring the Connection

NATS connections can be created with the Web UI or SQL DDL.

A NATS connection has several required and optional fields:

| Field | Description | Required | Example |
|---|---|---|---|
| servers | Comma-separated list of NATS servers | Yes | nats-1:4222,nats-2:4222 |
| auth.type | One of ‘none’ or ‘credentials’ | No | credentials |
| auth.username | Username for auth | For auth.type = ‘credentials’ | bob |
| auth.password | Password for auth | For auth.type = ‘credentials’ | hunter2 |
| type | Either ‘source’ or ‘sink’ | Yes | source |
| stream | The NATS JetStream stream to read from | One of stream or subject | events-arroyo |
| subject | The Core NATS subject to read from or write to | One of stream or subject | events |
| consumer.ack_policy | One of ‘Explicit’, ‘None’, or ‘All’ | No | Explicit |
| consumer.replay_policy | One of ‘Original’ or ‘Instant’ | No | Instant |
| consumer.ack_wait | Duration in seconds that the server will wait for an ack for any individual message once it has been delivered to a consumer | No | 300 |
| consumer.description | Description of the consumer | No | arroyo-consumer |
| consumer.filter_subjects | Comma-separated list of subjects the consumer should filter on | No | events1,events2 |
| consumer.num_replicas | Number of replicas for the consumer’s state | No | 3 |
| consumer.inactive_threshold | Duration in seconds before inactive consumers will be cleaned up | No | 600 |
| consumer.rate_limit | Maximum number of messages per second that will be delivered to the consumer (-1 for no limit) | No | 1000 |
| consumer.max_ack_pending | Maximum number of messages without an acknowledgement that may be in flight before sending is paused (-1 for no limit) | No | 1000 |
| consumer.max_deliver | Maximum number of times a specific message delivery will be attempted (-1 for no limit) | No | 20 |
| consumer.max_waiting | Maximum number of messages that can be waiting to be delivered (-1 for no limit) | No | 10000 |
| consumer.max_batch | Maximum number of messages that may be delivered in a single batch | No | 1000 |
| consumer.max_bytes | Maximum number of bytes that will be delivered in a single batch | No | 104857600 |
| consumer.max_expires | Maximum number of messages that can be delivered to the consumer before they are considered expired | No | 30000 |

See the NATS documentation for a fuller explanation of these options.
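
As an illustration of how the consumer options in the table fit together, here is a minimal sketch of a JetStream source. The table name and columns are hypothetical, the server, stream, and option values are taken from the Example column above, and it assumes that the consumer.* keys are quoted in the WITH clause the same way as the auth.* keys. Whether numeric values should be quoted strings is also an assumption.

```sql
-- Hypothetical JetStream source; table name and columns are illustrative only.
CREATE TABLE events (
    id BIGINT NOT NULL,
    message TEXT NOT NULL
) with (
    type = 'source',
    connector = 'nats',
    servers = 'nats-1:4222,nats-2:4222',
    -- Setting stream (instead of subject) selects JetStream.
    stream = 'events-arroyo',
    -- Consumer options from the table above; quoting style is assumed.
    'consumer.ack_policy' = 'Explicit',
    'consumer.replay_policy' = 'Instant',
    'consumer.ack_wait' = '300',
    'consumer.max_ack_pending' = '1000',
    format = 'json'
);
```

Only one of stream or subject is set here, matching the "One of stream or subject" requirement in the table.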

Connection Profiles

Arroyo connectors can split their configuration into two parts: the profile config, which covers the common details of how to connect and authenticate against a cluster, and the table config, which is specific to a particular instance of the connector.

For NATS, the profile represents a particular NATS server or cluster, while the table represents a particular subject or stream.

NATS profiles can be defined via the Web UI and shared across multiple tables. They can be used in SQL DDL by setting the connection_profile option in the WITH clause.

See the connection profile docs for more details.
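
A minimal sketch of a table that relies on a profile might look like the following. The profile name nats_cluster and the table name are hypothetical, and the sketch assumes the profile already carries the servers and auth.* settings, so only table-level options appear in the WITH clause.

```sql
-- Assumes a NATS connection profile named 'nats_cluster' was created in the Web UI.
CREATE TABLE profile_logs (
    id BIGINT NOT NULL,
    message TEXT NOT NULL
) with (
    type = 'source',
    connector = 'nats',
    -- Server and auth details are expected to come from the profile.
    connection_profile = 'nats_cluster',
    subject = 'logs',
    format = 'json'
);
```

Sharing one profile across several tables keeps credentials out of the individual table definitions.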

Examples

NATS Source

CREATE TABLE logs (
    id BIGINT NOT NULL,
    time TIMESTAMP NOT NULL,
    host TEXT NOT NULL,
    level TEXT NOT NULL,
    message TEXT NOT NULL
) with (
    type = 'source',
    connector = 'nats',
    servers = 'localhost:4222',
    subject = 'logs',
    'auth.type' = 'credentials',
    'auth.username' = '{{ NATS_USER }}',
    'auth.password' = '{{ NATS_PASSWORD }}',
    format = 'json'
);

NATS Sink

CREATE TABLE results (
    count BIGINT NOT NULL,
    time TIMESTAMP NOT NULL
) with (
    type = 'sink',
    connector = 'nats',
    servers = 'localhost:4222',
    subject = 'results',
    format = 'json'
);
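
Once a source and a sink are defined, a query connects them. The following is a rough sketch rather than a recommended pipeline: it reuses the logs source from the NATS Source example and adds a hypothetical error_logs sink with the same columns, then forwards only error-level rows. The sink name, its subject, and the 'error' level value are assumptions made for illustration.

```sql
-- Hypothetical sink with the same schema as the logs source above.
CREATE TABLE error_logs (
    id BIGINT NOT NULL,
    time TIMESTAMP NOT NULL,
    host TEXT NOT NULL,
    level TEXT NOT NULL,
    message TEXT NOT NULL
) with (
    type = 'sink',
    connector = 'nats',
    servers = 'localhost:4222',
    subject = 'logs.errors',
    format = 'json'
);

-- Forward only error-level rows from the source subject to the sink subject.
INSERT INTO error_logs
SELECT id, time, host, level, message
FROM logs
WHERE level = 'error';
```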