Arroyo can read and write Parquet and JSON files to and from object stores and local filesystems. When used as a sink, Arroyo produces complete files in line with the checkpointing system, so the file system sink writes all data exactly once. Against S3, this is done by tracking multipart uploads within the state store, allowing Arroyo to resume an in-progress upload in the event of a failure.

The FileSystem connector supports the local filesystem, S3 (including S3-compatible stores like MinIO), and GCS.

As a source, Arroyo reads all files to completion, at which point the job finishes.

Common Configuration

Both the source and sink versions of the connector make use of Arroyo’s StorageBackend, which is a generalization of an object store. The location within the StorageBackend is configured via the path variable in the WITH clause of the CREATE TABLE statement. The value is a URL pointing to the destination directory. The most common examples are shown below.

| Description | Example |
|-------------|---------|
| Local file | file:///test-data/my-cool-arroyo-pipeline |
| S3 path | s3://awesome-arroyo-bucket/amazing-arroyo-dir |
| S3 HTTP endpoint | https://s3.us-west-2.amazonaws.com/awesome-arroyo-bucket/amazing-arroyo-dir |
| Local MinIO installation | s3://http://localhost:9123/local_bucket/sweet-dir |

Additional Backend Configuration

The StorageBackend can be passed additional configuration options. Currently this is only supported for S3-API based backends (including MinIO); these options are prefixed with “storage.” and allow you to pass in custom endpoints, credentials, and regions. The most common options are listed below.

| Field | Description | Example |
|-------|-------------|---------|
| storage.aws_region | Manually set the AWS region | us-east-1 |
| storage.aws_endpoint | Manually set the AWS endpoint | https://s3-custom-endpoint.com |
| storage.aws_secret_access_key | Manually set the AWS secret access key | your-secret-key |
| storage.aws_access_key_id | Manually set the AWS access key ID | your-access-key-id |
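
For example, a sink pointed at a local MinIO installation might pass these options directly in the WITH clause. This is a minimal sketch: the table, bucket, endpoint, and credentials are placeholders, and it assumes dotted option names are quoted in SQL the same way 'source.regex-pattern' is in the source example below.

CREATE TABLE minio_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://local_bucket/sweet-dir',
  format = 'json',
  'storage.aws_endpoint' = 'http://localhost:9123',
  'storage.aws_region' = 'us-east-1',
  'storage.aws_access_key_id' = 'your-access-key-id',
  'storage.aws_secret_access_key' = 'your-secret-key'
);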

Format

Both sources and sinks require a format, and support parquet and json.

Sink Specific Configuration

Output File Options

| Field | Description | Default | Example |
|-------|-------------|---------|---------|
| target_file_size | Target number of bytes in a file before it is closed and a new file is opened | None | 100000000 |
| target_part_size | The target size in bytes of each part of a multipart upload. Must be at least 5MB. | 5242880 | 10000000 |
| max_parts | Maximum number of multipart uploads | 1000 | 50 |
| rollover_seconds | Number of seconds a file will be open before it is closed and a new file is opened | 30 | 3600 |
| inactivity_rollover_seconds | Number of seconds a file will be open without any new data before it is closed and a new file is opened | None | 600 |
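
As a sketch of how these options fit together, the example below sets a target file size and an inactivity rollover on a sink. The table, column, and bucket names are placeholders; values are passed as strings like the other WITH options in this document.

CREATE TABLE sized_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://my-bucket/output',
  format = 'json',
  target_file_size = '100000000',
  inactivity_rollover_seconds = '600'
);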

File Naming Options

By default, Arroyo writes sequentially named files, tracking a max_file_index as well as including the subtask in the file name. However, several alternate naming schemes are available.

| Field | Description | Default | Example |
|-------|-------------|---------|---------|
| filename.prefix | Prefix prepended to the beginning of the file name, followed by a - | None | my-prefix |
| filename.suffix | Suffix appended to the end of the file name, preceded by a - | None | my-suffix |
| filename.strategy | File naming strategy to use. Supported values: serial, uuid | serial | uuid |
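
For instance, a sink that writes UUID-named files with a prefix might set the options as sketched below. The table and bucket names are placeholders, and the dotted option keys are assumed to be quoted as in the other examples.

CREATE TABLE uuid_named_sink (
  value text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 's3://my-bucket/output',
  format = 'json',
  'filename.prefix' = 'my-prefix',
  'filename.strategy' = 'uuid'
);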

Parquet Options

Parquet has a few supported options. Please reach out if you’d like to expand the settings allowed.

| Field | Description | Default | Example |
|-------|-------------|---------|---------|
| parquet_compression | The compression codec to use for Parquet files. Supported values: none, snappy, gzip, zstd, lz4. | none | zstd |
| parquet_row_batch_size | The maximum number of rows to write per record batch | 10000 | 100 |
| parquet_row_group_size | The maximum number of rows to write per row group | 1000000 | 100000 |

Partitioning Options

Arroyo supports partitioning of outputs. There are two types of partitioning: event time-based and field-based. You can use either or both; if both are used, the time-based partitioning precedes the field-based partitioning in the output path.

Event Time-based Partitioning

Event time partitioning uses each record’s event_time, formatting it using a strftime-style formatting string. You can set the time_partition_pattern key in the sink to define the pattern.

Example: time_partition_pattern = '%Y/%m/%d/%H'

Field-based Partitioning

Field-based partitioning produces Hive-style partition directories, so partitioning on field_1, field_2 results in folders like field_1=X/field_2=Y. You can set the partition_fields key in the sink to define the partition fields.

Example: partition_fields = 'field_1,field_2'

Source Specific Configuration

When using the file system source, the following options are available:

| Field | Description | Default | Example |
|-------|-------------|---------|---------|
| compression_format | The compression format of the files to read. Supported values: none, zstd, gzip. Only used for JSON input | none | gzip |
| source.regex-pattern | A regex pattern to match files to read. If specified, all files within the path will be evaluated against the pattern. If not specified, only files directly under the path will be read. | None | .*\.json |
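
For example, a source reading gzip-compressed JSON might look like the sketch below; the table, column, and bucket names are placeholders.

CREATE TABLE events (
  value text
) WITH (
  connector = 'filesystem',
  type = 'source',
  path = 's3://my-bucket/events',
  format = 'json',
  compression_format = 'gzip'
);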

File System Sink DDL

Here’s an example of how to create a table that writes Parquet to S3 with partitioning:

CREATE TABLE bids (
  auction bigint,
  bidder bigint,
  price bigint,
  datetime timestamp,
  region text,
  account_id text
) WITH (
  connector = 'filesystem',
  type = 'sink',
  path = 'https://s3.us-west-2.amazonaws.com/demo/s3-uri',
  format = 'parquet',
  parquet_compression = 'zstd',
  rollover_seconds = '60',
  time_partition_pattern = '%Y/%m/%d/%H',
  partition_fields = 'region,account_id'
);

File System Source DDL

Here’s an example of how to create a table that reads Parquet from S3:

CREATE TABLE bids (
  auction bigint,
  bidder bigint,
  price bigint,
  datetime timestamp,
  region text,
  account_id text
) WITH (
  connector = 'filesystem',
  type = 'source',
  path = 'https://s3.us-west-2.amazonaws.com/demo/s3-uri',
  format = 'parquet',
  'source.regex-pattern' = '.*\.parquet$'
);