The core of Arroyo is a distributed dataflow engine.
Distributed dataflow
The dataflow can be described as a directed acyclic graph (DAG). Each node performs some (possibly stateful) computation,
while data flows between them along the edges. Because this is a distributed dataflow, each node in the graph can be
horizontally subdivided into multiple parallel subtasks. Each subtask is responsible for a subset of the key space.
There are two types of edges between nodes:
- forward edges pass data to a single downstream subtask
- shuffle edges pass data to all downstream subtasks
For example, we can look at the logical dataflow for a basic job consisting of a source, a keyed join, and a sink:
Because the join is keyed, we need to shuffle the data from the source tasks so that events with the same key are
processed by the same join subtask. Meanwhile, the sink doesn’t require any particular key structure, so we can use a
forward edge.
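As a sketch of the kind of query that produces a keyed join (the `impressions` and `clicks` tables here are hypothetical), the equality condition on `id` is what makes the join keyed and forces the shuffle:

```sql
-- Both inputs are shuffled on `id` so that matching events are
-- processed by the same join subtask; the joined output reaches the
-- sink over a forward edge.
SELECT i.id, i.shown_at, c.clicked_at
FROM impressions i
JOIN clicks c ON i.id = c.id;
```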
This logical dataflow may be executed as a physical dataflow that looks like this, where source [0]
and source [1] represent two subtasks of the source task (and similarly for the join).
Each subtask may be scheduled independently. If two communicating subtasks are scheduled on the same worker, data
flows between them via in-memory queues; otherwise it flows via Arroyo's network stack, which forms
a logical MxN connection between every pair of communicating subtasks.
Streaming SQL
Arroyo pipelines are written using analytical SQL with a few extensions to support streaming semantics. See
our What is streaming SQL? blog post for a deep dive into
the semantics of streaming SQL, and the SQL docs for a detailed description of the language.
Arroyo’s SQL support is built on top of the Arrow DataFusion project,
which is responsible for parsing and planning SQL before we compile it into a streaming dataflow for execution.
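For example, a windowed aggregation over a hypothetical `page_views` stream reads much like an ordinary analytical
query, with the tumbling-window function as the streaming extension:

```sql
-- Count page views per route over one-minute event-time windows.
SELECT route, count(*) AS views
FROM page_views
GROUP BY route, tumble(interval '1 minute');
```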
Time-oriented processing
Arroyo supports time-oriented processing, inspired by the model described in the
Google Dataflow paper. A brief
overview of the model is given here; a more in-depth explanation can be found in
this article.
Event time
Each event in the dataflow is assigned a timestamp. For sources that have a built-in notion of time (like
the record timestamp in Kafka) we default to that; for other sources we use the current wall-clock time
at ingestion. However, it's often more useful to set the event time from a field in the data that represents
some logical time; this can be done via a SQL expression in the CREATE TABLE statement for the source.
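As a sketch (the table, field names, and connector options here are illustrative, not exhaustive), a Kafka source
that derives its event time from a `raw_time` field in the payload might be declared like this:

```sql
CREATE TABLE events (
    id TEXT,
    raw_time TEXT, -- the logical time, serialized in the data
    -- virtual field computed from the payload
    event_time TIMESTAMP GENERATED ALWAYS AS (CAST(raw_time AS TIMESTAMP))
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    bootstrap_servers = 'broker:9092',
    format = 'json',
    -- use the computed field, rather than the Kafka record timestamp,
    -- as the event time
    event_time_field = 'event_time'
);
```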
All time-oriented operations in the system are performed using this event time, rather than the time at which the data
is processed (called processing time). This allows the system to perform operations such as windowing consistently
no matter when or in what order events arrive. In other words, for the same sequence of input events you should always get
the same output.
Watermarks
Stream processing operates over unbounded streams of data, which means the data is never complete. In real-world
systems, events may arrive in any order and may be arbitrarily delayed. But we still need to compute results,
and we can't wait indefinitely to be sure that all of the data has arrived.
To solve this problem, Arroyo uses watermarks to track the progress of the dataflow. A watermark is an estimate of
completeness of the data stream; it represents a claim that no more events with a timestamp earlier than the watermark
will arrive in the future.
Watermarks are produced by the source based on its understanding of the characteristics of the data. Currently, this is
a fixed offset from the latest event timestamp, but in the future we will support more sophisticated watermarking approaches
that provide a statistical estimate of completeness. Once produced by the source, watermarks are propagated through the
dataflow, and are used to determine when to perform operations such as closing windows.
Late events (which arrive after the watermark has passed) are dropped.
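At the SQL level, a fixed-offset watermark can be expressed as another virtual field on the source. Continuing the
hypothetical `events` table from above (again, the field and option names are illustrative), a watermark trailing the
event time by 30 seconds might look like:

```sql
CREATE TABLE events (
    id TEXT,
    raw_time TEXT,
    event_time TIMESTAMP GENERATED ALWAYS AS (CAST(raw_time AS TIMESTAMP)),
    -- a claim that no event older than event_time - 30s will arrive later
    watermark TIMESTAMP GENERATED ALWAYS AS (event_time - INTERVAL '30 seconds')
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    bootstrap_servers = 'broker:9092',
    format = 'json',
    event_time_field = 'event_time',
    watermark_field = 'watermark'
);
```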
Stateful processing
Arroyo is a stateful dataflow system. Each node in the dataflow graph can maintain state, which is required to perform
operations like joins and windowing. In the open-source release of Arroyo, state is stored in memory on the worker nodes,
which means that it is limited to the size of the worker’s memory.
Arroyo regularly checkpoints state to ensure that it can recover from failures, and to allow for operations like scaling
and code updates. Checkpoints are consistent snapshots of the state of the dataflow at a particular point in time. In
other words, for every operator, an event E is either in or not in the checkpoint. Because the latest Kafka offset is also
stored in the checkpoint, after restoring from a checkpoint we can continue processing exactly where
we left off and produce the same results as if we had never failed.
The algorithm for consistent checkpointing is based on the
asynchronous barrier snapshot algorithm, which itself is an extension of the
classic Chandy-Lamport algorithm.
Checkpoints are written to remote storage like S3 so that we can recover from
machine failures, and are stored in Parquet format.
For more detail on how all of this works, see our
what is stateful stream processing?
explainer.