
Topology optimization

Your source code describes intent. The compiler rewrites the topology into an efficient execution plan while preserving semantics. This page covers what happens automatically, what you control, and how to verify the result.

Suppose you write three separate map operations:

```haskell
source "input" serde serde
  >>> mapValues fn1
  >>> mapValues fn2
  >>> mapValues fn3
  >>> sink "output" serde serde
```

Running this literally would allocate intermediate records after each map, traverse the stream three times, and add synchronization overhead between steps. The compiler fuses them into a single pass that applies all three functions to each record once. Same result, better performance.
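Fusing the three maps is safe because mapping functions one after another gives the same result as mapping their composition once. Below is a minimal sketch of that law. Plain Haskell lists stand in for a stream, and `composeInOrder`, `unfused`, and `fused` are illustrative names, not library functions:

```haskell
-- Compose value transformations so the first function in the list runs
-- first, matching the left-to-right order of a >>> mapValues chain.
composeInOrder :: [a -> a] -> a -> a
composeInOrder = foldr (flip (.)) id

-- Unfused: one map per stage, building an intermediate list each time.
unfused :: [a -> a] -> [a] -> [a]
unfused fs xs = foldl (flip map) xs fs

-- Fused: a single pass applying the composed function to each element.
fused :: [a -> a] -> [a] -> [a]
fused fs = map (composeInOrder fs)
```

For example, `fused [(+ 1), (* 2)] [1, 2, 3]` evaluates to `[4,6,8]`, which matches the two separate maps.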

| Layer | What it does | Enabled by default? |
|---|---|---|
| AST fusion | Merges adjacent operators, eliminates redundancy | Yes |
| Graph rewrites | Changes internal topic layout | No (opt-in) |

AST fusion is always safe. Graph rewrites affect internal topics and require careful rollout.

The AST-level rewrites described next reduce node count and eliminate per-operator overhead. They run automatically unless you disable them.

Operator fusion merges adjacent stateless operators into a single node.

| Pattern | Becomes | Benefit |
|---|---|---|
| Two `mapValues` | One `mapValues` with the composed function | One pass, not two |
| Two `filter` | One `filter` with the AND of both predicates | Check once, not twice |
| `mapValues` then `filter` | One fused map-and-filter step | No intermediate allocations |
```haskell
-- You write
source >>> mapValues upper >>> filter (not . null) >>> sink

-- Runtime sees (conceptually): one node that maps, then tests the mapped value
source >>> mapAndFilter (\v -> let u = upper v in (not (null u), u)) >>> sink
```
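All three rows of the table fit one model. Treat every stateless operator as a function from a value to `Maybe` a value, where `Nothing` drops the record, and join adjacent operators with Kleisli composition. This is a sketch under that assumption. `Op`, `Fused`, `fuseChain`, and `runAll` are hypothetical and are not the compiler's real AST:

```haskell
import Control.Monad ((>=>))
import Data.Char (toUpper)
import Data.Maybe (mapMaybe)

-- Hypothetical, simplified operator type (not the library's AST).
data Op v
  = MapValues (v -> v)
  | Filter (v -> Bool)
  | Fused (v -> Maybe v) -- transform the value, or drop the record (Nothing)

-- Every stateless operator can be read as "value -> Maybe value".
toStep :: Op v -> v -> Maybe v
toStep (MapValues f) = Just . f
toStep (Filter p) = \v -> if p v then Just v else Nothing
toStep (Fused g) = g

-- Fuse a chain of adjacent stateless operators into one node. Kleisli
-- composition (>=>) runs the steps left to right and stops at the first Nothing.
fuseChain :: [Op v] -> Op v
fuseChain ops = Fused (foldr ((>=>) . toStep) Just ops)

-- Reference semantics over a list of records, one operator at a time.
runOp :: Op v -> [v] -> [v]
runOp (MapValues f) = map f
runOp (Filter p) = filter p
runOp (Fused g) = mapMaybe g

runAll :: [Op v] -> [v] -> [v]
runAll ops xs = foldl (flip runOp) xs ops

-- The page's example value function, assumed here to upper-case a string.
upper :: String -> String
upper = map toUpper
```

`runAll [fuseChain ops] input` and `runAll ops input` agree for any chain of these operators. That is the property a real fusion rewrite must preserve.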

Repartition elimination removes or reorders shuffles. Repartitioning is expensive: every record is written to an internal Kafka topic and read back, which costs network I/O. The compiler eliminates unnecessary repartitions:

| Pattern | Action | Why |
|---|---|---|
| Two repartitions with no key change between them | Remove the second | Redundant shuffle |
| Repartition before a key-changing op | Remove the repartition | The key change invalidates it anyway |
| Pure ops between repartitions | Hoist them before the first | Run on smaller pre-shuffle data |
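Below is a sketch of the first two rules over a simplified linear topology. The `Node` type and both functions are hypothetical. Hoisting (the third row) is left out, and a real compiler works on a graph rather than a list:

```haskell
-- Hypothetical node type for a linear topology.
data Node
  = Repartition
  | SelectKey -- changes the record key
  | Pure String -- key-preserving stateless op (mapValues, filter, ...)
  | Stateful String -- count, aggregate, reduce, ...
  deriving (Eq, Show)

isPure :: Node -> Bool
isPure (Pure _) = True
isPure _ = False

-- Row 2: a repartition whose next non-pure node is a key change is wasted,
-- because the key change invalidates the partitioning it produced.
dropWasted :: [Node] -> [Node]
dropWasted [] = []
dropWasted (Repartition : rest)
  | (SelectKey : _) <- dropWhile isPure rest = dropWasted rest
dropWasted (n : rest) = n : dropWasted rest

-- Row 1: once the stream is partitioned by its current key, another
-- repartition before any key change is redundant. The source's partitioning
-- is treated as unknown, so the first repartition is always kept.
dropRedundant :: [Node] -> [Node]
dropRedundant = go False
  where
    go _ [] = []
    go True (Repartition : rest) = go True rest
    go _ (Repartition : rest) = Repartition : go True rest
    go _ (SelectKey : rest) = SelectKey : go False rest
    go partitioned (n : rest) = n : go partitioned rest

eliminateRepartitions :: [Node] -> [Node]
eliminateRepartitions = dropRedundant . dropWasted
```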

Remove no-op operations:

```haskell
-- These disappear entirely
mapValues id          -- Does nothing
filter (const True)   -- Keeps everything
through "same-topic"  -- Reads what it just wrote
```
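Haskell functions cannot be compared for equality, so a compiler can only remove the no-ops it recognizes in its syntax tree. The sketch below assumes operators carry such tags. `Fn`, `Pred`, and `removeNoOps` are hypothetical, and the `through` case is not modeled:

```haskell
-- Hypothetical tagged AST: the compiler can see IdFn and ConstTrue,
-- but a user function is opaque apart from its name.
data Fn = IdFn | UserFn String deriving (Eq, Show)

data Pred = ConstTrue | UserPred String deriving (Eq, Show)

data Op = MapValues Fn | Filter Pred deriving (Eq, Show)

-- mapValues id and filter (const True) leave every record unchanged.
isNoOp :: Op -> Bool
isNoOp (MapValues IdFn) = True
isNoOp (Filter ConstTrue) = True
isNoOp _ = False

removeNoOps :: [Op] -> [Op]
removeNoOps = filter (not . isNoOp)
```

Under this assumption, a user function that merely behaves like `id`, such as `\v -> v`, is tagged `UserFn` and is not removed.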

Insert required shuffles you forgot. Stateful operations (count, aggregate, reduce) need every record with a given key on the same partition. If the key changes before a stateful operation and nothing repartitions the stream, the compiler inserts a repartition automatically.

```haskell
-- You write (potentially buggy): the key changes right before a stateful op
source >>> selectKey extractKey >>> count

-- Compiler inserts
source >>> selectKey extractKey >>> repartition >>> count
```

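This prevents silently wrong answers. Without the shuffle, records with the same key could sit on different partitions, and each instance would count only its own share.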
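The check behind automatic insertion can be sketched as a walk that tracks whether records are still partitioned by their current key. The node type and `insertRepartitions` are hypothetical. The sketch assumes source topics are already partitioned by key:

```haskell
-- Hypothetical linear topology nodes.
data Node
  = Source
  | SelectKey -- changes the record key
  | Pure String -- key-preserving stateless op
  | Repartition
  | Stateful String -- count, aggregate, reduce
  deriving (Eq, Show)

-- The Bool tracks "records are partitioned by their current key".
-- A key change clears it; a repartition restores it. A stateful op
-- reached while it is False gets a repartition inserted in front.
insertRepartitions :: [Node] -> [Node]
insertRepartitions = go True
  where
    go _ [] = []
    go _ (SelectKey : rest) = SelectKey : go False rest
    go _ (Repartition : rest) = Repartition : go True rest
    go False (n@(Stateful _) : rest) = Repartition : n : go True rest
    go ok (n : rest) = n : go ok rest
```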

Graph rewrites change the internal topic layout on your Kafka brokers. They are disabled by default because enabling them has to be coordinated with deployment.

| Rewrite | Effect | When to enable |
|---|---|---|
| `REUSE_KTABLE_SOURCE_TOPICS` | Reuse a KTable's source topic as its changelog instead of creating a separate changelog topic | New topologies with KTable sources |
| `MERGE_REPARTITION_TOPICS` | Combine adjacent repartitions into one | Complex topologies with many shuffles |
| `SINGLE_STORE_SELF_JOIN` | Use one store instead of two for self-joins | Stream-stream self-joins |

Graph rewrites are opt-in because they change which internal topics exist. Redeploying an existing application with different optimization settings than before can orphan internal topics or require state migration.

To enable all graph rewrites:

```haskell
import qualified Kafka.Streams.Topology.Optimization as Opt

-- Streams config with every graph rewrite enabled
cfg = Opt.defaultStreamsConfig
  { Opt.topologyOptimization = Opt.OptimizeAll
  }
```
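The three rewrite names match the values accepted by Apache Kafka Streams' `topology.optimization` setting. That setting takes `all`, `none`, or, in recent versions, a comma-separated list of individual optimizations. The sketch below assumes the Haskell config is passed through to that property, which this page does not confirm. It shows how a per-rewrite choice could be rendered. `GraphRewrite`, `Optimization`, `OptimizeOnly`, and `optimizationValue` are hypothetical names, not the library's API:

```haskell
import Data.List (intercalate, nub)

-- The graph rewrites from the table above.
data GraphRewrite
  = ReuseKTableSourceTopics
  | MergeRepartitionTopics
  | SingleStoreSelfJoin
  deriving (Eq, Show)

-- Hypothetical setting: nothing, everything, or an explicit subset.
data Optimization
  = NoOptimization
  | OptimizeAll
  | OptimizeOnly [GraphRewrite]
  deriving (Eq, Show)

-- Kafka Streams' string value for each individual optimization.
rewriteName :: GraphRewrite -> String
rewriteName ReuseKTableSourceTopics = "reuse.ktable.source.topics"
rewriteName MergeRepartitionTopics = "merge.repartition.topics"
rewriteName SingleStoreSelfJoin = "single.store.self.join"

-- Render the value for the topology.optimization property.
optimizationValue :: Optimization -> String
optimizationValue NoOptimization = "none"
optimizationValue OptimizeAll = "all"
optimizationValue (OptimizeOnly []) = "none"
optimizationValue (OptimizeOnly rs) = intercalate "," (map rewriteName (nub rs))
```

Enabling rewrites individually like this could let you roll out each internal-topic change separately rather than all at once.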

The compiler can report what it changed.

```haskell
import qualified Kafka.Streams.Topology.Free.Optimize as Opt

stats <- Opt.optimizationStats myTopology
-- OptimizationStats
--   { osBefore     = 15  -- Nodes in original topology
--   , osAfter      = 9   -- Nodes after optimization
--   , osRulesFired = [("fuseMaps", 3), ("collapseRepartition", 1)]
--   }
```

Check this when performance surprises you.
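Rule counts like those in `osRulesFired` come naturally from a rewrite loop that applies rules until none fires and tallies each firing. This is a simplified sketch. `Node`, `Stats`, the two rules, and `optimizeWithStats` are hypothetical stand-ins, not the library's implementation:

```haskell
import Data.Maybe (listToMaybe)

-- Hypothetical node tags for a linear topology.
data Node = MapValues | Filter | Repartition | Sink deriving (Eq, Show)

-- A summary in the spirit of optimizationStats' output.
data Stats = Stats
  { nodesBefore :: Int
  , nodesAfter :: Int
  , rulesFired :: [(String, Int)]
  }
  deriving (Eq, Show)

-- A rule rewrites the topology once, or returns Nothing if it does not apply.
type Rule = [Node] -> Maybe [Node]

-- Merge the first adjacent pair of MapValues nodes, if any.
fuseMaps :: Rule
fuseMaps (MapValues : MapValues : rest) = Just (MapValues : rest)
fuseMaps (n : rest) = (n :) <$> fuseMaps rest
fuseMaps [] = Nothing

-- Drop the second of two back-to-back Repartition nodes, if any.
collapseRepartition :: Rule
collapseRepartition (Repartition : Repartition : rest) = Just (Repartition : rest)
collapseRepartition (n : rest) = (n :) <$> collapseRepartition rest
collapseRepartition [] = Nothing

-- Repeatedly apply the first rule that fires, counting firings per rule.
-- Every rule here removes a node, so the loop terminates.
optimizeWithStats :: [(String, Rule)] -> [Node] -> ([Node], Stats)
optimizeWithStats rules start = go (map (\(name, _) -> (name, 0)) rules) start
  where
    go counts topo =
      case listToMaybe [(name, topo') | (name, rule) <- rules, Just topo' <- [rule topo]] of
        Nothing -> (topo, Stats (length start) (length topo) counts)
        Just (name, topo') -> go (bump name counts) topo'
    bump name = map (\(n, c) -> if n == name then (n, c + 1) else (n, c))
```

With these two rules, four `MapValues` nodes followed by two `Repartition` nodes and a `Sink` reduce from 7 nodes to 3. `fuseMaps` fires 3 times and `collapseRepartition` fires once.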

To compile exactly what you wrote, with no rewrites:

```haskell
-- Compile exactly what you wrote, no rewrites
topo <- Opt.compileNoOptimize myTopology
```

Use this when you suspect an optimization is buggy, or when you want to compare optimized and unoptimized behavior.
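A differential check makes that comparison systematic: feed the same input batches to both pipelines and collect every batch where they disagree. In this sketch, plain list functions stand in for the unoptimized and optimized topologies. `counterexamples`, `reference`, `fusedOk`, and `fusedBuggy` are illustrative names:

```haskell
import Data.Maybe (mapMaybe)

-- Return every input batch on which the two pipelines disagree.
-- An empty result means no counterexample was found in these batches.
counterexamples :: Eq b => ([a] -> [b]) -> ([a] -> [b]) -> [[a]] -> [[a]]
counterexamples unoptimized optimized batches =
  [batch | batch <- batches, unoptimized batch /= optimized batch]

-- Unoptimized: mapValues (+ 1) followed by filter even, as two passes.
reference :: [Int] -> [Int]
reference = filter even . map (+ 1)

-- A correct fusion: one pass that maps, then tests the mapped value.
fusedOk :: [Int] -> [Int]
fusedOk = mapMaybe (\v -> let u = v + 1 in if even u then Just u else Nothing)

-- A buggy "optimization" that moves the filter ahead of the map.
fusedBuggy :: [Int] -> [Int]
fusedBuggy = map (+ 1) . filter even
```

The buggy variant filters before mapping, so it tests the wrong value. The check flags every non-empty batch below.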

Pin your topology shape in CI:

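```haskell
import Test.HUnit (assertEqual)
import qualified Kafka.Streams.Topology.Free.Optimize as Opt
-- F: the module exporting compileTopology (not named on this page)

testTopologyShape :: IO ()
testTopologyShape = do
  compiled <- Opt.optimizeWith Opt.noOptimization myTopology
    >>= F.compileTopology
  golden <- readFile "test/golden/topology.json"
  -- renderTopology is a hypothetical placeholder for your serializer;
  -- HUnit's assertEqual takes a message, then expected, then actual
  assertEqual "compiled topology shape" golden (renderTopology compiled)
```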

This fails CI if your source changes produce a different compiled shape.
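Here is a minimal sketch of the comparison step as a reusable helper, using only `base`. It assumes you can already render the compiled topology to a `String`. `checkGolden` and `GoldenResult` are hypothetical names:

```haskell
import Control.Exception (IOException, try)

data GoldenResult
  = Match
  | Mismatch String String -- expected (file contents), actual
  | Created -- no golden file yet; wrote the actual output
  deriving (Eq, Show)

-- Compare rendered output against a golden file. The file is read strictly
-- (length forces it) so its handle is closed before any later write.
checkGolden :: FilePath -> String -> IO GoldenResult
checkGolden path actual = do
  r <- try (readFile path >>= \s -> length s `seq` pure s) :: IO (Either IOException String)
  case r of
    Left _ -> writeFile path actual >> pure Created
    Right expected
      | expected == actual -> pure Match
      | otherwise -> pure (Mismatch expected actual)
```

Writing a missing golden file is convenient locally. In CI you may prefer to treat `Created` as a failure so that a deleted file cannot silently pass.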

The compiler deliberately avoids some fusions to preserve semantics.

Two adjacent async I/O operators stay separate. Each has its own timeout and retry configuration and its own backpressure buffer, and merging them would hide which call is failing. If you want one async worker, write a single asyncMapValues with the composed logic.

Peek operations stay in place. Their side effects (logging, metrics) happen in an observable order, so moving a peek would change when it fires. That breaks debugging and monitoring.

A sync mapValues after an asyncMapValues stays separate. The sync work runs on the stream thread anyway, the overhead is just a function call, and keeping it distinct clarifies the async boundary.
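These rules amount to a fusion-eligibility check between neighboring operators. The sketch below uses a hypothetical `Kind` classification in which only two side-effect-free synchronous operators may merge:

```haskell
import Data.List (groupBy)

-- Hypothetical classification of operators for fusion purposes.
data Kind
  = SyncPure -- mapValues / filter with no side effects
  | Async -- asyncMapValues: own timeout, retries, and buffer
  | Peek -- side effects whose order is observable
  deriving (Eq, Show)

-- Only two side-effect-free synchronous operators may be merged.
canFuse :: Kind -> Kind -> Bool
canFuse SyncPure SyncPure = True
canFuse _ _ = False

-- Split a chain into the groups that would each become one node.
fusionGroups :: [Kind] -> [[Kind]]
fusionGroups = groupBy canFuse
```

On a chain of sync, sync, async, async, sync, peek, sync, `fusionGroups` yields six groups. The two leading sync operators merge, and everything else stays separate.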

| Symptom | Likely cause | Fix |
|---|---|---|
| More repartitions than expected | Missing key extractor, forcing an auto-inserted repartition | Add an explicit `groupBy` with the correct key |
| Optimization didn’t fuse | Operators have different types or side effects | Restructure to group same-type operators |
| Topology shape changed unexpectedly | Auto-generated operator names shifted | Add explicit `Named` annotations |
  • AST fusion runs automatically and is always safe
  • Graph rewrites are opt-in and change internal topics
  • Inspect with optimizationStats to see what changed
  • Disable with compileNoOptimize for debugging
  • Golden-file in CI to catch unexpected shape changes