
Running in containers

A wireform-kafka-streams process keeps its working state on local disk. The RocksDB backend (Kafka.Streams.State.KeyValue.RocksDB, built with the +rocksdb Cabal flag) writes column-family files under <stateDir>/<storeName>/; the WAL-backed Kafka.Streams.State.KeyValue.Persistent backend does the same. Either way, “the database lives in the container” is the model.

Containers break four assumptions that model relies on.

First, the filesystem is expected to outlive the process. Second, the hostname should be stable across restarts. Third, memory consumption should be predictable. Fourth, there should be time to flush state on shutdown.

In containers, none of these hold by default.

This page covers how to put each one back.

A streams instance owns RocksDB directories under stateDir. On unclean shutdown, RocksDB's WAL replay on the next open handles partial-write recovery (this is why Kafka.Streams.State.KeyValue.RocksDB.rocksDBKeyValueStore sets storeFlush = pure (); the WAL is the flush). The Kafka changelog topic behind every store provides independent durability; the local copy is an optimisation.

You therefore have four strategies, in order of increasing cold-boot cost:

| Strategy | Cold-boot cost | When to pick |
|---|---|---|
| Persistent volume at stateDir | Near-zero: WAL replay only | Default for any non-trivial stateful topology |
| Ephemeral disk + numStandbyReplicas >= 1 | Metadata flip on failover; full replay if every replica dies | When you'd rather pay 2× changelog bandwidth than mount a volume |
| Ephemeral disk + Riffle snapshot store | O(time since last snapshot) replay from object store, plus changelog tail | Multi-TB state where full changelog replay would gate rollouts |
| Ephemeral disk + plain changelog | O(state size / replay bandwidth) on every cold start | Small stores or recreatable workloads only |
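To see why the last row only suits small stores, put numbers on O(state size / replay bandwidth). Here is a sketch of that arithmetic. The function names are hypothetical. Every input, including the ones in the example below, is an assumption and not a measurement of this library.

```haskell
-- Back-of-envelope restore arithmetic for the "ephemeral disk + plain
-- changelog" strategy. All inputs are assumptions you supply.

-- | Decimal units, matching how storage and bandwidth are usually quoted.
gb, mb :: Double
gb = 1e9
mb = 1e6

-- | Seconds to replay stateBytes of changelog at bytesPerSec of
-- sustained restore throughput.
fullReplaySeconds :: Double -> Double -> Double
fullReplaySeconds stateBytes bytesPerSec = stateBytes / bytesPerSec

-- | Total restore time for a rolling deploy, assuming pods restart one at
-- a time and each restart waits for the previous pod's restore to finish.
rollingDeploySeconds :: Int -> Double -> Double -> Double
rollingDeploySeconds pods stateBytesPerPod bytesPerSec =
  fromIntegral pods * fullReplaySeconds stateBytesPerPod bytesPerSec
```

Assume 200 GB per pod and 100 MB/s of restore throughput. Then fullReplaySeconds (200 * gb) (100 * mb) is 2000 s, about 33 minutes per pod. Under the same assumptions, rollingDeploySeconds 6 (200 * gb) (100 * mb) is 12000 s, so a six-pod rolling deploy spends over three hours restoring.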

The runtime opens one RocksDB directory per (storeName, taskId) under stateDir. For a StatefulSet pod app-0 with numStreamThreads = 4, the layout is roughly:

```text
/var/lib/wireform/state/
  view-counts/0_0/      -- task 0_0
  view-counts/0_1/      -- task 0_1
  ...
  user-profiles/1_0/
  ...
```
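The path arithmetic behind this layout can be sketched as follows. The sketch assumes the <stateDir>/<storeName>/<taskId>/ convention shown here, with task ids rendered as <subtopology>_<partition>. TaskId, storeDir and ownedDirs are hypothetical names, not the library's types.

```haskell
import System.FilePath ((</>))

-- | Hypothetical task identifier, rendered "<subtopology>_<partition>" (e.g. 0_1).
data TaskId = TaskId { subtopology :: Int, taskPartition :: Int }
  deriving (Eq, Show)

renderTaskId :: TaskId -> String
renderTaskId (TaskId s p) = show s ++ "_" ++ show p

-- | Directory holding one store's RocksDB instance for one task.
storeDir :: FilePath -> String -> TaskId -> FilePath
storeDir stateDir storeName tid = stateDir </> storeName </> renderTaskId tid

-- | Every (store, task) directory a pod owns. Each store belongs to one
-- sub-topology, so it only appears under tasks of that sub-topology.
ownedDirs :: FilePath -> [(String, Int)] -> [TaskId] -> [FilePath]
ownedDirs stateDir stores tasks =
  [ storeDir stateDir name t
  | (name, sub) <- stores
  , t <- tasks
  , subtopology t == sub
  ]
```

The length of ownedDirs is the number of RocksDB instances the pod holds. That count drives the memory, disk and file-descriptor budgets discussed further down.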

The volume must be writable by the process user, and the filesystem must provide the fsync semantics RocksDB expects. ext4 and xfs are fine. NFS is not, because RocksDB's WAL atomicity assumptions don't hold there and compaction throughput collapses.

```mermaid
flowchart TD
  Start([Pod starts]) --> CheckDir{stateDir<br/>has existing<br/>RocksDB?}
  CheckDir -- yes --> WALReplay[RocksDB opens<br/>WAL replay]
  CheckDir -- no --> CheckSnap{Snapshot store<br/>configured?}
  WALReplay --> TailReplay[Replay changelog tail<br/>from last committed offset]
  CheckSnap -- yes --> FetchSnap[Fetch snapshot blob<br/>from object store]
  FetchSnap --> SnapTail[Replay changelog tail<br/>since snapshot offset]
  CheckSnap -- no --> FullReplay[Replay entire changelog<br/>from earliest]
  TailReplay --> Ready([Task RUNNING])
  SnapTail --> Ready
  FullReplay --> Ready
```

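A container with no persistent volume always takes the "no" branch out of the first decision. That means a full changelog replay, or a snapshot fetch plus tail when a snapshot store is configured. Failing to size rollouts around that path is behind most "rolling deploys take three hours" incidents.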
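The flowchart's decision reduces to two booleans, as the sketch below shows. RestorePath and restorePath are illustrative names, not library types. A pod without a persistent volume always starts with hasLocalRocksDB = False, so it can never take the cheap first branch.

```haskell
-- | Which restore path a task takes on pod start, following the
-- flowchart above. Illustrative names only.
data RestorePath
  = WalReplayThenTail   -- ^ local RocksDB found: WAL replay, then changelog tail
  | SnapshotThenTail    -- ^ no local state, snapshot store configured
  | FullChangelogReplay -- ^ no local state, no snapshot: replay from earliest
  deriving (Eq, Show)

restorePath :: Bool -> Bool -> RestorePath
restorePath hasLocalRocksDB snapshotConfigured
  | hasLocalRocksDB    = WalReplayThenTail
  | snapshotConfigured = SnapshotThenTail
  | otherwise          = FullChangelogReplay
```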

Three identities matter:

| Identity | Set by | Survival requirement |
|---|---|---|
| Consumer-group member | applicationId + member metadata | Stable across the group's lifetime; one per process |
| Task assignment | The assignor, based on member metadata | Cooperative-sticky prefers to keep the same tasks on the same member if its identity is stable |
| RocksDB directory | stateDir mount | Stable iff the volume is reattached to the same pod |

For the assignor to recognise a restarted pod as “the same member who just left”, its clientId and (when set) applicationServer need to be stable across restarts:

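```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Text as T
import qualified Kafka.Streams.Config as C
import qualified System.Environment as Env

-- Derive per-pod identity from HOSTNAME so clientId and applicationServer
-- are the same every time this StatefulSet ordinal restarts.
mkCfg :: IO C.StreamsConfig
mkCfg = do
  -- Kubernetes sets HOSTNAME to the pod name; getEnv throws if it is missing.
  hostname <- Env.getEnv "HOSTNAME"
  pure C.defaultStreamsConfig
    { C.applicationId      = "view-counter"
    , C.bootstrapServers   = ["broker:9092"]
    , C.clientId           = "view-counter-" <> T.pack hostname
      -- <pod>.<headless service>:<port>; resolves through the cluster DNS
      -- search path from pods in the same namespace.
    , C.applicationServer  = Just (T.pack hostname <> ".view-counter:8080")
    , C.numStreamThreads   = 4
    , C.numStandbyReplicas = 1
    , C.stateDir           = "/var/lib/wireform/state"
    }
```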

A StatefulSet gives you hostname = view-counter-0, view-counter-1, … and rebinds the same PersistentVolumeClaim to the same ordinal on restart. The clientId derived from HOSTNAME is then automatically stable.

For Kubernetes specifically, a Deployment does not give you stable identity: pod names carry random suffixes that change on every restart. Use a StatefulSet whenever the topology has state.
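A Deployment-style pod name silently yields a clientId that changes on every restart. It can therefore be worth refusing to start when HOSTNAME doesn't look like a StatefulSet pod. The sketch below assumes Kubernetes' <statefulset-name>-<ordinal> pod naming. statefulSetOrdinal and stableClientId are hypothetical helpers. The "<app>-$HOSTNAME" clientId shape follows the configuration above.

```haskell
import Data.Char (isDigit)
import Data.List (stripPrefix)
import System.Environment (lookupEnv)

-- | Ordinal of a StatefulSet pod name such as "view-counter-0".
-- Nothing for names that don't match, e.g. a Deployment pod like
-- "view-counter-7d9f8c6b5-x2k4q".
statefulSetOrdinal :: String -> String -> Maybe Int
statefulSetOrdinal setName hostname = do
  suffix <- stripPrefix (setName ++ "-") hostname
  if not (null suffix) && all isDigit suffix
    then Just (read suffix)
    else Nothing

-- | Build "<app>-$HOSTNAME", failing at boot instead of joining the group
-- with an identity that would change on the next restart.
stableClientId :: String -> String -> IO String
stableClientId appId setName = do
  mHost <- lookupEnv "HOSTNAME"
  case mHost of
    Nothing -> fail "HOSTNAME is not set"
    Just host -> case statefulSetOrdinal setName host of
      Nothing -> fail ("HOSTNAME " ++ show host ++ " is not a pod of StatefulSet " ++ show setName)
      Just _  -> pure (appId ++ "-" ++ host)
```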

The cooperative-sticky assignor in Kafka.Streams.Runtime.RebalanceProtocol tries to keep tasks where they were. If the restarted member shows up under a brand-new clientId, the assignor has no record of its prior ownership and reassigns from scratch. Tasks get reshuffled even though the underlying pod is "the same". Stable identity makes the rebalance a metadata flip instead of a data movement.
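A toy model shows why. The sketch below is a deliberately simplified sticky assignor written for illustration only. It ignores standbys, lag and balance, and it is not the algorithm in Kafka.Streams.Runtime.RebalanceProtocol. Tasks whose previous owner's clientId is still a member stay put, and orphaned tasks go to the least-loaded member.

```haskell
import Data.List (foldl', sortOn)
import qualified Data.Map.Strict as M

type ClientId = String
type Task = String

-- | Keep each task with its previous owner if that clientId is still a
-- member; hand orphaned tasks to the least-loaded member.
stickyAssign :: M.Map Task ClientId -> [ClientId] -> M.Map Task ClientId
stickyAssign previous members = foldl' place kept orphans
  where
    kept    = M.filter (`elem` members) previous
    orphans = M.keys (M.filter (`notElem` members) previous)
    load assignment m = length (filter (== m) (M.elems assignment))
    place assignment t = case sortOn (load assignment) members of
      (m : _) -> M.insert t m assignment
      []      -> assignment -- no members: task stays unassigned

-- | Tasks whose owner changed, i.e. tasks that must restore state elsewhere.
moved :: M.Map Task ClientId -> M.Map Task ClientId -> [Task]
moved before after =
  [t | (t, c) <- M.toList after, M.lookup t before /= Just c]
```

Suppose view-counter-view-counter-0 owns 0_0 and restarts. If it keeps its clientId, moved is empty. If the same pod rejoins as view-counter-a1b2c, moved reports 0_0 as reassigned, even though the same pod ends up owning it.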

The full reconciliation walk is in Scaling and rebalancing → Processes across the group.

RocksDB allocates off-heap, outside the GHC RTS heap, so RTS heap limits don't cap it. Per store, a non-trivial slice of memory goes to:

  • The block cache (decompressed .sst block pages).
  • The memtable (per-column-family write buffer).
  • Index and filter blocks pinned in memory.
  • Iterator-held pinned blocks during range scans.

A streams pod with N stores and T tasks holds up to N × T RocksDB directories, each with its own buffers unless you've wired a shared Cache and WriteBufferManager through a RocksDBConfig customisation. Without that sharing, total RocksDB memory grows linearly with the number of tasks the pod owns. The peak comes during a rebalance, when tasks are being opened on the gaining instance before the losing instance has fully released them.

The container memory limit therefore needs:

```text
container_limit = haskell_rts_resident
                + rocksdb_per_store * stores_per_task * tasks_per_pod
                + page_cache_headroom
                + per_record_inflight * commit_interval_records
```

The most common production failure mode is sizing the container to “RTS resident + a small constant” and getting OOM-killed during the first heavy compaction. Compaction can transiently double RocksDB memory.
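The formula above is easy to encode and check against a pod spec. Here is a minimal sketch, assuming every term is estimated in bytes. MemBudget, containerLimit and the example numbers are hypothetical. The compaction factor applies the transient doubling mentioned above to the RocksDB term only.

```haskell
-- | Inputs to the container memory formula, in bytes (counts are plain numbers).
data MemBudget = MemBudget
  { rtsResident           :: Integer -- ^ Haskell RTS resident set
  , rocksdbPerStore       :: Integer -- ^ block cache + memtables + pinned blocks, per store instance
  , storesPerTask         :: Integer
  , tasksPerPod           :: Integer
  , pageCacheHeadroom     :: Integer
  , perRecordInflight     :: Integer
  , commitIntervalRecords :: Integer
  }

-- | container_limit from the formula, with the RocksDB term multiplied by
-- a compaction factor (1 = steady state, 2 = transient doubling).
containerLimit :: Integer -> MemBudget -> Integer
containerLimit compactionFactor b =
  rtsResident b
    + compactionFactor * rocksdbPerStore b * storesPerTask b * tasksPerPod b
    + pageCacheHeadroom b
    + perRecordInflight b * commitIntervalRecords b

mib, gib :: Integer
mib = 1024 * 1024
gib = 1024 * mib
```

Take a 512 MiB RTS, 64 MiB per store, 2 stores per task, 12 tasks, 1 GiB of page-cache headroom and 16 MiB in flight, all assumed. containerLimit 1 then gives 3088 MiB, and containerLimit 2 gives 4624 MiB. The gap between the two is the margin that "RTS resident + a small constant" leaves out.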

Kafka.Streams.State.KeyValue.RocksDB.RocksDBConfig exposes the options the underlying binding accepts. The practical pattern is to build a single config in the boot path and reuse it for every store the topology constructs:

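```haskell
import qualified Kafka.Streams.State.KeyValue.RocksDB as RDB

-- One config built in the boot path and reused for every store.
bootRocksDBConfig :: FilePath -> RDB.RocksDBConfig
bootRocksDBConfig dir = (RDB.defaultRocksDBConfig dir)
  { -- Assumed to map to RocksDB's per-write sync flag: skip an fsync on
    -- every write; the changelog topic remains the durable copy.
    RDB.rdbWriteSync = False
  }
```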

For deployments with many tasks per pod, a wrapper that shares one Cache across R.open calls is worth writing. Track the rocksdb-haskell-kadena API for the underlying Options you'd thread through.

RocksDB’s on-disk footprint is not the same shape as the changelog topic on the broker. Budget the volume for:

  • The steady-state size of every store instance the pod owns (summed across its task assignments).
  • Compaction headroom: at least 1× the largest column family, on top of steady state, so L0→Ln compactions have room to rewrite .sst files.
  • The WAL, which is bounded but non-trivial in burst write workloads.
  • A buffer for the next deploy’s rebalance window, where the pod may transiently hold both its old and gaining-but-not-yet-released task directories.

A rule of thumb: provision 2× expected steady-state state size on fast local SSD. The Riffle tiered KV store (Kafka.Streams.State.KeyValue.Tiered) can move cold keys to object storage. That is the right escape hatch when state grows past what's affordable on local SSD, but it doesn't change the hot-tier sizing on the pod.
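The four budget items and the 2× rule of thumb can be checked against each other. The sketch assumes you can estimate each term in bytes. DiskBudget, requiredVolume and provisionedVolume are hypothetical names. Taking the larger of the itemised sum and 2× steady state is a design choice, made so the rule of thumb never under-provisions.

```haskell
-- | Volume sizing inputs, in bytes.
data DiskBudget = DiskBudget
  { steadyState         :: Integer -- ^ every store instance the pod owns
  , largestColumnFamily :: Integer -- ^ compaction headroom: at least 1× this
  , walBurst            :: Integer -- ^ WAL high-water mark under burst writes
  , rebalanceBuffer     :: Integer -- ^ gaining-but-not-yet-released task dirs
  }

-- | Sum of the four budget items.
requiredVolume :: DiskBudget -> Integer
requiredVolume d =
  steadyState d + largestColumnFamily d + walBurst d + rebalanceBuffer d

-- | Provisioned size: the larger of the itemised budget and the
-- 2× steady-state rule of thumb.
provisionedVolume :: DiskBudget -> Integer
provisionedVolume d = max (requiredVolume d) (2 * steadyState d)
```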

See the runbook entry on unbounded state-dir growth.

closeKafkaStreams walks the shutdown path: finish the in-flight commit cycle → flush memtables → close RocksDB → leave the group. If the container is SIGKILL-ed before that completes:

  • Records in the transactional buffer are dropped. Under EOS the next owner reprocesses from the last committed offset, so there is no data loss, just extra work.
  • The pod leaves the group via session timeout instead of a clean Leave, so the rebalance starts later than it could.
  • RocksDB recovers from its WAL on the next open, but that is another full WAL replay, which adds to startup time.

The Kubernetes default is terminationGracePeriodSeconds: 30. For a production stateful topology with commitIntervalMs = 30_000, that is not enough, because the in-flight commit cycle alone can take up to 30 s. Set the grace period to at least commitIntervalMs / 1000 + 30 seconds, or to whatever bound your slowest commit2PC sink advertises plus a safety margin.
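The rule above fits in a small function. This sketch takes the larger of the two bounds, a conservative reading of the "or" in the text. slowestSinkBoundSecs and safetyMarginSecs are hypothetical inputs for your sink's advertised bound and your chosen margin.

```haskell
-- | terminationGracePeriodSeconds for a stateful topology: at least
-- commitIntervalMs/1000 + 30, and at least the slowest two-phase-commit
-- sink's advertised bound plus a safety margin.
gracePeriodSeconds :: Integer -> Maybe Integer -> Integer -> Integer
gracePeriodSeconds commitIntervalMs slowestSinkBoundSecs safetyMarginSecs =
  max fromCommit fromSink
  where
    -- ceiling division, so 30500 ms rounds up to 31 s rather than down to 30 s
    fromCommit = (commitIntervalMs + 999) `div` 1000 + 30
    fromSink   = maybe 0 (+ safetyMarginSecs) slowestSinkBoundSecs
```

For the 30 s commit interval above, gracePeriodSeconds 30000 Nothing 0 is 60.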

Pair the grace period with a preStop hook that calls closeKafkaStreams (typically by sending the process a SIGTERM your runtime translates into the close call) and waits for the internal state machine to reach NOT_RUNNING before exiting.
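On the process side, the usual shape is a SIGTERM handler that wakes the main thread, which then runs the close path before exiting. Below is a minimal sketch using System.Posix.Signals from the unix package. The shutdown argument stands in for your call to closeKafkaStreams followed by a wait for NOT_RUNNING. Their exact signatures are not shown on this page, so that wiring is an assumption.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (void)
import System.Posix.Signals

-- | Install a SIGTERM handler and return an action that blocks until
-- SIGTERM arrives, then runs the supplied shutdown action.
installGracefulShutdown :: IO () -> IO (IO ())
installGracefulShutdown shutdown = do
  termReceived <- newEmptyMVar
  -- The handler only records the signal; tryPutMVar never blocks, so a
  -- second SIGTERM during shutdown is harmless.
  _ <- installHandler sigTERM (Catch (void (tryPutMVar termReceived ()))) Nothing
  pure $ do
    takeMVar termReceived
    shutdown
```

In main, call installGracefulShutdown before starting the streams instance, start it, then run the returned action. That action blocks until SIGTERM and then runs the close path inside the grace period.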

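RocksDB opens many files. Per RocksDB instance (one per store per task):

  • The MANIFEST and the info LOG.
  • The active WAL file, shared by all column families in the instance.
  • Every live .sst at every level. The count is determined by your compaction settings (level0_file_num_compaction_trigger, max_bytes_for_level_base, etc.), and RocksDB's default max_open_files = -1 keeps all of them open.
  • Iterators pin the files they read for their lifetime, so .sst files that compaction has made obsolete stay open until the iterator closes.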

A pod owning a few dozen tasks easily reaches several thousand open files. The nofile soft limit a container actually gets depends on the container runtime's defaults and configuration, and on any entrypoint script that calls ulimit. Verify it with ulimit -n inside the running container, not from the manifest.

Set nofile to at least 65536 for any stateful streams pod.
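A process can also check its own limit at boot, the in-process equivalent of running ulimit -n in the container. The sketch below uses System.Posix.Resource from the unix package. minNofile encodes this page's 65536 recommendation. Failing startup on a low or unknown limit is a design choice, not library behaviour.

```haskell
import System.Posix.Resource
  ( Resource (ResourceOpenFiles)
  , ResourceLimit (..)
  , ResourceLimits (..)
  , getResourceLimit
  )

-- | Minimum nofile soft limit recommended for stateful streams pods.
minNofile :: Integer
minNofile = 65536

-- | Whether a soft limit meets the minimum. An unknown limit counts as
-- insufficient, so the check fails loudly instead of silently passing.
nofileOk :: Integer -> ResourceLimit -> Bool
nofileOk _    ResourceLimitInfinity = True
nofileOk _    ResourceLimitUnknown  = False
nofileOk want (ResourceLimit n)     = n >= want

describeLimit :: ResourceLimit -> String
describeLimit ResourceLimitInfinity = "unlimited"
describeLimit ResourceLimitUnknown  = "unknown"
describeLimit (ResourceLimit n)     = show n

-- | Read this process's soft limit and refuse to start if it is too low.
checkNofileAtBoot :: IO ()
checkNofileAtBoot = do
  limits <- getResourceLimit ResourceOpenFiles
  let soft = softLimit limits
  if nofileOk minNofile soft
    then pure ()
    else fail ("nofile soft limit is " ++ describeLimit soft
               ++ "; need at least " ++ show minNofile)
```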

The Kubernetes YAML is outside this library’s scope, but the shape deployments converge to looks like:

| Resource | Why |
|---|---|
| StatefulSet | Stable pod hostnames (app-0, app-1, …) and stable PVC binding per ordinal |
| volumeClaimTemplates on a local-SSD StorageClass | Mount at stateDir; survives pod restarts |
| applicationServer = $HOSTNAME.<svc>:<port> | Cross-instance IQ routing finds the right pod after a rebalance |
| clientId = "<app>-$HOSTNAME" | Stable assignor identity; cooperative-sticky reuses the prior assignment |
| terminationGracePeriodSeconds ≥ commitIntervalMs/1000 + 30 | Enough wall-clock time to flush, commit, and leave cleanly |
| resources.limits.memory covering heap + RocksDB + headroom | Avoids OOM kills during compaction spikes |
| readinessProbe that distinguishes RUNNING from REBALANCING | Don't take a pod out of rotation during a normal rebalance |
| livenessProbe that triggers only on ERROR / a hung commit cycle | Don't restart a healthy pod that's mid-replay |

The probes should consult setStateListener output (or equivalent metric) rather than just “did the process start”; a pod can be process-alive but partition-unowned for the full probingRebalanceIntervalMs window during a rebalance, and you don’t want that to count as ready.
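The liveness rule in the table is to fail only on ERROR or a hung commit cycle, never merely because the pod is replaying. It can live in a pure function that the probe endpoint calls. In the sketch below, StreamsState mirrors the state names used on this page and is not the library's type. commitInFlightSecs is a hypothetical value you would track yourself from the state listener and commit timing: how long the current commit has been running, if one is.

```haskell
-- | State names as used on this page; an illustrative mirror only.
data StreamsState = Rebalancing | Running | NotRunning | Error
  deriving (Eq, Show)

-- | Liveness: fail only on ERROR, or when a commit has been in flight
-- longer than maxCommitSecs. A pod that is rebalancing or restoring with
-- no commit in flight stays live.
isLive :: Integer -> StreamsState -> Maybe Integer -> Bool
isLive maxCommitSecs st commitInFlightSecs =
  st /= Error && maybe True (<= maxCommitSecs) commitInFlightSecs
```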

8. Which storage strategy for which workload

| You have… | Pick |
|---|---|
| Stateless topology (map / filter / branch only) | Ephemeral disk; nothing to persist |
| Small state (< 10 GB per task) you can afford to replay | Ephemeral disk + numStandbyReplicas = 1 |
| Bounded state, want fast rollouts | Persistent volume + numStandbyReplicas = 1 |
| Large state, can tolerate a snapshot-cadence-bound replay | Riffle snapshot store + ephemeral disk + SnapshotPointer standby |
| Multi-TB state where hot working set is small | Tiered KV store (RocksDB hot + object-store cold) |
| You don't run on Kubernetes; Nomad / bare ECS / Fly volumes | Same principles: stable identity + persistent mount + grace period |