Kafka Setup
Configure Sluice to publish messages to Apache Kafka.
Prerequisites
- A running Kafka broker (or cluster)
- Broker address(es) accessible from wherever Sluice runs
For local development, start a broker with Docker:
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_CFG_PROCESS_ROLES=broker,controller \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
bitnami/kafka:latest
Minimal Configuration
listen_addr: "0.0.0.0:8080"
bus:
type: kafka
bootstrap_servers: "localhost:9092"
topics:
my-events:
target: "my-events-v1"
This listens on port 8080, connects to Kafka at localhost:9092 with a shared default producer, and maps my-events to the Kafka topic my-events-v1.
Publish a message:
curl -X POST http://localhost:8080/v1/publish/my-events \
-H "Content-Type: application/octet-stream" \
--data-binary '{"hello": "world"}'
Default Producer Settings
The bus section configures the shared default Kafka producer. Topics without per-topic overrides use these settings.
bus:
type: kafka
bootstrap_servers: "broker1:9092,broker2:9092,broker3:9092"
acks: "all"
linger_ms: 5
batch_size: 65536
extra:
compression.type: "zstd"
enable.idempotence: "true"
| Setting | Default | Description |
|---|---|---|
bootstrap_servers | (required) | Comma-separated broker addresses |
acks | all | Durability. all = wait for all in-sync replicas |
linger_ms | 5 | Batching delay in ms. Higher = better throughput, worse latency |
batch_size | 65536 | Max batch size in bytes before sending regardless of linger |
extra | {} | Any librdkafka configuration property |
Recommended production settings
bus:
type: kafka
bootstrap_servers: "broker1:9092,broker2:9092,broker3:9092"
acks: "all"
linger_ms: 10
batch_size: 131072
extra:
compression.type: "zstd"
enable.idempotence: "true"
max.in.flight.requests.per.connection: "5"
message.timeout.ms: "30000"
Per-Topic Producer Tuning
Requires a license key. Per-topic tuning fields are ignored without a license key. All topics use the shared default producer in the free tier.
Topics with specific throughput or durability requirements can override the bus-level defaults. Put tuning fields directly on the topic -- no nesting required.
topics:
payments:
target: "payments-v1"
acks: "all"
linger_ms: 0
batch_size: 16384
bootstrap_servers: "kafka-payments:9092"
clickstream:
target: "clickstream-v1"
# No overrides, uses the shared default
Per-topic settings override the bus-level defaults. Anything not specified is inherited. So you only need to set what you want to change:
topics:
low-latency:
target: "low-latency-v1"
linger_ms: 0 # only override linger, inherit the rest
When to use per-topic overrides
- The topic publishes to a different Kafka cluster than the default
- The topic needs different durability settings (acks: 0 for metrics vs acks: all for payments)
- The topic needs isolation from noisy neighbors. A slow topic on the shared producer can back-pressure others
Most topics should use the default producer. 5-15 dedicated producers is typical.
Schema Registry
Requires a license key. schema_registry_url and schema_poll_interval_secs require a license key. Without one, use schema_override to load schemas from local files.
Kafka uses Confluent Schema Registry for schema validation. The registry URL is configured directly on the bus: block.
bus:
type: kafka
bootstrap_servers: "localhost:9092"
schema_registry_url: "http://localhost:8081"
# schema_registry_username: "user" # optional, for basic auth
# schema_registry_password: "pass" # optional
Sluice looks up schemas using the subject {topic-name}-value. The schema type (PROTOBUF, AVRO, JSON) and content are read from the registry response. Format is detected automatically.
Local schema override (dev/testing, free tier)
For local development, you can override the Confluent Schema Registry with local schema files using the top-level schema_override. This works without a license key.
schema_override:
schemas_dir: "schemas/"
Looks for schemas/{topic-name}.desc (proto), .avsc (avro), or .json. Format is inferred from which file exists. When schema_override is set, it takes priority over the Confluent Schema Registry.
Config Schema Reference
bus:
type: kafka # required
bootstrap_servers: "host:9092" # required, comma-separated broker addresses
schema_registry_url: "http://localhost:8081" # optional, Confluent SR URL
# schema_registry_username: "user" # optional, basic auth
# schema_registry_password: "pass" # optional
acks: "all" # "all", "1", or "0". default: "all"
linger_ms: 5 # batching delay in ms. default: 5
batch_size: 65536 # max batch size in bytes. default: 65536
extra: # optional, any librdkafka property
compression.type: "zstd"
All fields optional. Unset fields inherit from the bus config. Ignored when bus is pubsub.
my-topic:
target: "bus-native-name" # optional, defaults to key name
validation: { ... } # optional
ordering: { ... } # optional
acks: "all" # optional, override durability
linger_ms: 0 # optional, override batching delay
batch_size: 16384 # optional, override batch size
bootstrap_servers: "other:9092" # optional, override broker addresses
extra: # optional, additional librdkafka properties
compression.type: "lz4"
Dev/testing only. Overrides Confluent SR with local files.
schema_override:
schemas_dir: "schemas/" # looks for {topic}.desc, .avsc, or .json
Environment Variable Overrides
Override any YAML setting via environment variables with the SLUICE_ prefix and __ as a separator:
# Override bootstrap servers
docker run -e SLUICE_BUS__BOOTSTRAP_SERVERS=prod-kafka:9092 ghcr.io/data-tier/sluice:0.2.2
# Override listen address
docker run -e SLUICE_LISTEN_ADDR=0.0.0.0:9090 ghcr.io/data-tier/sluice:0.2.2