
Kafka client

wireform-kafka is a native Haskell client for the Apache Kafka wire protocol. It talks directly to Kafka brokers over TCP or TLS, with no JVM, no librdkafka, and no FFI shim in the data path. Everything from connection management to SASL authentication to record-batch compression is implemented in Haskell.

| Layer | Modules | What it does |
| --- | --- | --- |
| Wire protocol | Kafka.Protocol.* | Varints, compact strings, tagged fields, CRC32C (hardware-accelerated), version negotiation |
| Generated messages | Kafka.Protocol.Generated.* | One module per Kafka API key, emitted from upstream JSON schemas by kafka-codegen |
| Networking | Kafka.Network.* | TCP / TLS connections, SASL handshake (PLAIN, SCRAM-SHA-256/512, OAUTHBEARER, AWS MSK IAM) |
| Magic-ring transport | Kafka.Network.RingTransport, Kafka.Network.FrameParser | Bridges a broker Network.Connection onto the wireform-network magic-ring transport; streaming frame parser reads zero-copy slices off the ring (60-65% faster end-to-end than the classic per-frame connectionGetExact + runGet shape; see the benchmarks) |
| Compression | Kafka.Compression.*, Kafka.Compression.Ring | gzip, snappy, lz4, zstd record-batch codecs; the Ring variant takes a raw Ptr Word8 source (e.g. a ring slice) and writes plaintext straight into a caller-supplied destination magic ring via direct libz / liblz4 / libzstd / libsnappy FFI |
| High-level client | Kafka.Client.* | Producer, Consumer, AdminClient, Transaction |
| Mock broker | Kafka.Client.Mock.* | Deterministic in-process broker for tests |
| Telemetry | Kafka.Telemetry.OpenTelemetry | Semantic-convention spans for produce/consume/admin |
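To make the "varints" entry in the wire-protocol row concrete, here is a minimal sketch of the base-128 varint and zigzag encodings that the Kafka protocol uses (for example in record fields and in compact length prefixes). It is a list-based illustration only, not the code in Kafka.Protocol.*; the names encodeUVarint, decodeUVarint, zigZag, and unZigZag are hypothetical.

```haskell
import Data.Bits (shiftL, shiftR, testBit, xor, (.&.), (.|.))
import Data.Int (Int32)
import Data.Word (Word32, Word8)

-- | Encode an unsigned 32-bit value as a base-128 varint: seven payload
-- bits per byte, least-significant group first, high bit set on every
-- byte except the last. (Hypothetical helper, not the library's API.)
encodeUVarint :: Word32 -> [Word8]
encodeUVarint n
  | n < 0x80 = [fromIntegral n]
  | otherwise = (fromIntegral (n .&. 0x7F) .|. 0x80) : encodeUVarint (n `shiftR` 7)

-- | Decode an unsigned varint, returning the value and the unread bytes.
-- Returns Nothing on truncated input or on more than five bytes.
decodeUVarint :: [Word8] -> Maybe (Word32, [Word8])
decodeUVarint = go 0 0
  where
    go :: Int -> Word32 -> [Word8] -> Maybe (Word32, [Word8])
    go _ _ [] = Nothing
    go shift acc (b : rest)
      | shift > 28 = Nothing
      | testBit b 7 = go (shift + 7) acc' rest
      | otherwise = Just (acc', rest)
      where
        acc' = acc .|. (fromIntegral (b .&. 0x7F) `shiftL` shift)

-- | Zigzag maps signed values to unsigned ones so that small magnitudes
-- (positive or negative) encode to few bytes: 0, -1, 1, -2 -> 0, 1, 2, 3.
zigZag :: Int32 -> Word32
zigZag n = fromIntegral ((n `shiftL` 1) `xor` (n `shiftR` 31))

-- | Inverse of zigZag.
unZigZag :: Word32 -> Int32
unZigZag w = fromIntegral (w `shiftR` 1) `xor` negate (fromIntegral (w .&. 1))
```

In GHCi, `encodeUVarint 300` evaluates to `[172,2]`, and a signed value is written as `encodeUVarint (zigZag n)`.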

The umbrella module Kafka re-exports the high-level client surface so you can get started with a single import.

A Producer maintains a connection pool and a background sender thread that batches records for efficiency. The typical lifecycle uses a bracket:

    import Kafka

    main :: IO ()
    main = do
      let cfg = defaultProducerConfig
            { producerBootstrap = "localhost:9092"
            }
      withProducer cfg $ \producer -> do
        result <- sendMessage producer ProducerRecord
          { prTopic = "events"
          , prKey = Just "user-42"
          , prValue = "{\"action\":\"login\"}"
          , prHeaders = mempty
          , prPartition = Nothing
          , prTimestamp = Nothing
          }
        case result of
          Right meta -> putStrLn $ "Wrote to partition " <> show (rmPartition meta)
          Left err -> putStrLn $ "Send failed: " <> err

If you define a Topic with key and value types, publish handles serialization automatically via HasSerde:

    publish producer myTopic myKey myValue

To choose a delivery guarantee, set producerDelivery on the config:

| Value | Meaning |
| --- | --- |
| AtMostOnce | Fire and forget. Fastest, may lose records. |
| AtLeastOnce | Retries until ack. Default. Records may be duplicated on retry. |
| ExactlyOnce | Idempotent producer + transactions. No duplicates, no loss. |

flushProducer blocks until every buffered record has been sent or has failed. Call it before shutdown if you need delivery confirmation.

The consumer manages group membership, partition assignment, and offset commits. Two APIs are available:

runConsumer from Kafka.Client.Group takes a per-record handler and manages the poll loop, rebalancing, and commit cycle for you:

    import Kafka

    main :: IO ()
    main = do
      let cfg = defaultGroupConfig
            { groupConsumerConfig = defaultConsumerConfig
                { consumerBootstrap = "localhost:9092"
                }
            , groupId = "my-service"
            , groupTopics = ["events"]
            }
      runConsumer cfg $ \record -> do
        putStrLn $ "Got: " <> show (crValue record)

runBatchedConsumer gives you the full ConsumerRecords batch per poll cycle when you need to process records in bulk.

withConsumer gives you a Consumer handle for fine-grained control:

    withConsumer cfg $ \consumer -> do
      subscribe consumer ["events"]
      forever $ do
        records <- poll consumer 1000
        mapM_ process (consumerRecordsAll records)
        commitSync consumer

The Consumer handle also exposes functions for managing offsets:

| Function | Behavior |
| --- | --- |
| commitSync | Block until offsets are committed |
| commitAsync | Fire-and-forget commit |
| commitSyncOffsets | Commit specific partition/offset pairs |
| seek / seekToBeginning / seekToEnd | Rewind or fast-forward |
| offsetsForTimes | Find offsets by timestamp |

Auto-commit is enabled by default (consumerAutoCommit = True). Disable it when you need explicit control over when offsets advance.

Transactions give you atomic multi-partition produces combined with consumer offset commits. This is how you build exactly-once pipelines.

    import Kafka

    main :: IO ()
    main = do
      let cfg = defaultProducerConfig
            { producerBootstrap = "localhost:9092"
            , producerTransactional = Just "my-txn-id"
            , producerIdempotent = True
            }
      withProducer cfg $ \producer -> do
        txn <- bindTransaction producer
        initTransactions txn
        withTransaction txn $ do
          -- Record fields are elided here; consumerGroupMeta and offsets
          -- come from the consumer whose progress is being committed.
          sendInTransaction txn (ProducerRecord { .. })
          commitOffsetsInTransaction txn consumerGroupMeta offsets

withTransaction calls beginTransaction, runs your action, then commits if the action returns or aborts if it throws an exception. The transaction coordinator on the broker ensures that either all partitions see the records and offset commits, or none do.
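The begin / commit / abort-on-exception shape can be modeled without a broker. The following is a minimal sketch under stated assumptions, not the library's implementation: FakeTxn, logCall, withFakeTransaction, failingAction, and runDemo are hypothetical names, and the "transaction" only records which lifecycle calls were made.

```haskell
import Control.Exception (ErrorCall (..), SomeException, onException, throwIO, try)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Hypothetical stand-in for a transaction handle: it only records
-- which lifecycle calls were made, in order.
newtype FakeTxn = FakeTxn (IORef [String])

logCall :: FakeTxn -> String -> IO ()
logCall (FakeTxn ref) call = modifyIORef' ref (++ [call])

-- | Begin, run the action, then commit. If the action throws, abort and
-- let the exception propagate. A production version would likely also
-- need to consider asynchronous exceptions; this sketch does not.
withFakeTransaction :: FakeTxn -> IO a -> IO a
withFakeTransaction txn action = do
  logCall txn "begin"
  result <- action `onException` logCall txn "abort"
  logCall txn "commit"
  pure result

-- | An action that always fails, used to exercise the abort path.
failingAction :: IO ()
failingAction = throwIO (ErrorCall "boom")

-- | Run an action inside the fake transaction and return the call log.
runDemo :: IO () -> IO [String]
runDemo action = do
  ref <- newIORef []
  _ <- try (withFakeTransaction (FakeTxn ref) action) :: IO (Either SomeException ())
  readIORef ref
```

Here `runDemo (pure ())` yields `["begin","commit"]`, while `runDemo failingAction` yields `["begin","abort"]`: the commit step is never reached once the action throws.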

AdminClient provides control-plane operations:

    withAdminClient defaultAdminClientConfig { adminBootstrap = "localhost:9092" } $ \admin -> do
      createTopics admin [NewTopic "events" 6 3]
      topics <- listTopics admin
      groups <- listConsumerGroups admin
      describeConfigs admin [ConfigResource BrokerResource "0"]

Supported operations include topic CRUD, consumer group management, config inspection and mutation, ACL management, log dir inspection, partition reassignment, transaction control, and cluster metadata.

TLS and SASL are configured on the ConnectionConfig, which is shared across producer, consumer, and admin:

    let conn = defaultConnectionConfig
          { connBootstrap = "broker.example.com:9094"
          , connUseTls = True
          , connSasl = Just (SaslScram ScramSha256 "user" "pass")
          }

| Mechanism | Constructor | Notes |
| --- | --- | --- |
| PLAIN | SaslPlain user pass | Username/password in the clear (use with TLS) |
| SCRAM-SHA-256/512 | SaslScram alg user pass | Challenge-response; password never sent in the clear |
| OAUTHBEARER | SaslOAuthBearer tokenProvider | Callback that returns a JWT |
| AWS MSK IAM | SaslAwsMskIam region creds | AWS Signature V4 for Amazon MSK |

The SASL handshake runs automatically when a connection is established; you don’t need to call any auth functions manually.

Kafka.Client.Env parses standard KAFKA_* environment variables (the same names used by librdkafka and the JVM client) and overlays them onto your config. This happens automatically when you call createProducer or createConsumer.

Variables include KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL, KAFKA_SASL_MECHANISM, KAFKA_SASL_USERNAME, KAFKA_SASL_PASSWORD, KAFKA_GROUP_ID, and others.

Programmatic config always takes precedence; env vars only fill in fields you haven’t set.
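The precedence rule can be pictured as a left-biased merge of optional fields. This is a minimal sketch assuming a config whose fields are Maybe values; PartialConfig, overlay, and fromEnv are hypothetical names and do not reflect how Kafka.Client.Env represents its config internally. Only the two environment variable names are taken from the list above.

```haskell
import Control.Applicative ((<|>))
import System.Environment (lookupEnv)

-- | Hypothetical partial config: Nothing means "not set in code".
data PartialConfig = PartialConfig
  { pcBootstrap :: Maybe String
  , pcGroupId :: Maybe String
  }
  deriving (Eq, Show)

-- | Left-biased merge: a field set in code wins; the environment value
-- is used only when the code left the field unset.
overlay :: PartialConfig -> PartialConfig -> PartialConfig
overlay code env =
  PartialConfig
    { pcBootstrap = pcBootstrap code <|> pcBootstrap env
    , pcGroupId = pcGroupId code <|> pcGroupId env
    }

-- | Read the two variables this sketch cares about from the environment.
fromEnv :: IO PartialConfig
fromEnv =
  PartialConfig
    <$> lookupEnv "KAFKA_BOOTSTRAP_SERVERS"
    <*> lookupEnv "KAFKA_GROUP_ID"
```

With `code = PartialConfig (Just "a:9092") Nothing` and an environment that sets both variables, `overlay code env` keeps `"a:9092"` as the bootstrap address and takes only the group id from the environment.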

Kafka.Client.Mock.Cluster provides a deterministic, in-process Kafka broker simulation. It uses STM internally and advances time via tickClock, so tests are fast and reproducible:

    import Kafka.Client.Mock.Cluster

    test :: IO ()
    test = do
      cluster <- newMockCluster
      createTopic cluster "events" 3
      appendToPartition cluster "events" 0 record
      slice <- fetchSlice cluster "events" 0 0 100
      -- verify slice contents here; a do block must end with an expression
      pure ()

The mock supports consumer groups (join, leave, rebalance), transactions (begin, commit, abort, fence), leader epochs, and offset management. It does not simulate network latency or partial failures (use Kafka.Client.Mock.Fault for fault injection).
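To show why a tick-driven clock in STM makes time-dependent tests reproducible, here is a small model of the idea. It is a sketch only and says nothing about the internals of Kafka.Client.Mock.Cluster or the signature of tickClock; MockClock, newMockClock, advance, waitUntil, and demoFiredEarly are hypothetical names.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar, tryReadMVar)
import Control.Concurrent.STM (STM, TVar, atomically, check, modifyTVar', newTVarIO, readTVar)
import Data.Maybe (isJust)

-- | Hypothetical virtual clock: the current time in milliseconds.
newtype MockClock = MockClock (TVar Int)

newMockClock :: IO MockClock
newMockClock = MockClock <$> newTVarIO 0

-- | Move virtual time forward. Nothing else ever changes the clock.
advance :: MockClock -> Int -> IO ()
advance (MockClock var) ms = atomically (modifyTVar' var (+ ms))

-- | Block (via STM retry) until virtual time reaches the deadline.
waitUntil :: MockClock -> Int -> STM ()
waitUntil (MockClock var) deadline = do
  now <- readTVar var
  check (now >= deadline)

-- | A waiter with a 100 ms deadline cannot fire after only 50 ms of
-- virtual time, no matter how the threads are scheduled. Returns whether
-- it fired early; returning at all shows it fired once time reached 100.
demoFiredEarly :: IO Bool
demoFiredEarly = do
  clock <- newMockClock
  done <- newEmptyMVar
  _ <- forkIO (atomically (waitUntil clock 100) >> putMVar done ())
  advance clock 50
  early <- tryReadMVar done
  advance clock 50
  takeMVar done
  pure (isJust early)
```

Because the deadline depends only on explicit calls to `advance`, no real sleeping is involved and the outcome does not depend on wall-clock timing.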

Record batches are compressed transparently based on producerCompression:

| Codec | Flag | Notes |
| --- | --- | --- |
| None | NoCompression | Default |
| Gzip | GzipCompression | Broad compatibility, higher CPU |
| Snappy | SnappyCompression | Fast, moderate ratio |
| LZ4 | Lz4Compression | Fast, good ratio (recommended for throughput) |
| Zstd | ZstdCompression | Best ratio, moderate CPU |

The consumer decompresses automatically based on the batch header; no config needed on the read side.

The Serde type pairs a serializer and deserializer:

    data Serde a = Serde
      { serialize :: a -> ByteString
      , deserialize :: ByteString -> Either Text a
      }

The HasSerde typeclass provides a default serde for a type. Built-in instances cover ByteString, Text, Int16/Int32/Int64, Word16/Word32/Word64, Float, Double, UUID, and any ToJSON/FromJSON type (via jsonSerde).
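As an illustration of how a Serde value and a HasSerde instance fit together, here is a self-contained sketch for Int32. The Serde record mirrors the definition above; the class method name defaultSerde, the helper roundTrip, and the choice of a 4-byte big-endian encoding are assumptions made for this example and may differ from the library's built-in instance.

```haskell
import Data.Bits (shiftL, shiftR, (.|.))
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int (Int32)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Word (Word32)

-- Mirrors the record shown in the text.
data Serde a = Serde
  { serialize :: a -> ByteString
  , deserialize :: ByteString -> Either Text a
  }

-- | Assumed class shape; the method name is hypothetical.
class HasSerde a where
  defaultSerde :: Serde a

-- | Int32 as four big-endian bytes (an assumption for this sketch).
int32Serde :: Serde Int32
int32Serde = Serde enc dec
  where
    enc n =
      let w = fromIntegral n :: Word32
       in BS.pack [fromIntegral (w `shiftR` s) | s <- [24, 16, 8, 0]]
    dec bs
      | BS.length bs /= 4 = Left (T.pack "expected 4 bytes")
      | otherwise =
          let w = BS.foldl' (\acc b -> (acc `shiftL` 8) .|. fromIntegral b) (0 :: Word32) bs
           in Right (fromIntegral w)

instance HasSerde Int32 where
  defaultSerde = int32Serde

-- | Serialize and immediately deserialize with the type's default serde.
roundTrip :: (HasSerde a) => a -> Either Text a
roundTrip x = deserialize defaultSerde (serialize defaultSerde x)
```

Deserialization returns Left rather than throwing, so a malformed payload (here, anything that is not exactly four bytes) can be handled by the caller instead of crashing the consumer loop.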

The Kafka Streams DSL (documented separately under Kafka Streams) uses HasSerde to resolve serdes automatically for stream and table types.

Kafka.Telemetry.OpenTelemetry adds semantic-convention spans for produce, consume, and admin operations. Pass your TracerProvider to the config and spans appear automatically.