Data from Arroyo can be written to an Apache Iceberg table using the Iceberg sink. Iceberg is an open table format designed for huge analytic datasets, providing features like schema evolution, hidden partitioning, and time travel. Arroyo currently implements v2 of the Iceberg spec. The Iceberg sink writes to Iceberg tables in compatible catalogs, allowing the data to be queried by Iceberg-compatible engines like Apache Spark, Trino, and others.

Catalog support

Currently, the Iceberg sink supports writing to Iceberg REST catalogs backed by S3, R2, GCS, ABS, and MinIO; other S3-compatible object stores should also work but are untested. Several REST catalogs are known to work, and any other REST catalog should also work provided it follows the spec.

Configuration

The Iceberg sink takes the following configuration options:

| Field | Description | Required | Example |
|-------|-------------|----------|---------|
| type | Type of Iceberg connector; currently must be sink | Yes | "sink" |
| catalog.type | Type of Iceberg catalog; currently only rest is supported | Yes | "rest" |
| catalog.warehouse | Name of the warehouse to connect to | No | "my-warehouse" |
| catalog.rest.url | URL of the Iceberg REST catalog | Yes | "http://localhost:8181" |
| catalog.rest.token | Authentication token to use against the catalog | No | "asdf234234" |
| namespace | Namespace for the Iceberg table | No | "default" |
| table_name | Name of the Iceberg table | Yes | "my_table" |
| shuffle_by_partition.enabled | Insert a shuffle (repartitioning) before the sink (see Shuffle by partition below) | No | true |
| location_path | Optional override for the object storage path to write data to | No | /my/custom/path |
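For reference, a minimal sink definition using just the options above might look like the following sketch; the catalog URL, namespace, and table name are placeholders for your own catalog's values:

create table my_sink (
    id BIGINT,
    name TEXT
) with (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    -- placeholder catalog; point this at your own REST catalog
    'catalog.rest.url' = 'http://localhost:8181',
    namespace = 'default',
    table_name = 'my_table',
    format = 'parquet'
);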
In addition to the Iceberg-specific configurations, the sink shares a number of configs with the FileSystem sink:

File rollover configs

These configs control when files are rolled over, creating new files. Rolling over more frequently allows lower latency for readers, but at the cost of more files, which can slow down queries.
| Field | Description | Default | Example |
|-------|-------------|---------|---------|
| rolling_policy.file_size | Target size a file may reach before it is rolled | None | '128MB' |
| rolling_policy.interval | Amount of time after creation before a file is rolled, expressed as a SQL interval | None | interval '30 minutes' |
| rolling_policy.inactivity_interval | Amount of time a file may remain open without receiving new data before it is rolled, expressed as a SQL interval | None | interval '1 hour' |
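For example, to roll files when they reach 256MB or 15 minutes after creation, whichever comes first, the sink's WITH clause could include options like these (the values are arbitrary and should be tuned for your workload):

    'rolling_policy.file_size' = '256MB',
    'rolling_policy.interval' = interval '15 minutes'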

File Naming Options

By default Arroyo names files using UUIDv7, which ensures that they sort in time-order. This behavior can be configured via the following options:
| Field | Description | Default |
|-------|-------------|---------|
| filename.prefix | Prefix prepended to the file name, followed by a - | None |
| filename.suffix | Suffix (extension) appended to the file name, preceded by a . | 'json' or 'parquet' |
| filename.strategy | File naming strategy to use. Supported values: serial, uuid, ulid, uuid_v7 | 'uuid_v7' |
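As an illustration, serially-numbered files with a custom prefix could be configured like this in the WITH clause (the prefix is arbitrary):

    'filename.prefix' = 'events',
    'filename.strategy' = 'serial'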

Parquet Options

| Field | Description | Default |
|-------|-------------|---------|
| parquet.compression | Compression codec to use for Parquet files. Supported values: none, snappy, gzip, zstd, lz4, lz4_raw | 'zstd' |
| parquet.row_group_size | Target size for each Parquet row group | '128MB' |
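For instance, to use snappy compression (generally faster to write but larger files than zstd) with smaller row groups, the WITH clause might look like this (values are illustrative only):

    'parquet.compression' = 'snappy',
    'parquet.row_group_size' = '64MB'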

Multipart upload settings

These options control how writes against object storage are split into multipart uploads:

| Field | Description | Default |
|-------|-------------|---------|
| multipart.target_part_size | Target size of each part in a multipart upload; must be at least 5MB | '32MB' |
| multipart.max_parts | Maximum number of parts before the multipart upload is completed | 1000 |
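As a sketch, larger parts with a lower part cap could be configured like this (values are arbitrary; check your object store's multipart limits before changing them):

    'multipart.target_part_size' = '64MB',
    'multipart.max_parts' = 500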

Supported types

Iceberg has a more limited type system than Parquet or Arroyo SQL. With v2 of the Iceberg spec, which Arroyo currently implements, only these Arroyo types may be used in an Iceberg sink:
  • BOOL
  • INT / BIGINT
  • FLOAT / DOUBLE
  • DECIMAL
  • TIMESTAMP(6) (microsecond-precision)
  • TEXT
  • BYTEA
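For example, a sink whose schema stays within these types might be declared like the following sketch; the column names, DECIMAL precision, and catalog settings are placeholders:

create table typed_sink (
    is_valid BOOLEAN,          -- BOOL
    user_id BIGINT,            -- INT / BIGINT
    score DOUBLE,              -- FLOAT / DOUBLE
    amount DECIMAL(10, 2),     -- DECIMAL (precision/scale chosen arbitrarily)
    event_time TIMESTAMP(6),   -- microsecond-precision timestamp
    message TEXT,              -- TEXT
    payload BYTEA              -- BYTEA
) with (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    -- placeholder catalog; point this at your own REST catalog
    'catalog.rest.url' = 'http://localhost:8181',
    table_name = 'typed_events',
    format = 'parquet'
);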

Partitioning

The Iceberg sink supports partitioning via the PARTITIONED BY clause on the CREATE TABLE statement. Partitioning helps improve query performance by organizing data based on specific columns or transforms. For more background on how partitioning works in Iceberg, see the Iceberg docs.

Partition Transforms

Arroyo’s Iceberg sink supports the following partition transforms:
| Transform | Description |
|-----------|-------------|
| identity(column) | Partition by the exact value of the column |
| bucket(N, column) | Partition into N buckets based on a hash of the column |
| truncate(N, column) | Truncate string values to N characters |
| year(column) | Extract the year from a timestamp |
| month(column) | Extract the year and month from a timestamp |
| day(column) | Extract the year, month, and day from a timestamp |
| hour(column) | Extract the year, month, day, and hour from a timestamp |
You can combine multiple partition transforms:
CREATE TABLE partitioned_events (
    user_id BIGINT,
    region TEXT,
    event_type TEXT,
    value DOUBLE,
    event_time TIMESTAMP
) WITH (
    'connector' = 'iceberg',
    'catalog.type' = 'rest',
    'catalog.rest.url' = 'http://localhost:8181',
    'namespace' = 'analytics',
    'table_name' = 'events',
    'type' = 'sink'
)
PARTITIONED BY (
    day(event_time),
    identity(region)
);

Shuffle by partition

When using field-based partitioning and high parallelism, you may end up with many files; typically each sink subtask will write a file for every partition key. To avoid this, you can configure the dataflow to insert a shuffle step before the sink, which will ensure that all records for a particular partition key end up on the same sink node:
'shuffle_by_partition.enabled' = true
For example, if our partition key is event_type and we have 100 distinct types, at parallelism 32 we'd end up with 3,200 files being written for each flush interval. By enabling shuffle_by_partition, we reduce that to 100. Note that this may lead to performance problems if your data is highly skewed across your partition keys; for example, if 90% of your data is in the same partition, those events will all end up on the same sink subtask, which may not be able to keep up with the volume.
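Putting this together, a sketch of a sink partitioned by event_type with the shuffle enabled might look like this (the catalog settings and names are placeholders):

create table events_by_type (
    event_type TEXT,
    value DOUBLE,
    event_time TIMESTAMP(6)
) with (
    connector = 'iceberg',
    type = 'sink',
    'catalog.type' = 'rest',
    -- placeholder catalog; point this at your own REST catalog
    'catalog.rest.url' = 'http://localhost:8181',
    namespace = 'analytics',
    table_name = 'events_by_type',
    format = 'parquet',
    -- route all records with the same partition key to the same sink subtask
    'shuffle_by_partition.enabled' = true
) PARTITIONED BY (
    identity(event_type)
);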

Example

Here’s a complete example of how to use the Iceberg sink with R2 Data Catalog:
create table impulse with (
    connector = 'impulse',
    event_rate = 100
);

create table sink (
    id INT,
    ts TIMESTAMP(6) NOT NULL,
    count INT
) with (
    connector = 'iceberg',
    'catalog.type' = 'rest',
    'catalog.rest.url' = 'https://catalog.cloudflarestorage.com/bddda7b15979aaad1875d7a1643c463a/my-bucket',
    'catalog.warehouse' = 'bddda7b15979aaad1875d7a1643c463a_my-bucket',
    type = 'sink',
    table_name = 'events',
    format = 'parquet',
    'rolling_policy.interval' = interval '30 seconds'
) PARTITIONED BY (
    bucket(4, id),
    hour(ts)
);

insert into sink
select subtask_index, row_time(), counter
from impulse;

Limitations

In Arroyo 0.15, we released the first version of the Iceberg sink. There are currently some limitations that users should be aware of:
  • There is no support for schema evolution; any evolution must be done outside of Arroyo
  • Iceberg tables may only be written to, not read
  • Arroyo does not perform any compaction operations (including snapshot expiration); we recommend using it with a catalog that implements its own compaction operations, like R2 Data Catalog or S3 Tables
  • The only supported data file format is Parquet