The Arroyo system architecture is composed of several services, as shown in the following diagram:

[Diagram: Arroyo Architecture]

These services can be run in a single process for a simple deployment, or separately for a fully scalable, fault-tolerant system. All components of the system are compiled into a single binary (called arroyo-bin), which provides sub-commands to run specific services or an entire Arroyo cluster.
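
To illustrate the single-binary layout, here is a minimal sketch of how such a binary can dispatch sub-commands to either individual services or a whole cluster. The CLI framework (clap) and the sub-command names are assumptions for illustration, not the exact arroyo-bin interface.

```rust
// Sketch of a single binary that runs individual services or a whole
// cluster via sub-commands. Names are illustrative, not arroyo-bin's CLI.
use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    service: Service,
}

#[derive(Subcommand)]
enum Service {
    /// Run every service in one process (simple deployment)
    Cluster,
    /// Run individual services for a distributed, fault-tolerant deployment
    Api,
    Controller,
    Worker,
}

fn main() {
    match Cli::parse().service {
        Service::Cluster => { /* start API, controller, and workers in-process */ }
        Service::Api => { /* start only the REST API server */ }
        Service::Controller => { /* start only the controller */ }
        Service::Worker => { /* start only a worker */ }
    }
}
```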

This document will cover the subsystems that make up an Arroyo cluster.

Web UI

The Web UI is a single-page app written in React that interacts with the system via the REST API. It allows users to configure the system, create pipelines, and monitor their status.

Arroyo API

The API server backs the Web UI and handles all configuration operations and pipeline management via the REST API.

API operations, like creating a pipeline or updating its state, operate solely by updating the configuration stored in the Postgres database. The controller asynchronously applies these changes to the actual state of the environment. The API itself is stateless and does not communicate with the rest of the system except through the database.
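
The following is a minimal sketch of this pattern: the handler records the desired state in the database and returns immediately, without contacting any worker. The table and column names are illustrative, not Arroyo's actual schema.

```rust
// Sketch of an API operation that only writes desired state to Postgres.
// Table and column names are hypothetical.
use sqlx::PgPool;

async fn stop_pipeline(pool: &PgPool, pipeline_id: i64) -> Result<(), sqlx::Error> {
    // Record the *desired* state; the API never talks to the workers directly.
    sqlx::query("UPDATE pipelines SET desired_state = 'Stopped' WHERE id = $1")
        .bind(pipeline_id)
        .execute(pool)
        .await?;
    // The controller's reconciliation loop will notice the change and drive
    // the actual pipeline to the desired state asynchronously.
    Ok(())
}
```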

Arroyo Controller

The controller runs a control loop that continuously reconciles the desired state of the system (as defined in the database) with the actual state of the system, as determined by monitoring the environment. This is similar to a Kubernetes controller (see here for an overview of this pattern).
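
A minimal sketch of such a reconciliation loop is shown below; the types and helper functions are illustrative, not Arroyo's actual code.

```rust
// Sketch of a reconciliation loop: compare desired state (from Postgres)
// with actual state (from the environment) and act on any difference.
use std::time::Duration;

struct DesiredJob {
    id: i64,
    desired_state: String,
}

async fn load_desired_jobs() -> Vec<DesiredJob> {
    Vec::new() // query the database in a real implementation
}

async fn observe_actual_state(_job_id: i64) -> String {
    "Running".into() // inspect the workers in a real implementation
}

async fn take_action(_job: &DesiredJob, _actual: &str) {
    // schedule workers, stop them, trigger recovery, etc.
}

#[tokio::main]
async fn main() {
    loop {
        for job in load_desired_jobs().await {
            let actual = observe_actual_state(job.id).await;
            if actual != job.desired_state {
                take_action(&job, &actual).await;
            }
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```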

There is a single controller instance running in the system; high availability is not yet supported in the open-source release. If the controller fails, the workers will eventually stop and wait for it to come back online.

The controller manages a state machine for each job (documented here) and is also responsible for initiating checkpoints on the workers. Communication between the controller and workers happens via the gRPC API defined in rpc.proto.
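
As a rough illustration of that control channel, the sketch below models the worker-facing operations the controller needs; the actual service and method names are defined in rpc.proto and are not reproduced here.

```rust
// Hypothetical control interface between controller and workers; the real
// gRPC service lives in rpc.proto and will differ from these names.
trait WorkerControl {
    /// Ask the worker to begin checkpoint number `epoch`.
    fn start_checkpoint(&self, epoch: u32);
    /// Ask the worker to stop its subtasks cleanly.
    fn stop(&self);
}

/// The controller periodically triggers a checkpoint across all workers.
fn trigger_checkpoint(workers: &[Box<dyn WorkerControl>], epoch: u32) {
    for worker in workers {
        worker.start_checkpoint(epoch);
    }
}
```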

Schedulers

Arroyo supports multiple schedulers, which are responsible for running the workers that make up the dataflow engine. The scheduler is selected by setting the SCHEDULER environment variable on the controller.

Process Scheduler

The process scheduler is the default, and is most useful for local development. In this mode, the controller will spawn processes locally to run the workers.

Embedded Scheduler

The embedded scheduler runs pipelines within the same process as the controller. This is most useful for development on Arroyo itself and is not recommended for production use.

Node Scheduler

Arroyo comes with a lightweight scheduler and node runtime for running distributed workers. In this mode, you must start some number of arroyo-node processes (for example, in an autoscaling group) and configure them with the controller’s address. The controller will then schedule workers on these nodes.

Kubernetes Scheduler

Arroyo can be deployed to Kubernetes and configured to schedule workers on the cluster. See more details here.
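
To make the selection concrete, here is a minimal sketch of how the controller might pick one of the schedulers above based on the SCHEDULER environment variable. The enum and the accepted values are assumptions for illustration, not Arroyo's exact configuration.

```rust
// Sketch of scheduler selection; the accepted SCHEDULER values here are
// assumed for illustration.
use std::env;

enum Scheduler {
    Process,    // spawn local processes (the default)
    Embedded,   // run workers inside the controller process
    Node,       // place workers on registered arroyo-node hosts
    Kubernetes, // schedule workers on a Kubernetes cluster
}

fn scheduler_from_env() -> Scheduler {
    match env::var("SCHEDULER").as_deref() {
        Ok("embedded") => Scheduler::Embedded,
        Ok("node") => Scheduler::Node,
        Ok("kubernetes") => Scheduler::Kubernetes,
        _ => Scheduler::Process, // fall back to the process scheduler
    }
}
```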

Arroyo Worker

Workers run the actual processing logic, making up the data plane. Workers run a binary that is produced by linking Rust code generated from the SQL query with the arroyo-worker library. Workers within the same pipeline are connected via TCP connections, over which dataflow occurs. Each worker is configured with a number of slots, which controls how many subtasks it may run. By default, each slot runs a parallel slice of the entire DAG, which generally does a good job of balancing load across workers.
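
Under one plausible reading of the slot model, a pipeline whose operators run at parallelism P on workers with N slots each needs ceil(P / N) workers. A minimal sketch (the helper name is hypothetical):

```rust
// ceil(parallelism / slots_per_worker): how many workers the controller
// must schedule to fit every subtask, under the assumption above.
fn workers_needed(parallelism: usize, slots_per_worker: usize) -> usize {
    (parallelism + slots_per_worker - 1) / slots_per_worker
}

fn main() {
    // e.g. parallelism 10 with 4 slots per worker => 3 workers
    assert_eq!(workers_needed(10, 4), 3);
}
```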

The controller is responsible for scheduling the necessary number of workers to run a pipeline, initiating checkpoints, monitoring their health, and recovering from failures.

Postgres

Arroyo uses Postgres to store configuration (sources, connections, sinks, pipelines, etc.) and the status of the system. This data drives the API and is also used by the controller to drive the reconciliation loop.

Prometheus

Arroyo uses Prometheus for metrics collection, which powers the metrics dashboard in the Web UI. Arroyo also relies on the Prometheus Pushgateway to collect metrics from the workers.

S3

Checkpoints are stored in S3 and are used to recover pipelines after failures.

Job State Machine

Jobs are managed via a state machine that is implemented in the controller:

[Diagram: Job State Machine]

Jobs are initialized in the Created state when they are created via the API. Once the controller loads the job, it begins managing it and is responsible for moving it to a stable state (like Running, Stopped, or Failed) within a bounded amount of time.
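
As an illustration, the sketch below models a few of the transitions described here; the real state machine includes additional intermediate states (scheduling, checkpointing, and so on) that are elided.

```rust
// Simplified job state machine; only the states named above are modeled.
#[derive(Debug, PartialEq, Clone, Copy)]
enum JobState {
    Created, // initial state, written by the API
    Running,
    Stopped,
    Failed,
}

/// One reconciliation step: move the job one transition toward the desired state.
fn step(current: JobState, desired: JobState) -> JobState {
    use JobState::*;
    match (current, desired) {
        (Created, Running) => Running,     // schedule workers and start the dataflow
        (Running, Stopped) => Stopped,     // checkpoint, then stop the workers
        (state, d) if state == d => state, // already stable: nothing to do
        (_, d) => d,                       // remaining transitions (failures, restarts, ...) elided
    }
}
```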