Pipeline clusters provide a lightweight way to run a single pipeline in a self-contained minicluster. They are a good fit for development, CI, and serverless platforms like AWS Fargate, Google Cloud Run, and Fly.io.

A pipeline cluster is created with the arroyo run subcommand:

$ arroyo run --help
Run a query as a local pipeline cluster

Usage: arroyo run [OPTIONS] [QUERY]

Arguments:
  [QUERY]  The query to run [default: -]

Options:
  -n, --name <NAME>                Name for this pipeline
  -s, --state-dir <STATE_DIR>      Directory or URL where checkpoints and metadata
                                   will be written and restored from
  -p, --parallelism <PARALLELISM>  Number of parallel subtasks to run [default: 1]
  -f, --force                      Force the pipeline to start even if the state
                                   file does not match the query
  -h, --help                       Print help

Running a query

A pipeline cluster runs a single SQL query; however, that query may contain multiple independent INSERT INTO...SELECT statements.

The query can be specified in one of three ways:

  1. Via STDIN:
$ echo "SELECT * FROM ..." | arroyo run
  2. As a CLI argument:
$ arroyo run query.sql
  3. As an environment variable:
$ ARROYO__RUN__QUERY="SELECT * FROM ..." arroyo run
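
On container platforms, configuration is often passed through the environment, so the third option lends itself to scripting. The following is a minimal sketch rather than anything shipped with Arroyo: export_query_env is a hypothetical helper name, and it assumes the query lives in a readable file such as query.sql.

```shell
# Hypothetical helper (not part of the arroyo CLI): read a SQL file and
# expose its contents through the ARROYO__RUN__QUERY environment variable.
export_query_env() {
  query_file="$1"
  if [ ! -r "$query_file" ]; then
    echo "cannot read query file: $query_file" >&2
    return 1
  fi
  # Command substitution strips trailing newlines from the file contents.
  ARROYO__RUN__QUERY="$(cat "$query_file")"
  export ARROYO__RUN__QUERY
}
```

A container entrypoint could then run export_query_env query.sql && exec arroyo run, so that the pipeline starts only if the query file was read successfully.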

State

Arroyo is a stateful system, capable of consistently saving and restoring in-progress computations. For pipeline clusters, that state can be stored on a local filesystem or in an object store. To learn more about the supported object stores and how to configure them, see the storage docs. For production deployments, we recommend always using an object store.

The storage location is specified on the command line with the --state-dir argument, or via an environment variable with ARROYO__RUN__STATE_DIR.

For example:

$ arroyo run --state-dir s3://my-bucket/pipelines/my-pipeline query.sql

If this is the first time the pipeline has been run with this state directory, it will start with a fresh state. On subsequent runs, the existing state will be restored.

This directory is expected to be exclusive to a single pipeline; that is, every new pipeline you run should be given a different directory or object-store key. Starting a pipeline against a state directory that does not match its query produces an error, which can be overridden with the --force flag.
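
One way to keep state directories exclusive is to derive each one from the pipeline's name. The sketch below assumes a hypothetical layout with one key prefix per pipeline under a shared root (mirroring the s3://my-bucket/pipelines/my-pipeline example above). STATE_ROOT and state_dir_for are illustrative names, not Arroyo settings or conventions.

```shell
# Hypothetical layout: one key prefix per pipeline under a shared root.
# STATE_ROOT is an illustrative variable, not one that arroyo reads.
STATE_ROOT="${STATE_ROOT:-s3://my-bucket/pipelines}"

state_dir_for() {
  # ${STATE_ROOT%/} drops a single trailing slash so the join stays clean.
  printf '%s/%s\n' "${STATE_ROOT%/}" "$1"
}
```

It could be used as arroyo run --state-dir "$(state_dir_for orders)" orders.sql, where orders and orders.sql are placeholder names. Deriving the location from the name makes it harder to point two different pipelines at the same state by accident.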