Iceberg sink
Iceberg is an open table format designed for huge analytic datasets, providing features like schema evolution,
hidden partitioning, and time travel. Arroyo currently implements v2 of the Iceberg spec.
The Iceberg sink writes data into Iceberg tables in compatible catalogs, allowing
the data to be queried by Iceberg-compatible engines like Apache Spark, Trino,
and others.
Catalog support
Currently, the Iceberg sink supports writing to Iceberg REST catalogs backed by S3, R2, GCS, ABS, and MinIO; other S3-compatible object stores should also work but are untested. Any REST catalog should be compatible, provided it follows the spec.

Configuration
The Iceberg sink is configured with the following options:

| Field | Description | Required | Example |
|---|---|---|---|
| type | Type of Iceberg connector; currently must be sink | Yes | "sink" |
| catalog.type | Type of Iceberg catalog; currently only rest is supported | Yes | "rest" |
| catalog.warehouse | Name of the warehouse to connect to | No | "my-warehouse" |
| catalog.rest.uri | URI of the Iceberg REST catalog | Yes | "http://localhost:8181" |
| catalog.rest.token | Authentication token to use against the catalog | No | "asdf234234" |
| namespace | Namespace for the Iceberg table | No | "default" |
| table_name | Name of the Iceberg table | Yes | "my_table" |
| shuffle_by_partition.enabled | Insert a shuffle (repartitioning) before the sink (see Shuffle by partition below) | No | true |
| location_path | Optional override for the object storage path to write data to | No | "/my/custom/path" |
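
For example, a minimal sink definition might look like the following sketch. This assumes the connector is registered as iceberg and that dotted option keys are written as quoted strings in the WITH clause; the schema, URI, and names are placeholders:

```sql
CREATE TABLE my_iceberg_sink (
    id BIGINT,
    name TEXT,
    created_at TIMESTAMP
) WITH (
    connector = 'iceberg',  -- assumed connector name
    type = 'sink',
    'catalog.type' = 'rest',
    -- placeholder: point this at your Iceberg REST catalog
    'catalog.rest.uri' = 'http://localhost:8181',
    'catalog.warehouse' = 'my-warehouse',
    namespace = 'default',
    table_name = 'my_table'
);
```

Data is then written to the sink with a regular INSERT INTO my_iceberg_sink SELECT ... query.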
File rollover configs
These configs control when files are rolled over, creating new files. Rolling over more frequently allows lower latency for readers, but at the cost of more files, which can slow down queries.

| Field | Description | Default | Example |
|---|---|---|---|
| rolling_policy.file_size | Target size of a file before it is rolled over | None | '128MB' |
| rolling_policy.interval | Amount of time after creation before a file is rolled over, expressed as a SQL interval | None | interval '30 minutes' |
| rolling_policy.inactivity_interval | Amount of time a file may remain open without receiving new data before it is rolled over, expressed as a SQL interval | None | interval '1 hour' |
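
For instance, a sketch of a sink that rolls files once they reach 256MB or after 15 minutes, whichever comes first (placeholder schema and catalog options, as in the Configuration example above):

```sql
CREATE TABLE rolled_sink (
    id BIGINT,
    value TEXT
) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.uri' = 'http://localhost:8181',
    table_name = 'rolled_table',
    -- roll files at 256MB or after 15 minutes, whichever comes first
    'rolling_policy.file_size' = '256MB',
    'rolling_policy.interval' = interval '15 minutes'
);
```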
File Naming Options
By default, Arroyo names files using UUIDv7, which ensures that they sort in time order. This behavior can be configured via the following options:

| Field | Description | Default |
|---|---|---|
| filename.prefix | Prefix that will be prepended to the file name, followed by a - | None |
| filename.suffix | Suffix (extension) that will be appended to the file name, preceded by a . | 'json' or 'parquet' |
| filename.strategy | File naming strategy to use. Supported values: serial, uuid, ulid, uuid_v7 | 'uuid_v7' |
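
For instance, a sketch that prefixes file names and switches the unique portion to ULIDs (placeholder names, as above):

```sql
CREATE TABLE named_sink (
    id BIGINT,
    value TEXT
) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.uri' = 'http://localhost:8181',
    table_name = 'named_table',
    -- produces files named like events-<ulid>.parquet
    'filename.prefix' = 'events',
    'filename.strategy' = 'ulid'
);
```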
Parquet Options
| Field | Description | Default |
|---|---|---|
| parquet.compression | The compression codec to use for Parquet files. Supported values: none, snappy, gzip, zstd, lz4, lz4_raw | 'zstd' |
| parquet.row_group_size | Target size for each Parquet row group | '128MB' |
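
A sketch of overriding the Parquet defaults (placeholder names, as above):

```sql
CREATE TABLE parquet_tuned_sink (
    id BIGINT,
    value TEXT
) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.uri' = 'http://localhost:8181',
    table_name = 'parquet_tuned',
    -- trade some file size for faster writes with snappy and smaller row groups
    'parquet.compression' = 'snappy',
    'parquet.row_group_size' = '64MB'
);
```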
Multipart upload settings
These options configure how writes against object storage are split into multipart upload parts:

| Field | Description | Default |
|---|---|---|
| multipart.target_part_size | The target size of each part in a multipart upload. Must be at least 5MB. | '32MB' |
| multipart.max_parts | Maximum number of parts in a multipart upload before the upload is completed | 1000 |
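
A sketch of tuning the multipart behavior (placeholder names, as above):

```sql
CREATE TABLE multipart_tuned_sink (
    id BIGINT,
    value TEXT
) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.uri' = 'http://localhost:8181',
    table_name = 'multipart_tuned',
    -- upload in 64MB parts, completing the upload after at most 500 parts
    'multipart.target_part_size' = '64MB',
    'multipart.max_parts' = 500
);
```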
Supported types
Iceberg has a more limited type system than Parquet or Arroyo SQL. In the currently implemented v2 of the Iceberg spec, only these Arroyo types may be used in an Iceberg sink:

- BOOL
- INT / BIGINT
- FLOAT / DOUBLE
- DECIMAL
- TIMESTAMP(6) (microsecond-precision)
- TEXT
- BYTEA
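
For reference, a sketch of a sink schema that uses only these types (the column names and catalog options are placeholders):

```sql
CREATE TABLE typed_sink (
    is_active BOOLEAN,       -- BOOL
    user_id BIGINT,          -- INT / BIGINT
    score DOUBLE,            -- FLOAT / DOUBLE
    amount DECIMAL(10, 2),   -- DECIMAL
    created_at TIMESTAMP,    -- microsecond-precision timestamp
    name TEXT,               -- TEXT
    payload BYTEA            -- BYTEA
) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.uri' = 'http://localhost:8181',
    table_name = 'typed_table'
);
```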
Partitioning
The Iceberg sink supports partitioning using the PARTITIONED BY clause on the
CREATE TABLE statement. Partitioning helps improve query performance by
organizing data based on specific columns or transforms. For more background on how
partitioning works in Iceberg, see the Iceberg docs.
Partition Transforms
Arroyo’s Iceberg sink supports the following partition transforms:

| Transform | Description |
|---|---|
| identity(column) | Partition by the exact value of the column |
| bucket(N, column) | Partition into N buckets based on hash of the column |
| truncate(N, column) | Truncate string values to N characters |
| year(column) | Extract year from timestamp |
| month(column) | Extract year and month from timestamp |
| day(column) | Extract year, month, and day from timestamp |
| hour(column) | Extract year, month, day, and hour from timestamp |
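
A sketch of combining these transforms in a table definition (this assumes the PARTITIONED BY clause sits between the column list and the WITH clause; names are placeholders):

```sql
CREATE TABLE partitioned_sink (
    event_type TEXT,
    user_id BIGINT,
    event_time TIMESTAMP
) PARTITIONED BY (day(event_time), bucket(16, user_id)) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.uri' = 'http://localhost:8181',
    table_name = 'partitioned_events'
);
```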
Shuffle by partition
When using field-based partitioning and high parallelism, you may end up with many files; typically, each sink subtask will write a file for every partition key. To avoid this, you can configure the dataflow to insert a shuffle step before the sink, which will ensure that all records for a particular partition key end up on the same sink node.

For example, if we partition by event_type and have 100 distinct
types, at parallelism 32 we’d end up with 3,200 files being written for each
flush interval. By enabling shuffle_by_partition, we reduce that to 100.
Note that this may lead to performance problems if your data is highly skewed
across your partition keys; for example, if 90% of your data is in the same
partition, those events will all end up on the same sink subtask which may
not be able to keep up with the volume.
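
A sketch of enabling the shuffle for a sink partitioned by event_type (placeholder names, as above):

```sql
CREATE TABLE shuffled_sink (
    event_type TEXT,
    event_time TIMESTAMP
) PARTITIONED BY (identity(event_type)) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.uri' = 'http://localhost:8181',
    table_name = 'events_by_type',
    -- route all records for a given partition key to the same sink subtask
    'shuffle_by_partition.enabled' = true
);
```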
Example
Here’s a complete example of how to use the Iceberg sink with R2 Data Catalog:
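
The catalog URI, warehouse, and token below are placeholders; substitute the values from your R2 Data Catalog, which exposes an Iceberg REST endpoint:

```sql
CREATE TABLE events (
    event_type TEXT,
    user_id BIGINT,
    event_time TIMESTAMP
) PARTITIONED BY (day(event_time)) WITH (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    -- placeholders: the catalog URI and warehouse come from your R2 Data Catalog settings
    'catalog.rest.uri' = 'https://catalog.cloudflarestorage.com/<account-id>/<bucket>',
    'catalog.warehouse' = '<account-id>_<bucket>',
    -- placeholder: an API token with R2 Data Catalog permissions
    'catalog.rest.token' = '<r2-api-token>',
    namespace = 'default',
    table_name = 'events',
    'rolling_policy.interval' = interval '5 minutes'
);

-- write into the sink from an existing source (placeholder name)
INSERT INTO events
SELECT event_type, user_id, event_time FROM my_source;
```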
Limitations

In Arroyo 0.15, we released the first version of the Iceberg sink. There are currently some limitations that users should be aware of:

- There is no support for schema evolution; any evolution must be done outside of Arroyo
- Iceberg tables may only be written to, not read
- Arroyo does not perform any compaction operations (including snapshot expiration); we recommend using it with a catalog that implements its own compaction operations, like R2 Data Catalog or S3 Tables
- The only supported data file format is Parquet