
Topic Configuration

Each topic maps a logical name (used in the HTTP API) to a bus-native target, with optional validation, ordering, and bus-specific overrides.

Basic Topic

The only field a basic topic needs is target, the bus-native topic name (if omitted, it defaults to the logical name):

topics:
  user-events:
    target: "user-events-v1"    # Kafka topic name or Pub/Sub topic ID

Publish to it:

curl -X POST http://localhost:8080/v1/publish/user-events \
  --data-binary '...'

The logical name user-events (in the URL) is decoupled from the bus-native name user-events-v1. You can rename or migrate topics without changing client code.

Schema Registry

Schemas come from the bus-native registry. For Kafka, this is Confluent Schema Registry (configured via schema_registry_url on the bus: block). For Pub/Sub, schemas are fetched automatically from the topic's attached schema in GCP. See the Kafka or Pub/Sub guides for full setup details.

Local schema override (dev/testing)

For local development, you can override the bus-native registry with local schema files. This works with both Kafka and Pub/Sub.

schema_override:
  schemas_dir: "schemas/"

By convention, schema files are named schemas/{topic-name}.desc for Protobuf, schemas/{topic-name}.avsc for Avro, and schemas/{topic-name}.json for JSON. When schema_override is set, it takes priority over the bus-native registry.
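The naming convention can be expressed as a small lookup. This is only an illustrative sketch: the function name and the format keys ("proto", "avro", "json") are assumptions, and the source does not say whether {topic-name} is the logical name or the bus-native target, so the name is passed in as-is.

```python
from pathlib import PurePosixPath

# File extension per schema format, following the convention described above.
SCHEMA_EXTENSIONS = {"proto": ".desc", "avro": ".avsc", "json": ".json"}


def schema_path(schemas_dir: str, topic_name: str, fmt: str) -> str:
    """Return the expected schema file path for a topic (hypothetical helper)."""
    if fmt not in SCHEMA_EXTENSIONS:
        raise ValueError(f"unknown schema format: {fmt}")
    return str(PurePosixPath(schemas_dir) / f"{topic_name}{SCHEMA_EXTENSIONS[fmt]}")
```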

Validation

Sluice validates payloads against schemas before they reach the bus. Invalid messages are rejected with a 400, so bad data never reaches your topics. Each topic lists which fields to check; the schema itself comes from the registry.

topics:
  user-events:
    validation:
      enabled: true
      required_fields:
        - "user_id"
        - "event_type"
        - "timestamp"

Protobuf: wire-format tag scanning. Sluice reads field numbers and types directly from the binary payload in a single pass and checks required fields without fully deserializing the message.
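For Protobuf payloads, the tag-scanning idea can be sketched in a few lines of Python. This is not Sluice's implementation, only an illustration of the technique: it walks the top-level fields, decodes each tag, and skips the value without interpreting it. The function names and the name-to-field-number mapping (which would come from the schema) are assumptions. Note that proto3 omits default-valued scalars from the wire, so a presence check like this one reports them as missing.

```python
from typing import Dict, List, Set, Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7
        if shift >= 64:
            raise ValueError("varint too long")


def scan_field_numbers(payload: bytes) -> Set[int]:
    """Collect top-level field numbers in one pass, skipping over every value."""
    seen: Set[int] = set()
    pos = 0
    while pos < len(payload):
        tag, pos = read_varint(payload, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:        # varint
            _, pos = read_varint(payload, pos)
        elif wire_type == 1:      # 64-bit fixed
            pos += 8
        elif wire_type == 2:      # length-delimited (string, bytes, nested message)
            length, pos = read_varint(payload, pos)
            pos += length
        elif wire_type == 5:      # 32-bit fixed
            pos += 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        if pos > len(payload):
            raise ValueError("truncated field")
        seen.add(field_number)
    return seen


def missing_protobuf_fields(payload: bytes, required: Dict[str, int]) -> List[str]:
    """Return required field names whose field number never appears in the payload."""
    seen = scan_field_numbers(payload)
    return [name for name, number in required.items() if number not in seen]
```

For example, a payload that carries fields 1 and 3 but not field 2 would be reported as missing whichever required name maps to field 2.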

topics:
  page-views:
    validation:
      enabled: true
      required_fields:
        - "url"
        - "session_id"

Avro: schema-guided projection reading. Sluice navigates to the required fields using the writer schema and verifies that they are present and non-null. The full schema must be available because Avro's wire format is positional.
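Because Avro encodes record fields in schema order with no tags, a reader has to walk the writer schema to find anything. The sketch below illustrates that idea for a flat record of primitive types and unions of primitives. It is an assumption-laden illustration, not Sluice's code: the function names are hypothetical, nested records, arrays, maps, and enums are not handled, and the payload is assumed to be the bare Avro body with no framing header.

```python
from typing import Any, Dict, List, Tuple


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one zigzag-encoded varint (Avro int/long); return (value, next_pos)."""
    raw = 0
    shift = 0
    while True:
        byte = buf[pos]  # an IndexError here means the payload is truncated
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def skip_value(schema: Any, buf: bytes, pos: int) -> Tuple[bool, int]:
    """Skip one value of the given type; return (is_null, next_pos)."""
    if isinstance(schema, list):          # union: branch index, then the value
        index, pos = read_long(buf, pos)
        return skip_value(schema[index], buf, pos)
    if schema == "null":
        return True, pos                  # null occupies zero bytes
    if schema == "boolean":
        return False, pos + 1
    if schema in ("int", "long"):
        _, pos = read_long(buf, pos)
        return False, pos
    if schema == "float":
        return False, pos + 4
    if schema == "double":
        return False, pos + 8
    if schema in ("string", "bytes"):     # length prefix, then raw bytes
        length, pos = read_long(buf, pos)
        return False, pos + length
    raise ValueError(f"type not handled in this sketch: {schema!r}")


def missing_avro_fields(writer_schema: Dict[str, Any], payload: bytes,
                        required: List[str]) -> List[str]:
    """Walk the record in schema order and report required fields that are null or absent."""
    present = set()
    pos = 0
    for field in writer_schema["fields"]:
        is_null, pos = skip_value(field["type"], payload, pos)
        if pos > len(payload):
            raise ValueError("truncated payload")
        if not is_null:
            present.add(field["name"])
    return [name for name in required if name not in present]
```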

topics:
  page-views:
    validation:
      enabled: true
      required_fields:
        - "url"
        - "session_id"

JSON: Sluice parses the JSON body and checks that each required field is present and non-null. It optionally validates against a JSON Schema when a .json schema file is provided.
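The JSON check is the simplest of the three to picture. A minimal sketch, assuming required fields are top-level keys (the function name is hypothetical and JSON Schema validation is left out):

```python
import json
from typing import List


def missing_json_fields(body: bytes, required: List[str]) -> List[str]:
    """Return required top-level fields that are absent or null in the JSON body."""
    doc = json.loads(body)
    if not isinstance(doc, dict):
        raise ValueError("expected a JSON object at the top level")
    # dict.get returns None both for a missing key and for an explicit null.
    return [name for name in required if doc.get(name) is None]
```

A gateway following this approach would answer with a 400 whenever the returned list is non-empty.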

Disabled: to turn validation off, omit the validation block or set enabled: false. Payloads then pass through unchecked.

Ordering

If message A arrives before message B for the same ordering key, A is written to the bus before B.

Configuration

topics:
  user-events:
    target: "user-events-v1"
    ordering:
      enabled: true
      partition_key_header: "x-user-id"
      idle_ttl_secs: 30

Clients must provide the ordering key in the configured header:

curl -X POST http://localhost:8080/v1/publish/user-events \
  -H "x-user-id: user-123" \
  --data-binary '...'

A request without the header returns 400.
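The same request can be made from Python with only the standard library. This is a sketch of a client, not an official SDK: the helper names are assumptions, while the URL path and header names come from the examples above.

```python
import urllib.error
import urllib.request
from typing import Optional


def build_publish_request(base_url: str, topic: str, payload: bytes,
                          ordering_key: Optional[str] = None,
                          key_header: str = "x-partition-key") -> urllib.request.Request:
    """Build a POST to /v1/publish/{logical-topic}, with the ordering key if given."""
    url = f"{base_url.rstrip('/')}/v1/publish/{topic}"
    headers = {}
    if ordering_key is not None:
        headers[key_header] = ordering_key
    return urllib.request.Request(url, data=payload, headers=headers, method="POST")


def publish(req: urllib.request.Request) -> int:
    """Send the request and return the HTTP status code, including error codes such as 400."""
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code


# Usage (requires a running gateway):
#   req = build_publish_request("http://localhost:8080", "user-events", b"...",
#                               ordering_key="user-123", key_header="x-user-id")
#   status = publish(req)
```

Leaving out ordering_key on a topic with ordering enabled should make publish return 400, matching the behavior described above.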

How it works

Sluice uses a per-key actor pool:

  1. First message for key user-123 spawns a tokio task for that key
  2. Subsequent messages queue into the same channel
  3. The actor processes messages one at a time, FIFO
  4. After idle_ttl_secs of silence, the actor is evicted
  5. Next message spawns a fresh actor

This scales with the number of active keys, not the total number of keys. With 50 million user IDs but only 10k active in a 30s window, roughly 10k actors are alive at once.
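The five steps above can be sketched with asyncio. Sluice itself spawns tokio tasks, so this Python version only illustrates the pattern; the class and method names are assumptions, and error handling (for example a handler that raises) is omitted.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

Handler = Callable[[str, Any], Awaitable[None]]


class KeyedActorPool:
    """One queue and one task per active key; idle actors remove themselves."""

    def __init__(self, handler: Handler, idle_ttl_secs: float = 30.0) -> None:
        self._handler = handler
        self._idle_ttl = idle_ttl_secs
        # key -> (queue, task); holding the task keeps it from being garbage collected
        self._actors: Dict[str, Tuple[asyncio.Queue, asyncio.Task]] = {}

    def submit(self, key: str, message: Any) -> None:
        """Enqueue a message; must be called from inside the running event loop."""
        actor = self._actors.get(key)
        if actor is None:
            # Steps 1 and 5: first (or first-after-eviction) message spawns an actor.
            queue: asyncio.Queue = asyncio.Queue()
            task = asyncio.create_task(self._run(key, queue))
            actor = self._actors[key] = (queue, task)
        actor[0].put_nowait(message)  # Step 2: later messages join the same queue.

    def active_keys(self) -> int:
        return len(self._actors)

    async def _run(self, key: str, queue: asyncio.Queue) -> None:
        while True:
            try:
                message = await asyncio.wait_for(queue.get(), timeout=self._idle_ttl)
            except asyncio.TimeoutError:
                # Step 4: evict after idle_ttl_secs, unless a message slipped in.
                if queue.empty():
                    del self._actors[key]
                    return
                continue
            # Step 3: one message at a time, in arrival order for this key.
            await self._handler(key, message)
```

Because each key has exactly one consumer task, messages for the same key are handled strictly in the order they were submitted, while different keys proceed concurrently.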

Setting               Default          Description
enabled               true             Enforce ordering for this topic
partition_key_header  x-partition-key  HTTP header containing the ordering key
idle_ttl_secs         30               Seconds before an idle actor is evicted

Without ordering, messages go directly to the bus with no sequencing.

Per-Topic Tuning

Per-topic tuning requires a license key. Without one, per-topic tuning fields are ignored.

Topics can override bus-level defaults. Put tuning fields directly on the topic. The available fields depend on your bus type.

Kafka: settings inherit from the bus section, so set only what you want to override. A topic with any of these fields gets its own dedicated Kafka producer.

topics:
  clickstream: {}                        # no overrides, shared default producer

  payments:
    acks: "all"
    linger_ms: 0                         # no batching delay
    batch_size: 16384

  analytics:
    bootstrap_servers: "other-kafka:9092" # different cluster
    acks: "1"
    linger_ms: 50
    batch_size: 262144
    extra:
      compression.type: "lz4"

Available fields: acks, linger_ms, batch_size, bootstrap_servers, extra
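The inheritance rule for Kafka can be pictured as a dictionary merge. This sketch is an assumption about the semantics, not a description of Sluice internals: in particular, it replaces the whole extra map when a topic sets one, which the source does not specify, and the function name is hypothetical.

```python
from typing import Any, Dict, Tuple

KAFKA_TUNING_FIELDS = ("acks", "linger_ms", "batch_size", "bootstrap_servers", "extra")


def effective_producer_config(bus: Dict[str, Any],
                              topic: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
    """Return (merged settings, needs_dedicated_producer) for one topic."""
    overrides = {k: topic[k] for k in KAFKA_TUNING_FIELDS if k in topic}
    merged = {k: bus[k] for k in KAFKA_TUNING_FIELDS if k in bus}
    merged.update(overrides)  # topic values win; ASSUMPTION: 'extra' is replaced, not merged
    # Any override at all means the topic cannot share the default producer.
    return merged, bool(overrides)
```

With a bus section of acks "all" and linger_ms 10, a topic that sets only linger_ms: 0 would end up with acks "all", linger_ms 0, and a dedicated producer, while a topic with no tuning fields shares the default one.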

Pub/Sub: these fields control publisher batching. A batch is sent as soon as any threshold is reached.

topics:
  default-settings: {}                   # uses Pub/Sub defaults

  high-throughput:
    message_count_threshold: 500         # default: 100
    byte_threshold: 5242880              # default: 1048576 (1 MB)
    delay_threshold_ms: 50               # default: 10

  low-latency:
    message_count_threshold: 1
    delay_threshold_ms: 1

Available fields: message_count_threshold, byte_threshold, delay_threshold_ms
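The "any threshold" rule amounts to a logical OR over the three limits. A minimal sketch, with the defaults taken from the comments above and hypothetical names for the class and function:

```python
from dataclasses import dataclass


@dataclass
class BatchSettings:
    message_count_threshold: int = 100
    byte_threshold: int = 1_048_576   # 1 MB
    delay_threshold_ms: int = 10


def should_flush(settings: BatchSettings, message_count: int,
                 byte_count: int, oldest_age_ms: float) -> bool:
    """A pending batch is sent as soon as any one of the three thresholds is reached."""
    return (
        message_count >= settings.message_count_threshold
        or byte_count >= settings.byte_threshold
        or oldest_age_ms >= settings.delay_threshold_ms
    )
```

Under this reading, the low-latency topic above (message_count_threshold: 1) flushes on every message, while the high-throughput topic waits for 500 messages, 5 MB, or 50 ms, whichever comes first.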

Config Schema Reference

Top level:

license_key: "sl_live_..."      # optional, unlocks paid features
listen_addr: "0.0.0.0:8080"    # optional, default: "0.0.0.0:8080"
bus: { ... }                    # required, see kafka or pubsub guides
schema_override: { ... }        # optional, dev/testing only
topics: { ... }                 # optional, map of topic configs

Topic (each entry under topics):

my-topic:
  target: "bus-native-name"           # optional, defaults to key name ("my-topic")
  validation: { ... }                 # optional
  ordering: { ... }                   # optional
  # Kafka tuning (ignored when bus is pubsub):
  acks: "all"                         # optional
  linger_ms: 0                        # optional
  batch_size: 16384                   # optional
  bootstrap_servers: "other:9092"     # optional, inherits from bus
  extra: { ... }                      # optional, librdkafka properties
  # Pub/Sub tuning (ignored when bus is kafka):
  message_count_threshold: 100        # optional
  byte_threshold: 1048576             # optional
  delay_threshold_ms: 10              # optional

Validation (per topic):

validation:
  enabled: true                # optional, default: true
  required_fields:             # optional, default: []
    - "user_id"
    - "timestamp"

Ordering (per topic):

ordering:
  enabled: true                      # optional, default: true
  partition_key_header: "x-user-id"  # optional, default: "x-partition-key"
  idle_ttl_secs: 30                  # optional, default: 30

Schema override (top level):

schema_override:
  schemas_dir: "schemas/"      # directory with local schema files

Full Example

Kafka:

listen_addr: "0.0.0.0:8080"

bus:
  type: kafka
  bootstrap_servers: "broker1:9092,broker2:9092"
  schema_registry_url: "http://registry:8081"
  acks: "all"
  linger_ms: 10
  batch_size: 131072
  extra:
    compression.type: "zstd"
    enable.idempotence: "true"

topics:
  user-events:
    target: "user.events.v1"
    validation:
      enabled: true
      required_fields: [user_id, event_type, timestamp]
    ordering:
      enabled: true
      partition_key_header: "x-user-id"
      idle_ttl_secs: 60
    linger_ms: 1
    batch_size: 65536

  page-views:                    # target defaults to "page-views"
    validation:
      enabled: true
      required_fields: [url, session_id]

  system-metrics:                # target defaults to "system-metrics"
    acks: "0"
    linger_ms: 100
    batch_size: 262144

Pub/Sub: schemas are fetched from the topic's attached schema in GCP, so no schema configuration is needed in Sluice.

listen_addr: "0.0.0.0:8080"

bus:
  type: pubsub
  project_id: "my-gcp-project"

topics:
  user-events:
    validation:
      enabled: true
      required_fields: [user_id, event_type, timestamp]
    ordering:
      enabled: true
      partition_key_header: "x-user-id"
      idle_ttl_secs: 60

  page-views:
    validation:
      enabled: true
      required_fields: [url, session_id]

  system-metrics:
    message_count_threshold: 500
    byte_threshold: 5242880
    delay_threshold_ms: 100

Local development: use schema_override to load schemas from local files instead of the bus-native registry. This works with both Kafka and Pub/Sub.

bus:
  type: kafka
  bootstrap_servers: "localhost:9092"
  schema_registry_url: "http://localhost:8081"

schema_override:
  schemas_dir: "schemas/"

topics:
  user-events:
    validation:
      enabled: true
      required_fields: [user_id, event_type, timestamp]