Arroyo interacts with other data systems via connectors, which can implement sources and sinks for reading and writing data respectively. The list of connectors is constantly expanding. If you’d like to connect Arroyo with a system that’s not currently supported, please get in touch with the team on Discord or via GitHub issues.
Arroyo SQL supports a special kind of table called a connection table which is used to interact with external systems as sources or sinks.
Connection tables come in two forms. Saved connections are created in the Web UI or via the API and can be easily reused across queries. Existing saved connections can be viewed via the Connections tab of the Web UI, and their schemas can be seen in the catalogue view in the SQL editor.
Connection tables can also be created via
CREATE TABLE statements in SQL, as described
in the DDL docs.
A particular connection table is either a source (meaning it reads data from an external system) or a sink (meaning it writes data). Some connectors support only one of these, while others support both.
See the individual connector docs for details on on their capabilities and how to configure them in Arroyo.
For some connectors, Arroyo supports connection profiles,
which allow you to define common connection details once and use it across multiple connections.
Currently the Kafka,
Redis, and Kinesis connectors support connection profiles.
Connection profiles are created as part of defining a saved connection in the Web UI,
where they can be reused.
In SQL, you can directly specify a connection profile via the
connection_profile option in the
Alternately, SQL users can manually specify the various options via the WITH clause.
Arroyo supports a number of different data formats for reading and writing data, including
JSON, Avro, Parquet, and raw strings. The format is configured via the
when creating a SQL table.
See the format docs for more details on how to configure each format.
The framing configuration for a source determines how messages read from the source are split into records for processing. This is particularly useful for sources—like HTTP endpoints—that do not have their own framing protocol.
By default, the framing is
none, which means that the source will emit a
single record for each message it reads.
Framing is configured via the
framing option when creating a SQL table. Currently
newline is supported, which splits the input on newlines.
You may also set
framing.newline.max_length to a number of bytes to limit the
maximum length of a single record. Records that exceed this length will be truncated.
CREATE TABLE my_source ( value TEXT ) WITH ( ... framing = 'newline', 'framing.newline.max_length' = '10000' )
Connections in Arroyo must have associated schemas to allow them to be used in SQL queries. Schemas describe how to interpret the data, mapping it into a table composed of Arroyo SQL types.
Schemas can be defined when creating the source in the Web UI or API, which allows them to be reused across queries. Arroyo supports several methods of schema definition, some of which are also associated with a particular data format:
Json and Avro Schemas for Kafka topics can also be read automatically from Confluent Schema Registry.
Paritioned sources (like Kafka or Kinesis) may experience periods when some partitions are active but others are idle due to the way that they are keyed. This can cause issues in Arroyo due to how watermarks are calculated: as the minimum of the watermarks of all partitions.
If some partitions are idle, the watermark will not advance, and queries that depend on it will not make progress. To address this, sources support a concept of idleness, which allows them to mark partitions as idle after a period of inactivity. Idle partitions, meanwhile, are ignored for the purpose of calculating watermarks and so allow queries to advance.
Idleness is enabled by default for all sources with a period of 5 minutes. It can be
configured when creating a source in SQL by setting the
idle_micros options, or disabled
by setting it to
A special case of idleness occurs when there are more Arroyo source tasks than partitions (for example, a Kafka topic with 4 partitions read by 8 Arroyo tasks). This means that some tasks will never receive data, and so will never advance their watermarks. This can occur as well for non-partitioned sources like WebSocket, where only a single task is able to read data.
To address this, source tasks with no work assigned to them will mark themselves as idle immediately.