Arroyo can read and write Parquet and JSON files to and from object stores and local filesystems. When used as a sink, Arroyo produces complete files in line with the checkpointing system, so the file system sinks write all data exactly once. Against S3 this is done by tracking multi-part uploads in the state store, allowing Arroyo to resume an in-progress upload after a failure. The FileSystem connector supports the local filesystem, S3 (including S3-compatible stores like MinIO), GCS, Cloudflare R2, and Azure Blob Storage/ADLS Gen2. As a source, Arroyo reads all files to completion, at which point the job finishes.

Common Configuration

Both the source and sink versions of the connector make use of Arroyo’s StorageBackend, which is a generalization of an object store. The location within the StorageBackend is configured via the path variable in the WITH clause of the CREATE TABLE statement. The value is a URL pointing to the destination directory. The most common examples are shown below.
| Description | Example |
|---|---|
| Local file | file:///test-data/my-cool-arroyo-pipeline |
| S3 Path | s3://awesome-arroyo-bucket/amazing-arroyo-dir |
| S3 HTTP Endpoint | https://s3.us-west-2.amazonaws.com/awesome-arroyo-bucket/amazing-arroyo-dir |
| Local MinIO installation | s3::http://localhost:9123/local_bucket/sweet-dir |
| Cloudflare R2 | r2://my-bucket/path or r2://account-id@my-bucket/path |
| Azure Blob Storage | abfs://container@account.dfs.core.windows.net/path |
| Azure HTTPS | https://account.blob.core.windows.net/container/path |
| GCS | gs://my-bucket/path |
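For example, a minimal filesystem sink writing JSON files to a local directory (a sketch; the table name, column, and directory are illustrative) could be declared as:
CREATE TABLE local_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 'file:///test-data/my-cool-arroyo-pipeline',
  format = 'json'
);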

Additional Backend Configuration

The StorageBackend accepts additional configuration options, which are namespaced with a storage. prefix. This allows you to pass in custom endpoints, credentials, and regions.

S3 and S3-Compatible Storage (including MinIO and R2)

| Field | Description | Example |
|---|---|---|
| storage.aws_region | Manually set the AWS region | us-east-1 |
| storage.aws_endpoint | Manually set the AWS endpoint | https://s3-custom-endpoint.com |
| storage.aws_secret_access_key | Manually set the AWS secret access key | your-secret-key |
| storage.aws_access_key_id | Manually set the AWS access key ID | your-access-key-id |
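As a sketch, a sink writing to a local MinIO installation might supply the endpoint and credentials via storage options (the bucket, endpoint, and key values are placeholders; dotted option names are quoted here, following the quoting style used in the DDL examples below):
CREATE TABLE minio_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://local_bucket/sweet-dir',
  format = 'json',
  'storage.aws_region' = 'us-east-1',
  'storage.aws_endpoint' = 'http://localhost:9123',
  'storage.aws_access_key_id' = 'your-access-key-id',
  'storage.aws_secret_access_key' = 'your-secret-key'
);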

Cloudflare R2

R2 can be configured using the r2:// URL scheme. Authentication can be provided via environment variables or storage options.
Environment Variables:
  • CLOUDFLARE_ACCOUNT_ID or set as part of the URL
  • R2_ACCESS_KEY_ID or AWS_ACCESS_KEY_ID
  • R2_SECRET_ACCESS_KEY or AWS_SECRET_ACCESS_KEY
Storage Options:
  • storage.r2_access_key_id or storage.aws_access_key_id
  • storage.r2_secret_access_key or storage.aws_secret_access_key
URL Formats:
  • r2://bucket/path - Account ID from environment
  • r2://account-id@bucket/path - Account ID in URL
  • https://account-id.r2.cloudflarestorage.com/bucket/path - Full endpoint URL
  • https://account-id.eu.r2.cloudflarestorage.com/bucket/path - With jurisdiction (e.g., EU)
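For instance, a Parquet sink writing to R2 with the account ID embedded in the URL and credentials supplied via storage options might look like this sketch (the bucket, account ID, and keys are placeholders):
CREATE TABLE r2_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 'r2://account-id@my-bucket/path',
  format = 'parquet',
  'storage.r2_access_key_id' = 'your-access-key-id',
  'storage.r2_secret_access_key' = 'your-secret-key'
);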

Azure Blob Storage / ADLS Gen2

Azure Blob Storage and ADLS Gen2 paths can be specified in several formats.
URL Formats:
  • abfs://container@account.dfs.core.windows.net/path - ADLS Gen2 format
  • abfss://container@account.dfs.core.windows.net/path - ADLS Gen2 with SSL
  • https://account.blob.core.windows.net/container/path - Blob storage HTTPS
  • https://account.dfs.core.windows.net/container/path - ADLS Gen2 HTTPS
Authentication: Azure authentication is configured via standard Azure environment variables. Refer to the Azure SDK documentation for details on authentication options.
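As a sketch, a Parquet sink writing to ADLS Gen2 (the container, account, and path are placeholders, and credentials come from the standard Azure environment variables):
CREATE TABLE azure_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 'abfss://container@account.dfs.core.windows.net/path',
  format = 'parquet'
);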

Format

Both sources and sinks require a format and support parquet and json.

Sink Specific Configuration

File rollover configs

These options control when files are rolled over to create new files. Rolling over more frequently gives readers lower latency, but at the cost of more files, which can slow down queries.
| Field | Description | Default | Example |
|---|---|---|---|
| rolling_policy.file_size | Target size of a file before it is rolled | None | '128MB' |
| rolling_policy.interval | Amount of time after creation before a file is rolled, expressed as a SQL interval | None | interval '30 minutes' |
| rolling_policy.inactivity_interval | Amount of time a file may remain open without receiving new data before it is rolled, expressed as a SQL interval | None | interval '1 hour' |
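For example, a sink that rolls files once they reach 128MB or 30 minutes after creation, whichever comes first, might be configured as in this sketch (the path is a placeholder; the interval syntax follows the examples in the table above):
CREATE TABLE rolled_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
  format = 'parquet',
  'rolling_policy.file_size' = '128MB',
  'rolling_policy.interval' = interval '30 minutes'
);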

File Naming Options

By default, Arroyo names files using UUIDv7, which ensures that they sort in time order. This behavior can be configured via the following options:
| Field | Description | Default |
|---|---|---|
| filename.prefix | Prefix prepended to the file name, followed by a - | None |
| filename.suffix | Suffix (extension) appended to the file name, preceded by a . | 'json' or 'parquet' |
| filename.strategy | File naming strategy to use. Supported values: serial, uuid, ulid, uuid_v7 | 'uuid_v7' |
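A sketch of a sink with a custom prefix and serial file naming (the prefix value is arbitrary); per the options above, file names will begin with events- and end in .json:
CREATE TABLE named_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
  format = 'json',
  'filename.prefix' = 'events',
  'filename.strategy' = 'serial'
);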

Parquet Options

| Field | Description | Default |
|---|---|---|
| parquet.compression | The compression codec to use for Parquet files. Supported values: none, snappy, gzip, zstd, lz4, lz4_raw | 'zstd' |
| parquet.row_group_size | Target size for each Parquet row group | '128MB' |
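For instance, a Parquet sink overriding the compression codec and row group size might look like the following sketch (the path and sizes are illustrative):
CREATE TABLE parquet_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
  format = 'parquet',
  'parquet.compression' = 'snappy',
  'parquet.row_group_size' = '64MB'
);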

Multipart upload settings

These options configure how writes against object storage are split into multipart upload parts:
| Field | Description | Default |
|---|---|---|
| multipart.target_part_size | The target size of each part of a multipart upload. Must be at least 5MB. | '32MB' |
| multipart.max_parts | Maximum number of parts in a multipart upload before it is completed | 1000 |
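A sketch of a sink that uses larger upload parts and caps the number of parts (the values are illustrative):
CREATE TABLE multipart_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
  format = 'parquet',
  'multipart.target_part_size' = '64MB',
  'multipart.max_parts' = 500
);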

Partitioning Options

Arroyo supports partitioning of sink outputs. There are two types of partitioning: event time-based and field-based. You can use either or both; if both are used, the time-based partition directories come before the field-based ones.

Event Time-based Partitioning

Event time partitioning uses each record’s event_time, formatting it using a strftime-style formatting string. You can set the time_partition_pattern key in the sink to define the pattern. Example: time_partition_pattern = '%Y/%m/%d/%H'

Field-based Partitioning

Field-based partitioning produces Hive-style partition directories, so partitioning on field_1, field_2 will result in folders like field_1=X/field_2=Y. You can set the partition_fields key in the sink to define the partition fields. Example: partition_fields = 'field_1,field_2'
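Combining the two, a sketch of a sink partitioned first by hour and then by a region field (the field, path, and example directory values are illustrative):
CREATE TABLE partitioned_sink (
  region text,
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
  format = 'parquet',
  time_partition_pattern = '%Y/%m/%d/%H',
  partition_fields = 'region'
);
With this configuration, files are written beneath the configured path under directories like 2024/01/15/08/region=us-east-1/, with the time-based portion first.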

Shuffle by partition

When using field-based partitioning and high parallelism, you may end up with many files; typically each sink subtask will write a file for every partition key. To avoid this, you can configure the dataflow to insert a shuffle step before the sink, which will ensure that all records for a particular partition key end up on the same sink node:
'shuffle_by_partition.enabled' = true
For example, if our partition key is event_type and we have 100 distinct types, at parallelism 32 we’d end up with 3,200 files being written for each flush interval. By enabling shuffle_by_partition, we reduce that to 100. Note that this may lead to performance problems if your data is highly skewed across your partition keys; for example, if 90% of your data is in the same partition, those events will all end up on the same sink subtask, which may not be able to keep up with the volume.
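A sketch of a sink that partitions on event_type and enables the shuffle (the field and path are illustrative):
CREATE TABLE shuffled_sink (
  event_type text,
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
  format = 'parquet',
  partition_fields = 'event_type',
  'shuffle_by_partition.enabled' = true
);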

Source Specific Configuration

When using the file system source, the following options are available:
| Field | Description | Default | Example |
|---|---|---|---|
| compression_format | The compression format of the files to read. Supported values: none, zstd, gzip. Only used for JSON input | none | gzip |
| source.regex-pattern | A regex pattern to match files to read. If specified, all files within the path will be evaluated against the pattern; if not specified, only files directly under the path will be read. | None | .*\.json |
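For example, a source reading gzip-compressed JSON files and matching only files ending in .json.gz (the path and schema are placeholders) could be declared as:
CREATE TABLE compressed_source (
  id bigint,
  value text
) WITH (
  connector = 'filesystem',
  type = 'source',
  path = 's3://awesome-arroyo-bucket/amazing-arroyo-dir',
  format = 'json',
  compression_format = 'gzip',
  'source.regex-pattern' = '.*\.json\.gz$'
);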

File System Sink DDL

Here’s an example of how to create a table that writes Parquet to S3 with partitioning:
CREATE TABLE bids (
  auction bigint,
  bidder bigint,
  price bigint,
  datetime timestamp,
  region text,
  account_id text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 'https://s3.us-west-2.amazonaws.com/demo/s3-uri',
  format = 'parquet',
  parquet_compression = 'zstd',
  rollover_seconds = 60,
  time_partition_pattern = '%Y/%m/%d/%H',
  partition_fields = 'region,account_id'
);

File System Source DDL

Here’s an example of how to create a table that reads Parquet from S3:
CREATE TABLE bids (
  auction bigint,
  bidder bigint,
  price bigint,
  datetime timestamp,
  region text,
  account_id text
) WITH (
  connector = 'filesystem',
  type = 'source',
  path = 'https://s3.us-west-2.amazonaws.com/demo/s3-uri',
  format = 'parquet',
  'source.regex-pattern' = '.*\.parquet$'
);