Kafka Setup

Configure Sluice to publish messages to Apache Kafka.

Prerequisites

For local development, start a single-node Kafka broker (KRaft mode, no ZooKeeper) with Docker:

docker run -d --name kafka \
  -p 9092:9092 \
  -e KAFKA_CFG_PROCESS_ROLES=broker,controller \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  bitnami/kafka:latest

Minimal Configuration

listen_addr: "0.0.0.0:8080"

bus:
  type: kafka
  bootstrap_servers: "localhost:9092"

topics:
  my-events:
    target: "my-events-v1"

This configuration listens on port 8080 on all interfaces, connects to Kafka at localhost:9092 using the shared default producer, and maps the Sluice topic my-events to the Kafka topic my-events-v1.

Publish a message:

curl -X POST http://localhost:8080/v1/publish/my-events \
  -H "Content-Type: application/octet-stream" \
  --data-binary '{"hello": "world"}'

Default Producer Settings

The bus section configures the shared default Kafka producer. Topics without per-topic overrides use these settings.

bus:
  type: kafka
  bootstrap_servers: "broker1:9092,broker2:9092,broker3:9092"
  acks: "all"
  linger_ms: 5
  batch_size: 65536
  extra:
    compression.type: "zstd"
    enable.idempotence: "true"

Setting            Default     Description
bootstrap_servers  (required)  Comma-separated broker addresses
acks               "all"       Durability: "all" waits for all in-sync replicas, "1" for the leader only, "0" for no acknowledgement
linger_ms          5           Batching delay in ms. Higher values improve throughput but add latency
batch_size         65536       Maximum batch size in bytes; a full batch is sent without waiting for linger_ms
extra              {}          Any librdkafka configuration property
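The linger_ms and batch_size settings above interact in a simple way: a batch goes out once it reaches batch_size bytes or once it has waited linger_ms, whichever comes first. The sketch below models only that rule. The function name batch_ready is hypothetical, and librdkafka has other send triggers (such as flushes on shutdown) that are ignored here.

```shell
#!/usr/bin/env bash
# Hypothetical, simplified model of the batching rule: send a batch when it is
# full OR when it has lingered long enough. Not Sluice or librdkafka source code.

# batch_ready BYTES ELAPSED_MS BATCH_SIZE LINGER_MS
# Exit status 0 if the batch should be sent now, 1 if it should keep filling.
batch_ready() {
  local bytes=$1 elapsed_ms=$2 batch_size=$3 linger_ms=$4
  if (( bytes >= batch_size )); then
    return 0   # batch is full: send regardless of linger
  fi
  if (( elapsed_ms >= linger_ms )); then
    return 0   # linger window has elapsed: send what has accumulated
  fi
  return 1     # keep accumulating
}

# With the defaults (batch_size 65536, linger_ms 5):
batch_ready 70000 1 65536 5 && echo "full batch: send"
batch_ready 1024 5 65536 5 && echo "linger elapsed: send"
batch_ready 1024 2 65536 5 || echo "keep waiting"
```

With linger_ms set to 0, the second condition is always true, so batches are sent as soon as possible.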

Recommended production settings

bus:
  type: kafka
  bootstrap_servers: "broker1:9092,broker2:9092,broker3:9092"
  acks: "all"
  linger_ms: 10
  batch_size: 131072
  extra:
    compression.type: "zstd"
    enable.idempotence: "true"
    max.in.flight.requests.per.connection: "5"
    message.timeout.ms: "30000"
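The recommended settings enable idempotence. librdkafka only accepts enable.idempotence=true when acks is all and max.in.flight.requests.per.connection is at most 5; with incompatible values, producer creation fails. A minimal pre-deploy check for those two values might look like this, assuming Sluice passes acks and the extra properties through to librdkafka unchanged. The function name is hypothetical, and it checks only these two constraints.

```shell
#!/usr/bin/env bash
# Hypothetical pre-deploy check for two librdkafka idempotence constraints:
# enable.idempotence=true requires acks=all (or -1) and
# max.in.flight.requests.per.connection <= 5.

# check_idempotent_producer ACKS MAX_IN_FLIGHT
check_idempotent_producer() {
  local acks=$1 max_in_flight=$2 status=0
  if [[ "$acks" != "all" && "$acks" != "-1" ]]; then
    echo "error: enable.idempotence requires acks=all, got acks=$acks" >&2
    status=1
  fi
  if (( max_in_flight > 5 )); then
    echo "error: max.in.flight.requests.per.connection must be <= 5, got $max_in_flight" >&2
    status=1
  fi
  return "$status"
}

# Values from the recommended production settings:
check_idempotent_producer "all" 5 && echo "ok"
```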

Per-Topic Producer Tuning

Requires a license key. Without one, per-topic tuning fields are ignored and all topics use the shared default producer (free tier).

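Topics with specific throughput or durability requirements can override the bus-level defaults. Put tuning fields directly on the topic entry; no nested block is required. In the example below, payments gets a low-latency (linger_ms: 0), fully durable producer on a separate cluster (kafka-payments:9092), while clickstream uses the shared default.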

topics:
  payments:
    target: "payments-v1"
    acks: "all"
    linger_ms: 0
    batch_size: 16384
    bootstrap_servers: "kafka-payments:9092"

  clickstream:
    target: "clickstream-v1"
    # No overrides, uses the shared default

Any setting not specified on a topic is inherited from the bus block, so you only need to set the values you want to change:

topics:
  low-latency:
    target: "low-latency-v1"
    linger_ms: 0    # only override linger, inherit the rest
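One way to picture the inheritance rule: for each tuning field, the topic value wins when it is set, and the bus value is used otherwise. Because per-topic tuning is ignored without a license key, the sketch also falls back to the bus value when unlicensed. The function effective_setting and its arguments are hypothetical; this models the documented behavior, not Sluice's implementation.

```shell
#!/usr/bin/env bash
# Hypothetical model of per-topic inheritance: use the topic override if it is
# set AND a license key is present; otherwise use the bus-level default.

# effective_setting LICENSED(yes|no) BUS_VALUE [TOPIC_VALUE]
effective_setting() {
  local licensed=$1 bus_value=$2 topic_value=${3-}
  if [[ "$licensed" == "yes" && -n "$topic_value" ]]; then
    printf '%s\n' "$topic_value"
  else
    printf '%s\n' "$bus_value"
  fi
}

# The low-latency topic, assuming bus defaults linger_ms 5 and batch_size 65536:
effective_setting yes 5 0        # linger_ms  -> 0 (overridden)
effective_setting yes 65536 ""   # batch_size -> 65536 (inherited)
effective_setting no 5 0         # linger_ms  -> 5 (override ignored without a license)
```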

When to use per-topic overrides

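Most topics should use the shared default producer. Add per-topic overrides only for topics whose latency, durability, or broker requirements differ from the defaults; 5-15 dedicated producers per deployment is typical.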

Schema Registry

Requires a license key. The schema_registry_url and schema_poll_interval_secs settings require a license key; without one, use schema_override to load schemas from local files.

With the Kafka bus, Sluice validates messages against schemas stored in Confluent Schema Registry. The registry URL is set directly on the bus: block.

bus:
  type: kafka
  bootstrap_servers: "localhost:9092"
  schema_registry_url: "http://localhost:8081"
  # schema_registry_username: "user"   # optional, for basic auth
  # schema_registry_password: "pass"   # optional

Sluice looks up each topic's schema under the subject {topic-name}-value. The schema type (PROTOBUF, AVRO, or JSON) and the schema content are read from the registry response, so the format is detected automatically.
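To check by hand what the registry holds for a topic, you can query Confluent Schema Registry's REST endpoint for the latest version of the {topic-name}-value subject. The helper below only builds that URL. Its name is hypothetical, and this page does not say whether {topic-name} is the Sluice topic key or its Kafka target, so confirm which name your subjects actually use.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build the Confluent Schema Registry URL for the latest
# schema registered under the "{topic-name}-value" subject.

# latest_schema_url REGISTRY_URL TOPIC_NAME
latest_schema_url() {
  local registry=${1%/} topic=$2   # drop one trailing slash from the registry URL
  printf '%s/subjects/%s-value/versions/latest\n' "$registry" "$topic"
}

latest_schema_url "http://localhost:8081" "my-events"
# Against a running registry:
#   curl -s "$(latest_schema_url http://localhost:8081 my-events)"
```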

Local schema override (dev/testing, free tier)

For local development, you can bypass Confluent Schema Registry and load schemas from local files with the top-level schema_override block. This works without a license key.

schema_override:
  schemas_dir: "schemas/"

Sluice looks for schemas/{topic-name}.desc (Protobuf), schemas/{topic-name}.avsc (Avro), or schemas/{topic-name}.json (JSON Schema) and infers the format from whichever file exists. When schema_override is set, it takes priority over Confluent Schema Registry.
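The file lookup can be sketched as a check for each extension in turn. The function name schema_format is hypothetical, and the order in which extensions are checked when several files exist for one topic is an assumption; this page does not specify it.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the schema_override lookup: given schemas_dir and a
# topic name, report which schema format the local file implies.
# ASSUMPTION: .desc, then .avsc, then .json when several files exist.

# schema_format SCHEMAS_DIR TOPIC_NAME
schema_format() {
  local dir=${1%/} topic=$2
  if [[ -f "$dir/$topic.desc" ]]; then
    echo "protobuf"
  elif [[ -f "$dir/$topic.avsc" ]]; then
    echo "avro"
  elif [[ -f "$dir/$topic.json" ]]; then
    echo "json"
  else
    echo "no schema file for $topic in $dir" >&2
    return 1
  fi
}

# Example: a temporary schemas directory containing one Avro schema.
tmp=$(mktemp -d)
touch "$tmp/my-events.avsc"
schema_format "$tmp" my-events   # prints: avro
rm -rf "$tmp"
```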

Config Schema Reference

bus:
  type: kafka                      # required
  bootstrap_servers: "host:9092"   # required, comma-separated broker addresses
  schema_registry_url: "http://localhost:8081"  # optional, Confluent SR URL
  # schema_registry_username: "user"            # optional, basic auth
  # schema_registry_password: "pass"            # optional
  acks: "all"                      # "all", "1", or "0". default: "all"
  linger_ms: 5                     # batching delay in ms. default: 5
  batch_size: 65536                # max batch size in bytes. default: 65536
  extra:                           # optional, any librdkafka property
    compression.type: "zstd"

Per-topic entries (under topics:). All fields are optional, and unset producer fields inherit from the bus config. The Kafka overrides are ignored when the bus type is pubsub.

my-topic:
  target: "bus-native-name"        # optional, defaults to key name
  validation: { ... }              # optional
  ordering: { ... }                # optional
  acks: "all"                      # optional, override durability
  linger_ms: 0                     # optional, override batching delay
  batch_size: 16384                # optional, override batch size
  bootstrap_servers: "other:9092"  # optional, override broker addresses
  extra:                           # optional, additional librdkafka properties
    compression.type: "lz4"

schema_override (dev/testing only): replaces Confluent Schema Registry lookups with local schema files.

schema_override:
  schemas_dir: "schemas/"          # looks for {topic}.desc, .avsc, or .json

Environment Variable Overrides

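Any YAML setting can be overridden with an environment variable: add the SLUICE_ prefix, write the key path in uppercase, and use __ (double underscore) to separate nesting levels. For example, bus.bootstrap_servers becomes SLUICE_BUS__BOOTSTRAP_SERVERS: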

# Override bootstrap servers
docker run -e SLUICE_BUS__BOOTSTRAP_SERVERS=prod-kafka:9092 ghcr.io/data-tier/sluice:0.2.2

# Override listen address
docker run -e SLUICE_LISTEN_ADDR=0.0.0.0:9090 ghcr.io/data-tier/sluice:0.2.2
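The naming rule can be reversed to see which YAML setting a variable targets. The sketch below drops the SLUICE_ prefix, turns each __ into a nesting dot, and lowercases the result. The function name is hypothetical, and how keys that themselves contain dots (such as librdkafka properties under extra) map to variable names is not covered here.

```shell
#!/usr/bin/env bash
# Hypothetical sketch: map a SLUICE_ environment variable name to the YAML
# setting path it overrides, e.g. SLUICE_BUS__BOOTSTRAP_SERVERS -> bus.bootstrap_servers

# env_to_config_path ENV_VAR_NAME
env_to_config_path() {
  local name=${1#SLUICE_}                        # strip the SLUICE_ prefix
  name=${name//__/.}                             # "__" separates nesting levels
  printf '%s\n' "$name" | tr '[:upper:]' '[:lower:]'   # keys are lowercase in YAML
}

env_to_config_path SLUICE_BUS__BOOTSTRAP_SERVERS   # bus.bootstrap_servers
env_to_config_path SLUICE_LISTEN_ADDR              # listen_addr
```

Single underscores are kept as part of the key name, which is why BOOTSTRAP_SERVERS maps to bootstrap_servers rather than to a nested path.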