Iceberg Sink
Data from Arroyo can be written to an Apache Iceberg table using the iceberg sink.
Iceberg is an open table format designed for huge analytic datasets, providing features like schema evolution,
hidden partitioning, and time travel. Arroyo currently implements v2 of the Iceberg spec.
The Iceberg sink writes to Iceberg tables in compatible catalogs, allowing the data to be queried by Iceberg-compatible engines like Apache Spark, Trino, and others.
Catalog support
Currently, the Iceberg sink supports writing to Iceberg REST catalogs backed by S3, R2, GCS, ABS, and MinIO; other S3-compatible object stores should also work but are untested.
Known-working catalogs include R2 Data Catalog and S3 Tables; any other REST catalog should also work provided it follows the spec.
Configuration
The Iceberg sink supports the following configuration options:
| Field | Description | Required | Example |
|---|---|---|---|
| `type` | Type of Iceberg connector; currently must be `sink` | Yes | `"sink"` |
| `catalog.type` | Type of Iceberg catalog; currently only `rest` is supported | Yes | `"rest"` |
| `catalog.warehouse` | Name of the warehouse to connect to | No | `"my-warehouse"` |
| `catalog.rest.url` | URL of the Iceberg REST catalog | Yes | `"http://localhost:8181"` |
| `catalog.rest.token` | Authentication token to use against the catalog | No | `"asdf234234"` |
| `namespace` | Namespace for the Iceberg table | No | `"default"` |
| `table_name` | Name of the Iceberg table | Yes | `"my_table"` |
| `shuffle_by_partition.enabled` | Insert a shuffle (repartitioning) before the sink (see the Shuffle by partition section below) | No | `true` |
| `location_path` | Optional override for the object storage path where data is written | No | `/my/custom/path` |
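For instance, a minimal sink definition might look like the following sketch; the catalog URL, namespace, and table name are placeholder values to substitute with your own:

```sql
-- Minimal Iceberg sink sketch; catalog URL, namespace, and table name are placeholders
CREATE TABLE events_sink (
    id BIGINT,
    event_time TIMESTAMP
) WITH (
    'connector' = 'iceberg',
    'type' = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.url' = 'http://localhost:8181',
    'namespace' = 'default',
    'table_name' = 'events',
    'format' = 'parquet'
);
```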
In addition to the Iceberg-specific configurations, there are a number of configs shared with the FileSystemSink:
File rollover configs
These configs control when files are rolled over, closing the current file and starting a new one. Rolling over more frequently allows lower latency for readers, but at the cost of more files, which can slow down queries.
| Field | Description | Default | Example |
|---|---|---|---|
| `rolling_policy.file_size` | Target size of a file before it is rolled | None | `'128MB'` |
| `rolling_policy.interval` | Amount of time after a file is created before it is rolled, expressed as a SQL interval | None | `interval '30 minutes'` |
| `rolling_policy.inactivity_interval` | Amount of time a file may stay open without receiving new data before it is rolled, expressed as a SQL interval | None | `interval '1 hour'` |
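For example, a sink could be configured to roll files once they reach 256MB or 15 minutes after creation by adding the following to its WITH clause (the values here are illustrative):

```sql
-- Illustrative rollover settings added to an Iceberg sink's WITH clause
'rolling_policy.file_size' = '256MB',
'rolling_policy.interval' = interval '15 minutes'
```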
File Naming Options
By default Arroyo names files using UUIDv7, which ensures that they sort in time-order. This behavior can be configured via the following options:
| Field | Description | Default |
|---|---|---|
| `filename.prefix` | Prefix added to the beginning of the file name, followed by a `-` | None |
| `filename.suffix` | Suffix (extension) appended to the end of the file name, preceded by a `.` | `'json'` or `'parquet'` |
| `filename.strategy` | File naming strategy to use. Supported values: `serial`, `uuid`, `ulid`, `uuid_v7` | `'uuid_v7'` |
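For example, the following (illustrative) settings in the sink's WITH clause would produce file names like `events-<uuid>.parquet`:

```sql
-- Illustrative file naming settings for an Iceberg sink's WITH clause
'filename.prefix' = 'events',
'filename.strategy' = 'uuid_v7'
```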
Parquet Options
| Field | Description | Default |
|---|---|---|
| `parquet.compression` | The compression codec to use for Parquet files. Supported values: `none`, `snappy`, `gzip`, `zstd`, `lz4`, `lz4_raw` | `'zstd'` |
| `parquet.row_group_size` | Target size for each Parquet row group | `'128MB'` |
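For example, to use a different compression codec and smaller row groups, a sink might add the following to its WITH clause (illustrative values):

```sql
-- Illustrative Parquet settings for an Iceberg sink's WITH clause
'parquet.compression' = 'snappy',
'parquet.row_group_size' = '64MB'
```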
Multipart upload settings
These options configure how writes to object storage are split into multipart uploads:
| Field | Description | Default |
|---|---|---|
| `multipart.target_part_size` | The target size of each part in a multipart upload; must be at least 5MB | `'32MB'` |
| `multipart.max_parts` | Maximum number of parts before the multipart upload is completed | `1000` |
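For example, to upload in larger, fewer parts, a sink might add something like the following to its WITH clause (illustrative values):

```sql
-- Illustrative multipart upload settings for an Iceberg sink's WITH clause
'multipart.target_part_size' = '64MB',
'multipart.max_parts' = 500
```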
Supported types
Iceberg has a more limited type system than Parquet or Arroyo SQL. Under the currently-implemented v2 of the Iceberg spec, only these Arroyo types may be used in an Iceberg sink:
- BOOL
- INT / BIGINT
- FLOAT / DOUBLE
- DECIMAL
- TIMESTAMP(6) (microsecond-precision)
- TEXT
- BYTEA
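As a sketch, a sink table restricted to these types might be declared like this; the catalog settings and names are placeholders:

```sql
-- Sketch of an Iceberg sink using only supported Arroyo types;
-- catalog settings and names are placeholders
CREATE TABLE typed_sink (
    is_active BOOL,
    user_id BIGINT,
    score DOUBLE,
    ts TIMESTAMP(6),
    name TEXT,
    payload BYTEA
) WITH (
    'connector' = 'iceberg',
    'type' = 'sink',
    'catalog.type' = 'rest',
    'catalog.rest.url' = 'http://localhost:8181',
    'table_name' = 'typed_events',
    'format' = 'parquet'
);
```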
Partitioning
The Iceberg sink supports partitioning using the PARTITIONED BY clause on the CREATE TABLE statement. Partitioning helps improve query performance by organizing data based on specific columns or transforms. For more background on how partitioning works in Iceberg, see the Iceberg docs.
Partition Transforms
Arroyo's Iceberg sink supports the following partition transforms:
| Transform | Description |
|---|---|
| `identity(column)` | Partition by the exact value of the column |
| `bucket(N, column)` | Partition into N buckets based on a hash of the column |
| `truncate(N, column)` | Truncate string values to N characters |
| `year(column)` | Extract the year from a timestamp |
| `month(column)` | Extract the year and month from a timestamp |
| `day(column)` | Extract the year, month, and day from a timestamp |
| `hour(column)` | Extract the year, month, day, and hour from a timestamp |
You can combine multiple partition transforms:
```sql
CREATE TABLE partitioned_events (
    user_id BIGINT,
    region TEXT,
    event_type TEXT,
    value DOUBLE,
    event_time TIMESTAMP
) WITH (
    'connector' = 'iceberg',
    'catalog.type' = 'rest',
    'catalog.rest.url' = 'http://localhost:8181',
    'namespace' = 'analytics',
    'table_name' = 'events',
    'type' = 'sink'
) PARTITIONED BY (
    day(event_time),
    identity(region)
);
```

Shuffle by partition
When using field-based partitioning and high parallelism, you may end up with many files; typically each sink subtask will write a file for every partition key. To avoid this, you can configure the dataflow to insert a shuffle step before the sink, which will ensure that all records for a particular partition key end up on the same sink node:
```sql
'shuffle_by_partition.enabled' = true
```

For example, if our partition key is `event_type` and we have 100 distinct types, at parallelism 32 we'd end up with 3,200 files being written for each flush interval. By enabling `shuffle_by_partition`, we reduce that to 100.
Note that this may lead to performance problems if your data is highly skewed across your partition keys; for example, if 90% of your data is in the same partition, those events will all end up on the same sink subtask which may not be able to keep up with the volume.
Example
Here's a complete example of how to use the Iceberg sink with R2 Data Catalog:
```sql
create table impulse with (
    connector = 'impulse',
    event_rate = 100
);

create table sink (
    id INT,
    ts TIMESTAMP(6) NOT NULL,
    count INT
) with (
    connector = 'iceberg',
    'catalog.type' = 'rest',
    'catalog.rest.url' = 'https://catalog.cloudflarestorage.com/bddda7b15979aaad1875d7a1643c463a/my-bucket',
    'catalog.warehouse' = 'bddda7b15979aaad1875d7a1643c463a_my-bucket',
    type = 'sink',
    table_name = 'events',
    format = 'parquet',
    'rolling_policy.interval' = interval '30 seconds'
) PARTITIONED BY (
    bucket(4, id),
    hour(ts)
);

insert into sink
select subtask_index, row_time(), counter
from impulse;
```

Limitations
In Arroyo 0.15, we released the first version of the Iceberg sink. There are currently some limitations that users should be aware of:
- There is no support for schema evolution; any evolution must be done outside of Arroyo
- Iceberg tables may only be written to, not read
- Arroyo does not perform any compaction operations (including snapshot expiration); we recommend using it with a catalog that implements its own compaction operations, like R2 Data Catalog or S3 Tables
- The only supported data file format is Parquet