Topology optimization
Your source code describes intent. The compiler rewrites the topology into an efficient execution plan while preserving semantics. This page covers what happens automatically, what you control, and how to verify the result.
Why topologies get optimized
Write three separate map operations:

```haskell
source "input" serde serde
  >>> mapValues fn1
  >>> mapValues fn2
  >>> mapValues fn3
  >>> sink "output" serde serde
```

Running this literally would allocate intermediate records after each map, traverse the stream three times, and add synchronization overhead between steps. The compiler fuses them into a single pass that applies all three functions to each record once. Same result, better performance.
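The rewrite rests on the map fusion law: mapping fn1, then fn2, then fn3 gives the same result as mapping their composition once. The sketch below models this with ordinary Haskell lists rather than the streams API. threePasses and onePass are illustrative names, and fn1 to fn3 are whatever functions you pass in.

```haskell
-- Plain-list model of map fusion; this is not the streams API.

-- Three steps: each map conceptually produces a full intermediate list
-- before the next step starts.
threePasses :: (a -> b) -> (b -> c) -> (c -> d) -> [a] -> [d]
threePasses fn1 fn2 fn3 = map fn3 . map fn2 . map fn1

-- One step: compose the functions first, then visit each record once.
onePass :: (a -> b) -> (b -> c) -> (c -> d) -> [a] -> [d]
onePass fn1 fn2 fn3 = map (fn3 . fn2 . fn1)
```

Loaded in GHCi, onePass (+1) (*2) show [1, 2, 3] and threePasses (+1) (*2) show [1, 2, 3] both return ["4","6","8"].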
Two optimization layers
| Layer | What it does | Enabled by default? |
|---|---|---|
| AST fusion | Merges adjacent operators, eliminates redundancy | Yes |
| Graph rewrites | Changes internal topic layout | No (opt-in) |
AST fusion is always safe. Graph rewrites affect internal topics and require careful rollout.
AST-level optimizations (always on)
These rewrites reduce node count and eliminate overhead. They run automatically unless you disable them.
Operator fusion
Combine adjacent compatible operators: two of the same type, or a mapValues followed by a filter.
| Pattern | Becomes | Benefit |
|---|---|---|
| Two mapValues | One mapValues with composed functions | One pass, not two |
| Two filter | One filter with AND of predicates | Check once, not twice |
| mapValues then filter | One fused map-and-filter step | No intermediate allocations |
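To see why each rule is safe, here is a toy model of the three table rules as a rewrite over a list of operators. The Op type, fuse, and run are hypothetical stand-ins rather than the compiler's internals, and values keep a single type to keep the sketch short. The property that matters is that run (fuse ops) produces the same output as run ops.

```haskell
import Data.Maybe (mapMaybe)

-- Hypothetical, simplified operator AST over record values of type v.
data Op v
  = MapValues (v -> v)
  | Filter (v -> Bool)
  | MapFilter (v -> Maybe v)  -- fused map-and-filter; Nothing drops the record

-- Apply the three table rules at the head of the pipeline, then move on.
fuse :: [Op v] -> [Op v]
fuse (MapValues f : MapValues g : rest) = fuse (MapValues (g . f) : rest)
fuse (Filter p : Filter q : rest)       = fuse (Filter (\v -> p v && q v) : rest)
fuse (MapValues f : Filter p : rest)    =
  fuse (MapFilter (\v -> let u = f v in if p u then Just u else Nothing) : rest)
fuse (op : rest)                        = op : fuse rest
fuse []                                 = []

-- Reference interpreter: runs a pipeline over a batch of values, in order.
run :: [Op v] -> [v] -> [v]
run ops vs = foldl step vs ops
  where
    step acc (MapValues f) = map f acc
    step acc (Filter p)    = filter p acc
    step acc (MapFilter m) = mapMaybe m acc
```

The rules only inspect the head of the pipeline, so map, map, filter, filter ends up as one fused map-and-filter node plus one filter: still correct, just not maximally fused.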
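For example, a mapValues followed by a filter:

```haskell
-- You write
source >>> mapValues upper >>> filter (not . null) >>> sink

-- Runtime sees (one node that maps, then decides whether to keep the record)
source >>> mapAndFilter (\v -> let u = upper v in (not (null u), u)) >>> sink
```

Repartition optimization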
Remove or reorder shuffles. Repartitioning is expensive: every record is written to an internal Kafka topic and read back, which costs network I/O. The compiler removes or moves unnecessary repartitions:
| Pattern | Action | Why |
|---|---|---|
| Two repartitions with no key change between | Remove second | Redundant shuffle |
| Repartition before a key-changing op | Remove first | Would be invalidated anyway |
| Pure ops between repartitions | Hoist before first | Run on smaller pre-shuffle data |
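The first two rules can be expressed as one left-to-right scan that tracks whether records are already partitioned by their current key. This is a hypothetical model: the Node constructors are illustrative, the source is conservatively treated as not yet partitioned, and the hoisting rule is left out.

```haskell
-- Hypothetical node kinds; only what matters for repartition decisions.
data Node
  = Repartition
  | SelectKey   -- changes the record key
  | MapValues   -- stateless, keeps the key
  | Count       -- stateful, needs records co-partitioned by key
  deriving (Eq, Show)

-- The Bool flag records whether records are known to be partitioned by
-- their current key. The source is conservatively treated as "not known".
collapseRepartitions :: [Node] -> [Node]
collapseRepartitions = go False
  where
    -- Rule 2: a repartition right before a key change is wasted work.
    go p (Repartition : SelectKey : rest) = go p (SelectKey : rest)
    -- Rule 1: already partitioned and no key change since, so this one is redundant.
    go True (Repartition : rest) = go True rest
    go _ (Repartition : rest) = Repartition : go True rest
    go _ (SelectKey : rest) = SelectKey : go False rest
    go p (n : rest) = n : go p rest
    go _ [] = []
```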
Identity elimination
Remove no-op operations:

```haskell
-- These disappear entirely
mapValues id           -- Does nothing
filter (const True)    -- Keeps everything
through "same-topic"   -- Reads what it just wrote
```

Auto-insert missing repartitions
Add required shuffles you forgot. Stateful operations (count, aggregate, reduce) need all records with the same key on the same partition. If your topology would hand mis-partitioned records to such an operation, for example right after changing the key, the compiler inserts a repartition automatically.
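```haskell
-- You write (potentially buggy: selectKey changes the key,
-- so records may sit on the wrong partition for count)
source >>> selectKey extractKey >>> count

-- Compiler inserts
source >>> selectKey extractKey >>> repartition >>> count
```

Without the repartition, each instance would count only the records that happened to land on its partitions, splitting a key's total across instances. The result is a silently wrong answer, not a crash.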
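The insertion pass needs only two facts per operator: does it change the key, and does it need records co-partitioned by key. A minimal sketch under those assumptions, with a hypothetical Op record and the further assumption that the source topic is already partitioned by its record key:

```haskell
-- Hypothetical operator descriptor: just the two facts this pass needs.
data Op = Op
  { opName     :: String
  , changesKey :: Bool  -- e.g. a re-keying step
  , needsKeyed :: Bool  -- stateful: count, aggregate, reduce
  } deriving (Eq, Show)

repartition :: Op
repartition = Op "repartition" False False

-- Walk the pipeline tracking whether records are still partitioned by their
-- current key; insert a repartition just before any stateful op that would
-- otherwise see mis-partitioned records. Assumes the source topic is
-- partitioned by its record key.
insertRepartitions :: [Op] -> [Op]
insertRepartitions = go True
  where
    go _ [] = []
    go partitioned (op : rest)
      | needsKeyed op && not partitioned =
          repartition : op : go (not (changesKey op)) rest
      | changesKey op = op : go False rest
      | otherwise     = op : go partitioned rest
```

A key-preserving mapValues before count leaves the pipeline unchanged, which is why only key-changing operators trigger the insert.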
Graph-level optimizations (opt-in)
These changes affect the internal topic layout on your Kafka brokers. They are disabled by default because they require coordination with deployment.
| Rewrite | Effect | When to enable |
|---|---|---|
| REUSE_KTABLE_SOURCE_TOPICS | Reuse a KTable's source topic as its changelog instead of creating a separate changelog topic | New topologies with KTable sources |
| MERGE_REPARTITION_TOPICS | Merge redundant repartition topics into one | Complex topologies with many shuffles |
| SINGLE_STORE_SELF_JOIN | Use one store instead of two for self-joins | Stream-stream self-joins |
These rewrites change which internal topics exist. Rolling out a topology with different optimization settings from the running version can orphan existing internal topics or require state migration.
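One way to catch this before a rollout is to compare the internal topic sets of the running and the new topology. The sketch below is a hypothetical pre-deploy check, not part of the library: it assumes you can obtain both sets as topic names (for example from your broker tooling), and the topic names used with it are made up.

```haskell
import qualified Data.Set as Set

-- Result of comparing the internal topics of two topology versions.
data TopicDiff = TopicDiff
  { orphaned :: Set.Set String  -- used only by the old version; left behind on the brokers
  , missing  :: Set.Set String  -- needed only by the new version; any state they back must be rebuilt
  } deriving (Eq, Show)

diffInternalTopics :: Set.Set String -> Set.Set String -> TopicDiff
diffInternalTopics old new = TopicDiff
  { orphaned = old `Set.difference` new
  , missing  = new `Set.difference` old
  }

-- True when the rollout leaves the internal topic set unchanged.
unchangedLayout :: TopicDiff -> Bool
unchangedLayout d = Set.null (orphaned d) && Set.null (missing d)
```

Blocking the deploy whenever unchangedLayout is False forces a deliberate decision about cleanup or migration instead of discovering orphaned topics later.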
To enable:
```haskell
import qualified Kafka.Streams.Topology.Optimization as Opt

let cfg = Opt.defaultStreamsConfig { Opt.topologyOptimization = Opt.OptimizeAll }
```

Inspecting optimizations
The compiler can report what it changed.
View optimization statistics
```haskell
import qualified Kafka.Streams.Topology.Free.Optimize as Opt

stats <- Opt.optimizationStats myTopology
-- OptimizationStats
--   { osBefore     = 15  -- Nodes in original topology
--   , osAfter      = 9   -- Nodes after optimization
--   , osRulesFired = [("fuseMaps", 3), ("collapseRepartition", 1)]
--   }
```

Check this when performance surprises you.
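Statistics like these can be thought of as a rewrite loop that counts every rule application. The sketch below is an assumption about how such counts could be collected, not the library's implementation: Stats only mirrors the printed fields, and fuseMaps and collapseRepartition here work on plain node names.

```haskell
import qualified Data.Map.Strict as Map

-- A named rewrite rule: rewrite the head of the pipeline, or decline with Nothing.
-- Each rule must make the pipeline shorter, or the loop below would not terminate.
type Rule n = (String, [n] -> Maybe [n])

-- Stand-in mirroring the printed fields (not the library's own type).
data Stats = Stats
  { osBefore     :: Int
  , osAfter      :: Int
  , osRulesFired :: [(String, Int)]
  } deriving (Eq, Show)

-- One left-to-right scan: retry rules at the current position until none
-- fires, then move to the next node. Counts how often each rule fired.
rewriteCounting :: [Rule n] -> [n] -> ([n], Map.Map String Int)
rewriteCounting rules = go Map.empty
  where
    go fired [] = ([], fired)
    go fired nodes@(n : rest) =
      case [ (name, out) | (name, rule) <- rules, Just out <- [rule nodes] ] of
        ((name, out) : _) -> go (Map.insertWith (+) name 1 fired) out
        [] -> let (rest', fired') = go fired rest in (n : rest', fired')

statsFor :: [Rule n] -> [n] -> Stats
statsFor rules nodes =
  let (out, fired) = rewriteCounting rules nodes
  in Stats (length nodes) (length out) (Map.toList fired)

-- Example rules over node names, reusing the rule names from the output above.
fuseMaps :: [String] -> Maybe [String]
fuseMaps ("mapValues" : "mapValues" : rest) = Just ("mapValues" : rest)
fuseMaps _ = Nothing

collapseRepartition :: [String] -> Maybe [String]
collapseRepartition ("repartition" : "repartition" : rest) = Just ("repartition" : rest)
collapseRepartition _ = Nothing

exampleRules :: [Rule String]
exampleRules = [("fuseMaps", fuseMaps), ("collapseRepartition", collapseRepartition)]
```

For source, three mapValues, two repartitions, and count, statsFor exampleRules reports osBefore 7 and osAfter 4, with fuseMaps firing twice and collapseRepartition once.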
Disable optimizations (for debugging)
```haskell
topo <- Opt.compileNoOptimize myTopology
-- Compile exactly what you wrote, no rewrites
```

Use when you suspect an optimization is buggy or want to compare optimized vs unoptimized behavior.
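A cheap way to compare the two is a differential check: feed the same records to the pipeline as written and as rewritten, and collect the inputs where they disagree. In this sketch both pipelines are plain list functions standing in for the unfused and fused forms of the mapValues upper >>> filter (not . null) example; disagreements, unfused, and fused are hypothetical names. Comparing record by record is only meaningful for stateless pipelines.

```haskell
import Data.Char (toUpper)

-- Inputs on which the two pipelines produce different output.
-- Runs each record on its own, so only valid for stateless pipelines.
disagreements :: Eq b => ([a] -> [b]) -> ([a] -> [b]) -> [a] -> [a]
disagreements asWritten rewritten inputs =
  [ x | x <- inputs, asWritten [x] /= rewritten [x] ]

upper :: String -> String
upper = map toUpper

-- As written: mapValues upper, then filter (not . null)
unfused :: [String] -> [String]
unfused = filter (not . null) . map upper

-- As fused: one step that maps each value and decides whether to keep it
fused :: [String] -> [String]
fused = concatMap (\v -> let u = upper v in [u | not (null u)])
```

An empty result from disagreements on representative inputs is evidence, not proof, that the rewrite preserved behavior.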
Golden-file testing
Pin your topology shape in CI:
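```haskell
import Test.HUnit (assertEqual)

testTopologyShape :: IO ()
testTopologyShape = do
  -- Rewrites are disabled, so the golden file pins the shape your source produces
  compiled <- Opt.optimizeWith Opt.noOptimization myTopology >>= F.compileTopology
  golden   <- readFile "test/golden/topology.json"
  -- Assumes `compiled` renders to the same JSON text stored in the golden file
  assertEqual "compiled topology shape" golden compiled
```

This fails CI if your source changes produce a different compiled shape.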
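If you would rather not hand-maintain the golden file, a small helper can write it on the first run and compare on later runs. This is a generic sketch, not part of the library: it assumes you already have the compiled topology rendered as text, and checkGolden, acceptGolden, and GoldenResult are hypothetical names.

```haskell
import System.Directory (doesFileExist)

-- Outcome of comparing a rendering against the stored golden file.
data GoldenResult
  = Created          -- no golden file yet; it was written for review
  | Matched
  | Mismatch String  -- carries the stored golden text for the diff
  deriving (Eq, Show)

-- Compare `rendered` (e.g. a JSON rendering of the compiled topology; how you
-- produce it depends on the library) against the golden file at `path`.
checkGolden :: FilePath -> String -> IO GoldenResult
checkGolden path rendered = do
  exists <- doesFileExist path
  if not exists
    then do
      writeFile path rendered
      pure Created
    else do
      golden <- readFile path
      -- Forcing the whole string reads to EOF, so the lazy handle is closed
      -- before any later write to the same path.
      length golden `seq`
        pure (if golden == rendered then Matched else Mismatch golden)

-- Deliberately accept a new shape after an intentional topology change.
acceptGolden :: FilePath -> String -> IO ()
acceptGolden = writeFile
```

In CI, treat anything other than Matched as a failure; run acceptGolden locally after an intentional change and commit the updated file.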
When optimizations don’t happen
The compiler deliberately avoids some fusions to preserve semantics.
Async boundaries
Two async I/O operators stay separate. Each has its own timeout and retry configuration and its own backpressure buffer, and merging them would hide which call is failing. If you want one async worker, write one asyncMapValues with composed logic.
Side-effect ordering
Peek operations stay in place. Side effects (logging, metrics) have observable order, and moving them would change when they fire. That breaks debugging and monitoring.
Post-async sync work
A sync mapValues after an asyncMapValues stays separate. Fusing would gain little: the sync work runs on the stream thread anyway and costs only a function call, while keeping it distinct makes the async boundary explicit.
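Taken together, these three cases act as fusion barriers. In the hypothetical sketch below, canFuse decides whether two adjacent operators may merge, and fusionGroups splits a pipeline into the runs a fuser would be allowed to combine. The Kind constructors are illustrative, and which pure neighbours actually merge is still decided by the fusion rules.

```haskell
-- Hypothetical operator kinds relevant to fusion decisions.
data Kind
  = SyncMap     -- mapValues
  | SyncFilter  -- filter
  | AsyncMap    -- asyncMapValues: own timeout, retries, buffer
  | Peek        -- side effect with observable ordering
  deriving (Eq, Show)

-- May two adjacent operators be merged into one node?
canFuse :: Kind -> Kind -> Bool
canFuse AsyncMap _        = False  -- async boundaries stay separate, including post-async sync work
canFuse _        AsyncMap = False
canFuse Peek     _        = False  -- side effects keep their position
canFuse _        Peek     = False
canFuse _        _        = True   -- pure, synchronous neighbours are candidates

-- Split a pipeline into maximal runs whose neighbours may fuse.
fusionGroups :: [Kind] -> [[Kind]]
fusionGroups = foldr step []
  where
    step k (grp@(next : _) : rest)
      | canFuse k next = (k : grp) : rest
    step k groups = [k] : groups
```

For mapValues, filter, asyncMapValues, mapValues, peek, mapValues, mapValues the groups are [SyncMap, SyncFilter], [AsyncMap], [SyncMap], [Peek], [SyncMap, SyncMap]: the sync map after the async call stays on its own, as does the peek.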
Common issues and fixes
| Symptom | Likely cause | Fix |
|---|---|---|
| More repartitions than expected | Missing key extractor, forcing auto-insert | Add explicit groupBy with correct key |
| Optimization didn’t fuse | Operators have different types or side effects | Restructure to group same-type ops |
| Topology shape changed unexpectedly | Auto-generated operator names shifted | Add explicit Named annotations |
Summary
- AST fusion runs automatically and is always safe
- Graph rewrites are opt-in and change internal topics
- Inspect with optimizationStats to see what changed
- Disable with compileNoOptimize for debugging
- Use a golden-file test in CI to catch unexpected shape changes
Related reading
- Dynamic topology changes: Understanding what you can change at runtime
- Observability: Monitoring the optimized topology in production