The core of Arroyo is a distributed dataflow engine.
The dataflow can be described as a directed acyclic graph (DAG). Each node performs some (possibly stateful) computation, while data flows between nodes along the edges. Because this is a distributed dataflow, each node in the graph can be horizontally subdivided into multiple parallel subtasks. Each subtask is responsible for a subset of the key space.
There are two types of edges between nodes:
- forward edges pass data to a single downstream subtask
- shuffle edges pass data to all downstream subtasks, with each record routed according to its key
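The difference between the two edge types can be sketched as a routing function. This is a minimal illustration, not Arroyo’s implementation: the function names, the use of CRC32 as the hash, and the modulo-based forward mapping are all assumptions made for the example.

```python
import zlib

def forward_target(upstream_index: int, downstream_parallelism: int) -> int:
    """Forward edge: an upstream subtask always sends to one downstream subtask.

    Mapping by index is an assumption made for this sketch.
    """
    return upstream_index % downstream_parallelism

def shuffle_target(key: str, downstream_parallelism: int) -> int:
    """Shuffle edge: choose the downstream subtask from a stable hash of the key.

    CRC32 is used only because it is deterministic across runs; the real
    hash function is not specified in this document.
    """
    return zlib.crc32(key.encode("utf-8")) % downstream_parallelism

# Two upstream source subtasks, each holding records for a mix of keys.
source_subtasks = [
    [("user_a", 1), ("user_b", 2)],
    [("user_a", 3), ("user_c", 4)],
]
parallelism = 2

# Route every record over a shuffle edge to one of the join subtasks.
join_inputs = [[] for _ in range(parallelism)]
for records in source_subtasks:
    for key, value in records:
        join_inputs[shuffle_target(key, parallelism)].append((key, value))

# Collect, for each key, the set of join subtasks that received it.
subtasks_per_key = {}
for index, records in enumerate(join_inputs):
    for key, _value in records:
        subtasks_per_key.setdefault(key, set()).add(index)
```

Whatever hash is used, the property that matters is that every record with a given key reaches the same downstream subtask, regardless of which upstream subtask produced it.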
For example, we can look at a logical dataflow for a basic job that looks like this:
Because the join is keyed, we need to shuffle the data from the source tasks so that events with the same key are processed by the same join subtask. Meanwhile, the sink doesn’t require any particular key structure, so we can use a forward edge.
This logical dataflow may be executed as a physical dataflow that looks like this:
Each pair of source nodes represents two subtasks of the source task (and similarly for the join).
Each subtask may be scheduled independently. If two communicating subtasks are scheduled on the same worker, data is exchanged via in-memory queues; otherwise it is sent over Arroyo’s network stack, which forms a logical NxN connection between communicating subtasks.
Arroyo pipelines are written using analytical SQL with a few extensions to support streaming semantics. See our What is streaming SQL? blog post for a deep dive into the semantics of streaming SQL, and the SQL docs for a detailed description of the language.
Arroyo’s SQL support is built on top of the Arrow DataFusion project, which is responsible for parsing and planning SQL before we compile it into a streaming dataflow for execution.
Arroyo supports time-oriented processing, inspired by the model described in the Google Dataflow paper. A brief overview of the model is given here; a more in-depth explanation can be found in this article.
Each event in the dataflow is assigned a timestamp; for events read from Kafka this is the Kafka message timestamp, and in the future we will allow parsing the timestamp from a field in the data.
All time-oriented operations in the system are performed using this event time, rather than the time at which the data is processed (called processing time). This allows the system to perform operations such as windowing consistently, no matter when or in what order events arrive. In other words, for the same sequence of input events you should always get the same output.
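To see why event time gives consistent results, consider a minimal sketch of tumbling-window counting. The function names and the millisecond timestamps are assumptions for illustration; this is not how Arroyo implements windows.

```python
from collections import Counter

def window_start(event_time_ms: int, width_ms: int) -> int:
    """Return the start of the tumbling window that contains the event time."""
    return event_time_ms - (event_time_ms % width_ms)

def count_per_window(events, width_ms):
    """Count events per window using only the timestamp carried by each event."""
    counts = Counter()
    for event_time_ms, _payload in events:
        counts[window_start(event_time_ms, width_ms)] += 1
    return dict(counts)

# The same four events, once in timestamp order and once in a different arrival order.
in_order = [(1_000, "a"), (4_000, "b"), (6_000, "c"), (11_000, "d")]
out_of_order = [in_order[2], in_order[0], in_order[3], in_order[1]]

result_in_order = count_per_window(in_order, 5_000)
result_out_of_order = count_per_window(out_of_order, 5_000)
```

Because the window is derived from the event’s own timestamp, both arrival orders produce the same counts. Assigning windows by processing time would instead depend on when each event happened to be read.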
Stream processing operates over unbounded streams of data, which means that the data is never complete. In real-world systems, events may arrive in any order and may be arbitrarily delayed. But we still need to be able to compute results, and can’t wait indefinitely to be sure that all of the data has arrived.
To solve this problem, Arroyo uses watermarks to track the progress of the dataflow. A watermark is an estimate of completeness of the data stream; it represents a claim that no more events with a timestamp earlier than the watermark will arrive in the future.
Watermarks are produced by the source based on its understanding of the characteristics of the data. Currently this is a fixed offset from the last event timestamp, but in the future we will support more sophisticated watermarking approaches that provide a statistical estimate of completeness. Once produced by the source, watermarks are propagated through the dataflow and are used to determine when to perform operations such as closing windows.
Late events (which arrive after the watermark has passed) are dropped.
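The fixed-offset watermark and the dropping of late events can be sketched as follows. The class and function names are hypothetical, and tracking the maximum timestamp seen (so that the watermark never moves backwards) is an assumption of this sketch rather than a documented detail of Arroyo.

```python
class FixedLatenessWatermark:
    """Watermark generator: highest event time seen minus a fixed offset."""

    def __init__(self, max_lateness_ms: int):
        self.max_lateness_ms = max_lateness_ms
        self.max_seen_ms = None

    def observe(self, event_time_ms: int) -> int:
        """Record an event time and return the current watermark."""
        if self.max_seen_ms is None or event_time_ms > self.max_seen_ms:
            self.max_seen_ms = event_time_ms
        return self.max_seen_ms - self.max_lateness_ms

def split_late(event_times_ms, max_lateness_ms):
    """Return (accepted, dropped, final_watermark) for a sequence of event times."""
    generator = FixedLatenessWatermark(max_lateness_ms)
    watermark = None
    accepted, dropped = [], []
    for event_time_ms in event_times_ms:
        # An event is late if its timestamp is earlier than the current watermark.
        if watermark is not None and event_time_ms < watermark:
            dropped.append(event_time_ms)
            continue
        accepted.append(event_time_ms)
        watermark = generator.observe(event_time_ms)
    return accepted, dropped, watermark

# With a 5 second offset, the event at 14_000 arrives after the watermark reached 15_000.
accepted, dropped, final_watermark = split_late(
    [10_000, 12_000, 11_500, 20_000, 14_000], max_lateness_ms=5_000
)
```

The event at 11_500 is out of order but still accepted, because the watermark at that point is 7_000. Only the event at 14_000 is dropped. A larger offset tolerates more disorder at the cost of closing windows later.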
Arroyo is a stateful dataflow system. Each node in the dataflow graph can maintain state, which is required to perform operations like joins and windowing. In the open-source release of Arroyo, state is stored in memory on the worker nodes, which means that it is limited to the size of the worker’s memory. Arroyo Cloud runs on a distributed state backend that separates storage and compute and allows for arbitrarily large state.
Arroyo regularly checkpoints state to ensure that it can recover from failures, and to allow for operations like scaling and code updates. Checkpoints are consistent snapshots of the state of the dataflow at a particular point in time. In other words, for every operator, an event E is either in or not in the checkpoint. Because the latest Kafka offset is also stored in the checkpoint, after restoring from it we can continue processing exactly where we left off and get the same results as if we had never failed.
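The recovery guarantee can be illustrated with a toy single-operator pipeline that stores its source offset together with its state. Everything here (the `CountingPipeline` class, the list standing in for a Kafka partition, the dictionary snapshot) is a hypothetical sketch, not Arroyo’s checkpointing protocol.

```python
import copy

class CountingPipeline:
    """Toy pipeline: reads keys from a log and counts occurrences per key."""

    def __init__(self):
        self.offset = 0   # next position to read from the log
        self.counts = {}  # operator state

    def process(self, log, up_to):
        """Process log entries from the current offset up to (not including) up_to."""
        while self.offset < up_to:
            key = log[self.offset]
            self.counts[key] = self.counts.get(key, 0) + 1
            self.offset += 1

    def checkpoint(self):
        """Snapshot the offset and the state together, so they stay consistent."""
        return copy.deepcopy({"offset": self.offset, "counts": self.counts})

    @classmethod
    def restore(cls, snapshot):
        """Rebuild a pipeline from a snapshot taken by checkpoint()."""
        pipeline = cls()
        pipeline.offset = snapshot["offset"]
        pipeline.counts = copy.deepcopy(snapshot["counts"])
        return pipeline

log = ["a", "b", "a", "c", "b", "a"]

# Run without failures.
uninterrupted = CountingPipeline()
uninterrupted.process(log, len(log))

# Run that checkpoints after 3 events, processes 2 more, then fails.
failing = CountingPipeline()
failing.process(log, 3)
snapshot = failing.checkpoint()
failing.process(log, 5)  # this work is lost when the failure occurs

# Recover from the checkpoint and replay from the stored offset.
recovered = CountingPipeline.restore(snapshot)
recovered.process(log, len(log))
```

Since the offset and the counts are captured in the same snapshot, the events processed after the checkpoint are replayed exactly once on recovery, and `recovered.counts` matches `uninterrupted.counts`.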
Checkpoints are written to S3 so that we can recover from machine failures, and are stored in Parquet format.