Common Configuration
Both the source and sink versions of the connector use Arroyo's StorageBackend, which is a generalization of an object store. The location within the StorageBackend is configured via the `path` option in the WITH clause of the CREATE TABLE statement.
The value is a URL pointing to the destination directory. The most common examples are shown below.
| Description | Example |
|---|---|
| Local file | file:///test-data/my-cool-arroyo-pipeline |
| S3 Path | s3://awesome-arroyo-bucket/amazing-arroyo-dir |
| S3 HTTP Endpoint | https://s3.us-west-2.amazonaws.com/awesome-arroyo-bucket/amazing-arroyo-dir |
| Local MinIO installation | s3::http://localhost:9123/local_bucket/sweet-dir |
| Cloudflare R2 | r2://my-bucket/path or r2://account-id@my-bucket/path |
| Azure Blob Storage | abfs://container@account.dfs.core.windows.net/path |
| Azure HTTPS | https://account.blob.core.windows.net/container/path |
| GCS | gs://my-bucket/path |
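As a concrete illustration, the sketch below declares a sink writing JSON files to an S3 path. The table name and columns are placeholders, and it assumes the connector is registered as `filesystem`:

```sql
-- A minimal sketch: a filesystem sink writing JSON files to S3.
-- The table name, columns, and bucket are hypothetical.
CREATE TABLE s3_sink (
    event_id TEXT,
    event_time TIMESTAMP
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
    format = 'json'
);
```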
Additional Backend Configuration
The StorageBackend can be passed additional configuration options, which are namespaced with `storage.` at the beginning. This allows you to pass in custom endpoints, credentials, and regions.

S3 and S3-Compatible Storage (including MinIO and R2)
| Field | Description | Example |
|---|---|---|
| storage.aws_region | Manually set the AWS region | us-east-1 |
| storage.aws_endpoint | Manually set the AWS endpoint | https://s3-custom-endpoint.com |
| storage.aws_secret_access_key | Manually set the AWS secret access key | your-secret-key |
| storage.aws_access_key_id | Manually set the AWS access key ID | your-access-key-id |
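For example, pointing the connector at a local MinIO installation with explicit credentials might look like the sketch below. The endpoint, bucket, and credential values are placeholders, and option keys containing dots are quoted following the usual Arroyo WITH-clause convention:

```sql
-- A sketch of a sink writing Parquet to a local MinIO bucket with explicit
-- credentials; all endpoint, bucket, and key values are placeholders.
CREATE TABLE minio_sink (
    value TEXT
) WITH (
    connector = 'filesystem',
    path = 's3::http://localhost:9123/local_bucket/sweet-dir',
    format = 'parquet',
    'storage.aws_region' = 'us-east-1',
    'storage.aws_endpoint' = 'http://localhost:9123',
    'storage.aws_access_key_id' = 'your-access-key-id',
    'storage.aws_secret_access_key' = 'your-secret-key'
);
```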
Cloudflare R2
R2 can be configured using the `r2://` URL scheme. Authentication can be provided via environment variables or storage options:
Environment Variables:
- `CLOUDFLARE_ACCOUNT_ID` (or set the account ID as part of the URL)
- `R2_ACCESS_KEY_ID` or `AWS_ACCESS_KEY_ID`
- `R2_SECRET_ACCESS_KEY` or `AWS_SECRET_ACCESS_KEY`

Storage Options:
- `storage.r2_access_key_id` or `storage.aws_access_key_id`
- `storage.r2_secret_access_key` or `storage.aws_secret_access_key`

URL Formats:
- `r2://bucket/path` - Account ID from environment
- `r2://account-id@bucket/path` - Account ID in URL
- `https://account-id.r2.cloudflarestorage.com/bucket/path` - Full endpoint URL
- `https://account-id.eu.r2.cloudflarestorage.com/bucket/path` - With jurisdiction (e.g., EU)
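Putting those pieces together, an R2-backed sink might be declared as in the sketch below; the account ID, bucket, and credentials are placeholders, and authentication could equally come from the environment variables listed above:

```sql
-- A sketch of a sink writing to Cloudflare R2, with the account ID in the URL
-- and credentials passed as storage options (all values are placeholders).
CREATE TABLE r2_sink (
    value TEXT
) WITH (
    connector = 'filesystem',
    path = 'r2://account-id@my-bucket/path',
    format = 'json',
    'storage.r2_access_key_id' = 'your-access-key-id',
    'storage.r2_secret_access_key' = 'your-secret-key'
);
```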
Azure Blob Storage / ADLS Gen2
Azure storage uses standard Azure authentication via environment variables. The following URL formats are supported:
- `abfs://container@account.dfs.core.windows.net/path` - ADLS Gen2 format
- `abfss://container@account.dfs.core.windows.net/path` - ADLS Gen2 with SSL
- `https://account.blob.core.windows.net/container/path` - Blob storage HTTPS
- `https://account.dfs.core.windows.net/container/path` - ADLS Gen2 HTTPS
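For example, an ADLS Gen2-backed sink could be declared as in this sketch, with the account, container, and schema as placeholders and credentials supplied through the standard Azure environment variables:

```sql
-- A sketch of a sink writing Parquet to ADLS Gen2; authentication comes from
-- the standard Azure environment variables (all names are placeholders).
CREATE TABLE adls_sink (
    value TEXT
) WITH (
    connector = 'filesystem',
    path = 'abfs://container@account.dfs.core.windows.net/path',
    format = 'parquet'
);
```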
Format
Both sources and sinks require a format, and support `parquet` and `json`.
Sink Specific Configuration
File rollover configs
These configs control when files are rolled over, creating new files. Rolling over more frequently allows lower latency for readers, but at the cost of more files, which can slow down queries.

| Field | Description | Default | Example |
|---|---|---|---|
| rolling_policy.file_size | Target size a file may reach before it is rolled | None | '128MB' |
| rolling_policy.interval | Amount of time after creation before a file will be rolled, expressed as a SQL interval | None | interval '30 minutes' |
| rolling_policy.inactivity_interval | Amount of time a file may remain open without receiving new data before it will be rolled, expressed as a SQL interval | None | interval '1 hour' |
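For instance, the sketch below rolls files either when they reach roughly 128MB or 30 minutes after creation, whichever comes first. The table definition is hypothetical, and dotted option keys are quoted following the usual Arroyo WITH-clause convention:

```sql
-- A sketch of a Parquet sink with both a size-based and a time-based rolling policy.
CREATE TABLE rolled_sink (
    value TEXT
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
    format = 'parquet',
    'rolling_policy.file_size' = '128MB',
    'rolling_policy.interval' = interval '30 minutes'
);
```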
File Naming Options
By default Arroyo names files using UUIDv7, which ensures that they sort in time-order. This behavior can be configured via the following options:

| Field | Description | Default |
|---|---|---|
| filename.prefix | Prefix that will be prepended to the file name, followed by a `-` | None |
| filename.suffix | Suffix (extension) that will be appended to the file name, preceded by a `.` | 'json' or 'parquet' |
| filename.strategy | File naming strategy to use. Supported values: serial, uuid, ulid, uuid_v7 | 'uuid_v7' |
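As an example, the options in the sketch below would produce file names like `events-<uuid>.parquet`; the table definition is a placeholder:

```sql
-- A sketch of file-naming options, producing names like events-<uuid>.parquet.
CREATE TABLE named_sink (
    value TEXT
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
    format = 'parquet',
    'filename.prefix' = 'events',
    'filename.strategy' = 'uuid'
);
```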
Parquet Options
| Field | Description | Default |
|---|---|---|
| parquet.compression | The compression codec to use for Parquet files. Supported values: none, snappy, gzip, zstd, lz4, lz4_raw. | 'zstd' |
| parquet.row_group_size | Target size for each Parquet row group | '128MB' |
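For example, to use snappy compression with larger row groups (a sketch; the table and values are illustrative only):

```sql
-- A sketch of Parquet-specific options on a filesystem sink.
CREATE TABLE parquet_sink (
    value TEXT
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
    format = 'parquet',
    'parquet.compression' = 'snappy',
    'parquet.row_group_size' = '256MB'
);
```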
Multipart upload settings
These options allow configuring how writes against object storage are split into multipart uploads:

| Field | Description | Default |
|---|---|---|
| multipart.target_part_size | The target size of each part in a multipart upload. Must be at least 5MB. | '32MB' |
| multipart.max_parts | Maximum number of parts in a multipart upload before the upload will be completed | 1000 |
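For example, to use larger upload parts with a lower part cap (a sketch; the table and values are illustrative, and both values are given as string literals here):

```sql
-- A sketch of multipart upload tuning on a filesystem sink.
CREATE TABLE multipart_sink (
    value TEXT
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
    format = 'parquet',
    'multipart.target_part_size' = '64MB',
    'multipart.max_parts' = '500'
);
```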
Partitioning Options
Arroyo supports partitioning of outputs. There are two types of partitioning: event time-based and field-based. You can use either or both of these types of partitioning. If both are used, the time-based partitioning is placed prior to the field-based partitioning.

Event Time-based Partitioning
Event time partitioning uses each record's event_time, formatting it using a strftime-style format string. You can set the `time_partition_pattern` key in the sink to define the pattern.
Example:
time_partition_pattern = '%Y/%m/%d/%H'
Field-based Partitioning
Field-based partitioning produces a string mirroring Hive-style partition directories, so partitioning on field_1, field_2 will result in folders like `field_1=X/field_2=Y`.
You can set the partition_fields key in the sink to define the partition fields.
Example:
partition_fields = 'field_1,field_2'
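Combining both, the sketch below would write under directories like `2023/11/02/14/event_type=login/` beneath the sink path; the table and fields are hypothetical:

```sql
-- A sketch combining time-based and field-based partitioning.
-- Output directories look like <path>/2023/11/02/14/event_type=login/...
CREATE TABLE partitioned_sink (
    event_type TEXT,
    user_id TEXT
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
    format = 'parquet',
    time_partition_pattern = '%Y/%m/%d/%H',
    partition_fields = 'event_type'
);
```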
Shuffle by partition
When using field-based partitioning and high parallelism, you may end up with many files; typically each sink subtask will write a file for every partition key. To avoid this, you can configure the dataflow to insert a shuffle step before the sink, which will ensure that all records for a particular partition key end up on the same sink node. For example, if we partition on event_type and we have 100 distinct
types, at parallelism 32 we'd end up with 3,200 files being written for each
flush interval. By enabling shuffle_by_partition, we reduce that to 100.
Note that this may lead to performance problems if your data is highly skewed
across your partition keys; for example, if 90% of your data is in the same
partition, those events will all end up on the same sink subtask which may
not be able to keep up with the volume.
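A sketch of enabling this is shown below, assuming the option is exposed in the WITH clause as `shuffle_by_partition.enabled` alongside the partitioning options above; the table and fields are placeholders:

```sql
-- A sketch of field-based partitioning with a shuffle inserted before the sink.
-- The option key shuffle_by_partition.enabled is assumed here.
CREATE TABLE shuffled_sink (
    event_type TEXT,
    user_id TEXT
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
    format = 'parquet',
    partition_fields = 'event_type',
    'shuffle_by_partition.enabled' = 'true'
);
```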
Source Specific Configuration
When using the file system source, the following options are available:

| Field | Description | Default | Example |
|---|---|---|---|
| compression_format | The compression format of the files to read. Supported values: none, zstd, gzip. Only used for JSON input | none | gzip |
| source.regex-pattern | A regex pattern to match files to read. If specified all files within the path will be evaluated against pattern. If not specified only files directly under the path will be read. | None | .*\.json |
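For example, a source reading gzip-compressed JSON files matched by a regex might look like this sketch; the path, schema, and pattern are placeholders:

```sql
-- A sketch of a filesystem source reading gzip-compressed JSON files.
-- The path, schema, and regex are placeholders.
CREATE TABLE json_source (
    event_id TEXT,
    event_time TIMESTAMP
) WITH (
    connector = 'filesystem',
    path = 's3://awesome-arroyo-bucket/input-data',
    format = 'json',
    compression_format = 'gzip',
    'source.regex-pattern' = '.*\.json'
);
```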