Pipeline clusters
Pipeline clusters are a lightweight mode for running individual jobs
Pipeline clusters provide a lightweight way to run a single pipeline in a self-contained minicluster. They are a great choice for development, CI, and serverless platforms like AWS Fargate, Google CloudRun, and Fly.io.
A pipeline cluster is created with the arroyo run
subcommand:
$ arroyo run --help
Run a query as a local pipeline cluster
Usage: arroyo run [OPTIONS] [QUERY]
Arguments:
[QUERY] The query to run [default: -]
Options:
-n, --name <NAME> Name for this pipeline
-s, --state-dir <STATE_DIR> Directory or URL where checkpoints and metadata
will be written and restored from
-p, --parallelism <PARALLELISM> Number of parallel subtasks to run [default: 1]
-f, --force Force the pipeline to start even if the state
file does not match the query
-h, --help Print help
Running a query
A pipeline cluster runs a single SQL query, however that query may contain
multiple independent INSERT INTO...SELECT
statements.
The query can be specified in one of three ways:
- Via STDIN:
$ echo "SELECT * FROM ..." | arroyo run
- As a CLI argument:
$ arroyo run query.sql
- As an environment variable:
$ ARROYO__RUN__QUERY="SELECT * FROM ..." arroyo run
State
Arroyo is a stateful system, capable of consistently saving and restoring in-progress computations. For Pipeline clusters, that state can be stored on a local filesystem or on an object store. To learn more about the supported object stores and how to configure them, see the storage docs. For production deployments, it is recommended to always use an object store.
The storage location is specified on the command line with the --state-dir
argument,
or via an environment variable with ARROYO__RUN__STATE_DIR
.
For example
$ run --state-dir s3://my-bucket/pipelines/my-pipeline query.sql
If this is the first time the pipeline has been run with this state directory, it will start with a fresh state. On subsequent runs the existing state will be restored.
This directory is expected to be exclusive to a single pipeline; i.e., for every new
pipeline you want to run you should specify a different directory or object-store key.
Doing otherwise will produce an error, which can be overridden with the --force
flag.