Use Kafka topics as sources and sinks
Field | Description | Required | Example |
---|---|---|---|
bootstrap_servers | A comma-separated list of Kafka servers to connect to | Yes | kafka-server-1.cluster:9092,kafka-server-2.cluster:9092 |
auth.type | One of none, sasl, or aws_msk_iam | No | sasl |
auth.protocol | The SASL protocol to use (e.g., SASL_PLAINTEXT, SASL_SSL) | No | SASL_PLAINTEXT |
auth.mechanism | The SASL mechanism to use (e.g., SCRAM-SHA-256, SCRAM-SHA-512) | No | SCRAM-SHA-256 |
auth.username | The username to use for SASL authentication | No | user |
auth.password | The password to use for SASL authentication | No | password |
auth.region | The AWS region to use when auth.type is aws_msk_iam | No | us-east-1 |
topic | The name of the Kafka topic to read from or write to | Yes | |
type | The type of table (either ‘source’ or ‘sink’) | Yes | source |
value.subject | Configures the value subject read from the schema registry, if enabled | No | logs-value |
source.offset | The offset to start reading from (either ‘earliest’ or ‘latest’) | No | earliest |
source.read_mode | The read mode to use. ‘read_committed’ only reads committed Kafka messages, while ‘read_uncommitted’ will read all messages. | No | read_committed |
source.group_id | For sources, sets the Kafka consumer group to use; note that using the same group for multiple pipelines will result in each pipeline seeing only a subset of the data | No | my-group |
sink.commit_mode | The commit mode to use (either ‘exactly_once’ or ‘at_least_once’) | No | exactly_once |
sink.timestamp_field | The field to use as the timestamp for the Kafka message | No | timestamp |
sink.key_field | The field to use as the key for the Kafka message | No | key |
schema_registry.endpoint | The URL of the Confluent Schema Registry to use | No | https://schema-registry.cluster:8081 |
schema_registry.api_key | The API key to use for the Schema Registry; sent as the username for basic auth | No | ABCDEFGHIJK01234 |
schema_registry.api_secret | The API secret to use for the Schema Registry; sent as the password for basic auth | No | abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 |
client_configs | Key-value pairs of additional rdkafka configuration options | No | message.max.bytes=1000,queue.buffering.max.kbytes=1048576 |
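
The options in the table above can be checked before they are handed to the connector. The following is a minimal Python sketch, not part of the connector or of any Kafka library: `validate_options` and `format_client_configs` are hypothetical helper names, the required fields and allowed values are taken from the table, and the topic name `logs` is a placeholder.

```python
# Hypothetical helpers for illustration; not part of any Kafka or connector library.

# Fields marked "Yes" in the Required column of the table.
REQUIRED = ("bootstrap_servers", "topic", "type")

# Fields whose descriptions in the table list a fixed set of values.
ALLOWED = {
    "type": {"source", "sink"},
    "auth.type": {"none", "sasl", "aws_msk_iam"},
    "source.offset": {"earliest", "latest"},
    "source.read_mode": {"read_committed", "read_uncommitted"},
    "sink.commit_mode": {"exactly_once", "at_least_once"},
}


def format_client_configs(configs):
    """Join rdkafka options into the comma-separated key=value form shown in the table."""
    return ",".join(f"{key}={value}" for key, value in configs.items())


def validate_options(options):
    """Return a list of problems found in an options dict (empty if none)."""
    problems = []
    for field in REQUIRED:
        if not options.get(field):
            problems.append(f"missing required field: {field}")
    for field, allowed in ALLOWED.items():
        if field in options and options[field] not in allowed:
            problems.append(
                f"{field} must be one of {sorted(allowed)}, got {options[field]!r}"
            )
    return problems


source_options = {
    "bootstrap_servers": "kafka-server-1.cluster:9092,kafka-server-2.cluster:9092",
    "topic": "logs",  # placeholder topic name
    "type": "source",
    "source.offset": "earliest",
    "source.read_mode": "read_committed",
    "source.group_id": "my-group",
    "client_configs": format_client_configs(
        {"message.max.bytes": 1000, "queue.buffering.max.kbytes": 1048576}
    ),
}

print(validate_options(source_options))
```

A check like this only catches missing required fields and misspelled enumerated values; it does not verify that the brokers are reachable or that the credentials are valid.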
Set source.read_mode to read_committed to ensure that only committed messages are read.
- offset_id BIGINT: the offset of the message
- partition INT: the partition of the message
- topic TEXT: the topic the message was consumed from
- timestamp BIGINT: the record timestamp
- key BYTEA: the message key
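
For code that consumes these fields downstream, the message fields listed above could be modeled as a typed record. This is a small sketch under stated assumptions: the `KafkaMetadata` class and `describe` function are hypothetical names, the SQL-to-Python type mapping (BIGINT and INT to `int`, TEXT to `str`, BYTEA to `bytes`) is an assumption, and the source does not state the unit of the timestamp.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class KafkaMetadata:
    """Hypothetical record mirroring the message fields listed above."""

    offset_id: int        # BIGINT: the offset of the message
    partition: int        # INT: the partition of the message
    topic: str            # TEXT: the topic the message was consumed from
    timestamp: int        # BIGINT: the record timestamp (unit not stated in the source)
    key: Optional[bytes]  # BYTEA: the message key (assumed absent for unkeyed messages)


def describe(meta):
    """Format a record as topic[partition]@offset, e.g. for log lines."""
    return f"{meta.topic}[{meta.partition}]@{meta.offset_id}"


# Placeholder values for illustration only.
example = KafkaMetadata(
    offset_id=42, partition=3, topic="logs", timestamp=1700000000000, key=b"user-1"
)
print(describe(example))
```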