The core of Arroyo is a distributed dataflow engine.

Distributed dataflow

The dataflow can be described as a directed acylic graph (DAG). Each node performs some (possibly stateful) computation, while data flows between them along the edges. Because this is a distributed dataflow, each node in the graph can be horizontally subdivided into multiple parallel subtasks. Each subtask is responsible for a subset of the key space.

There are two types of edges between nodes:

  • forward edges pass data to a single downstream subtask
  • shuffle edges pass data to all downstream subtasks

For example, we can look at a logical dataflow for a basic job that looks like this:

Because the join is keyed, we need to shuffle the data from the source tasks so that events with the same key are processed by the same join subtask. Meanwhile, the sink doesn’t require any particular key structure, so we can use a forward edge.

This logical dataflow may be executed as a physical dataflow that looks like this:

where source [0] and source [1] represents two subtasks of the source task (and similarly for the join).

Each subtask may be scheduled independently. If two communicating subtasks are scheduled on the same worker, the dataflow is performed via in-memory queues; otherwise the dataflow is performed via Arroyo’s network stack which forms a logical MxN connection between every communicating subtask.

Streaming SQL

Arroyo pipelines are written using analytical SQL with a few extensions to support streaming semantics. See our What is streaming SQL? blog post for a deep dive into the semantics of streaming SQL, and the SQL docs for a detailed description of the language.

Arroyo’s SQL support is built on top of the Arrow DataFusion project, which is responsible for parsing and planning SQL before we compile it into a streaming dataflow for execution.

Time-oriented processing

Arroyo supports time-oriented processing, inspired by the model described in the Google Dataflow paper. A brief overview of the model will be covered here, but a more in depth explanation can be found in this article.

Event time

Each event in the dataflow is assigned a timestamp. For sources that have a built-in notion of time (like the record timestamp in Kafka) we default to that; for other sources we use the current wall-clock time at ingestion. However it’s often more useful to set the event time from a field in the data, which represents some logical time, which can be done via a SQL expression in the CREATE TABLE statement for the source.

All time-oriented operations in the system are performed using this event time, rather than the time at which the data is processed (called processing time). This allows the system to perform operations such as windowing consistently no matter when or what order events arrive. In other words, for the same sequence of input events you should always get the same output.

Watermarks

Stream processing operates over unbounded streams of data. This means that the data is never complete. And in real world systems, events may be arrive in any order and may be arbitrary delayed. But we still need to be able to compute results, and can’t wait indefinitely to be sure that all of the data has arrived.

To solve this problem, Arroyo uses watermarks to track the progress of the dataflow. A watermark is an estimate of completeness of the data stream; it represents a claim that no more events with a timestamp earlier than the watermark will arrive in the future.

Watermarks are produced by the source based on its understanding of the characteristics of the data. Currently this is a fixed offset from the last event timestamp, but in the future we will support more sophisticated watermarking approaches that provide a statistical estimate of completeness. Once produced by the source, watermarks are propagated through the dataflow, and are used to determine when to perform operations such as closing windows.

Late events (which arrive after the watermark has passed) are dropped.

Stateful processing

Arroyo is a stateful dataflow system. Each node in the dataflow graph can maintain state, which is required to perform operations like joins and windowing. In the open-source release of Arroyo, state is stored in memory on the worker nodes, which means that it is limited to the size of the worker’s memory.

Arroyo regularly checkpoints state to ensure that it can recover from failures, and to allow for operations like scaling and code updates. Checkpoints are consistent snapshots of the state of the dataflow at a particular point in time. In other words, for every operator, an event E is either in or not in the checkpoint. As the latest Kafka offset is also stored in the checkpoint, that means that after we restore from the checkpoint, we can continue processing exactly where we left off and get identical results as if we had never failed.

The algorithm for consistent checkpointing is based on the asynchronous barrier snapshot algorithm, which itself is an extension of the classic Chandy-Lamport algorithm.

Checkpoints are written to remote storage like S3 so that we can recover from machine failures, and are stored in Parquet format.

For more detail on how all of this works, see our what is stateful stream processing? explainer.