Stream processing in Go (franz-go + Pebble)
In Stream processing: concepts we covered the ideas: event time, windows, watermarks, KStream/KTable. Time to get hands-on. Let's take a real Brew task: analytics-service wants a live top-drinks board across the whole network, showing what is being ordered most right now. Stream processing needs state: the drink counters have to live somewhere between orders. And that state must survive restarts, otherwise the top-drinks board falls apart on the first kill -9.
The problem: there is no native Kafka Streams for Go. In Java the library ships with Apache Kafka itself. In Go there is no official equivalent. The closest things (Watermill, for example) are about message routing, not stateful streams. So we build by hand: a Kafka client + a local embedded KV store + a changelog topic for durability.
In our case that is franz-go + Pebble + a compacted topic brew.drink-count-changelog. The result is a simplified copy of the Kafka Streams model: state lives on disk, every update is also copied to Kafka, and if the disk is lost, state is rebuilt by replaying the changelog from the beginning. No watermarks, no time windows, no complex topology: just enough to see three key mechanics on working code. The top-drinks board is cumulative: we count servings across the whole order stream and emit a top-N snapshot every flush seconds.
Why we need state
Stateless processing: a record arrives, you do something with it, write it somewhere — forget it. map, filter, flatMap. Restart the process, nothing is lost.
Stateful is different. We compute count, sum, top-N, unique users per hour. The second record depends on what we saw in the first. Memory has to be stored somewhere. Options at a glance.
- In-memory only. Just a map[string]int in a goroutine. Fast, zero dependencies, after kill -9 everything resets to zero. Suitable strictly for demo scripts.
- External DB. Postgres, Redis, any KV. Overhead on every increment: a network round-trip. At 50k msg/sec on the stream that already hurts.
- Embedded store + changelog. Write to a local LSM (Pebble/RocksDB), simultaneously send a copy of changes to a compacted Kafka topic. Performance like a local DB (millisecond network round-trips disappear), durability at Kafka level. This is exactly "what Kafka Streams does".
We build the third option. Pebble here — because it is pure Go, no CGo (RocksDB via CGo is its own build-time pain). Pebble is CockroachDB's own LSM engine, the foundation of their storage layer — more than enough for our sandbox.
Pebble in brief
LSM tree, embedded, key-value. The API is very simple: Set, Get, Delete, iteration. Writes go to disk (into the directory you pass to Open), and the memtable is flushed to disk in the background as it fills up. By design it is a relative of RocksDB.
What matters to us from the API:
- pebble.Open(dir, opts): open or create a DB on disk.
- db.Set(key, value, sync): write.
- db.Get(key) → (value, closer, err): read (closer.Close() is required after use).
- db.NewIter(opts) → iterator over the full range.
- db.Flush(): force the memtable to disk.
The pebble.Sync vs pebble.NoSync option controls fsync. In our code we collect all of a polling round's Sets into a *pebble.Batch without sync, then commit the batch with pebble.Sync — one fsync per batch instead of one per record. In production, combined with a changelog, teams often use NoSync even on batch commit plus a periodic Flush: durability is provided by Kafka, the local disk is only needed for speed.
Architecture of our drink-count
Three topics and one local directory.
- brew.orders.v1: input. Brew orders in JSON; for each items[] line we take drink and quantity and increment the drink counter.
- brew.drink-count-changelog: compacted topic. For every counter update we write (drink, current_count). Compaction keeps at least the latest value per key and eventually drops older ones, so size does not grow linearly with the number of updates.
- brew.drink-counts: output. Every flush seconds (5 by default) we emit the current top-N drinks as a snapshot.
And the ./state/ directory — Pebble stores its LSM there. Delete the directory — lose local state. Run cmd/changelog-restorer — restore from the changelog.
One-way flow, no loops:
brew.orders.v1 ──> [drink-count] ──┬──> brew.drink-count-changelog (compact)
                                   ├──> brew.drink-counts (top-N snapshot)
                                   └──> ./state/ (Pebble)
And the reverse direction, only for state restart:
brew.drink-count-changelog ──> [changelog-restorer] ──> ./state/
The drink-count loop
The most important thing — the order of three durable writes in one polling round: the changelog produce, the Pebble batch, and the offset commit. Swap them and you either lose increments on a crash, or get duplicates on restart.
Correct order: changelog → Pebble → offset commit. Each step has a reason.
First, accumulate increments in an in-memory overlay (no Pebble writes yet) and build the matching changelog records. Each order is parsed by the pure function countDrinks, which pulls the drinks and their quantities out of the JSON:
overlay := make(map[string]uint64)
var produces []*kgo.Record
fetches.EachRecord(func(rec *kgo.Record) {
	deltas, err := countDrinks(rec.Value) // map[drink]quantity from one order
	if err != nil {
		return // skip a malformed order, don't crash the pipeline
	}
	for drink, delta := range deltas {
		cur, ok := overlay[drink]
		if !ok {
			cur, _ = readUint64(d.store, []byte(drink))
		}
		cur += delta
		overlay[drink] = cur
		produces = append(produces, &kgo.Record{
			Topic: d.changelogTopic,
			Key:   []byte(drink),
			Value: encodeUint64(cur),
		})
	}
})
The overlay matters: within one batch the same drink can appear in several orders, and we need every produce to carry the running counter, not the stale Pebble value.
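The loop above calls countDrinks and encodeUint64 without showing them. A minimal sketch of what such helpers might look like follows. It is an assumption, not the lesson's actual code: the order struct is inferred from the JSON shape of brew.orders.v1, the 8-byte big-endian encoding is one plausible choice for the counter format, and decodeUint64 is a hypothetical inverse added only to show the round trip.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// order mirrors only the part of the order JSON that the counter needs.
// The struct itself is an assumption based on the documented field names.
type order struct {
	Items []struct {
		Drink    string `json:"drink"`
		Quantity uint64 `json:"quantity"`
	} `json:"items"`
}

// countDrinks is a pure function: it sums quantities per drink
// within a single order and touches no external state.
func countDrinks(raw []byte) (map[string]uint64, error) {
	var o order
	if err := json.Unmarshal(raw, &o); err != nil {
		return nil, fmt.Errorf("decode order: %w", err)
	}
	deltas := make(map[string]uint64, len(o.Items))
	for _, it := range o.Items {
		if it.Drink == "" || it.Quantity == 0 {
			continue // such a line cannot change any counter
		}
		deltas[it.Drink] += it.Quantity
	}
	return deltas, nil
}

// encodeUint64 stores a counter as 8 big-endian bytes (assumed format).
func encodeUint64(v uint64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, v)
	return buf
}

// decodeUint64 is the hypothetical inverse; it rejects values of the wrong length.
func decodeUint64(b []byte) (uint64, error) {
	if len(b) != 8 {
		return 0, fmt.Errorf("want 8 bytes, got %d", len(b))
	}
	return binary.BigEndian.Uint64(b), nil
}

func main() {
	raw := []byte(`{"order_id":"o-1","shop_id":"s-7","items":[` +
		`{"drink":"latte","quantity":2},` +
		`{"drink":"mocha","quantity":1},` +
		`{"drink":"latte","quantity":1}]}`)
	deltas, err := countDrinks(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(deltas["latte"], deltas["mocha"]) // 3 1

	n, err := decodeUint64(encodeUint64(deltas["latte"]))
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 3
}
```

Keeping countDrinks free of I/O makes it easy to unit-test separately from Kafka and Pebble, and a fixed-width encoding keeps changelog values and store values byte-identical.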
Then publish the changelog in one ProduceSync, persist the overlay to Pebble in one batch, and only after that commit the offsets:
if err := d.client.ProduceSync(rpcCtx, produces...).FirstErr(); err != nil {
	return fmt.Errorf("changelog produce: %w", err)
}
batch := d.store.NewBatch()
for drink, count := range overlay {
	_ = batch.Set([]byte(drink), encodeUint64(count), nil)
}
if err := batch.Commit(pebble.Sync); err != nil {
	return fmt.Errorf("pebble batch commit: %w", err)
}
if err := d.client.CommitUncommittedOffsets(commitCtx); err != nil {
	return fmt.Errorf("commit offsets: %w", err)
}
Why this order. If we committed offsets first and then wrote the changelog, and were killed in that gap, then after restart drink-count would consider that batch processed, but the changelog has no record of it. Then if we lose Pebble and try to restore, counters come back lower. The loss is silent: nobody alerts you on a counter that quietly underreports.
Why changelog before Pebble. If a crash hits between them, the changelog has the new values and Pebble has the old ones. On restart, the offset has not been committed, so reprocessing the same input batch produces the same new values, the changelog gets duplicate writes for the same keys (compaction collapses them later), and Pebble catches up. End state is consistent. If we had written Pebble first and crashed before the changelog, a restore from the changelog would yield older values than Pebble has, and Pebble itself would re-increment on replay because the overlay starts from whatever Pebble already holds, inflating the counter by one batch.
The whole pipeline still gives at-least-once, not exactly-once. A crash after the Pebble commit but before the offset commit makes the batch get reprocessed on restart: Pebble re-increments because the overlay sees the already-updated values, and the changelog receives the inflated counts. To eliminate that, you need transactional producer semantics around the whole block: kgo.NewGroupTransactSession plus Begin/End(TryCommit), see Consume-process-produce. Note that a Kafka transaction atomically covers only the changelog produce and the offset commit; the Pebble write sits outside it and has to be reconciled with the changelog after a crash. For the top-drinks board, inflation by one batch on a rare crash is an acceptable trade: it does not mislead the Brew dashboard.
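To make the two crash windows concrete, here is a small in-memory simulation. It is a sketch under stated assumptions: sim, step, poll and run are hypothetical names, a single drink counter stands in for both Pebble and the changelog, and a crash is modeled as an early return. It uses neither franz-go nor Pebble.

```go
package main

import "fmt"

// step names the point after which the simulated process is killed.
type step int

const (
	noCrash step = iota
	afterChangelog
	afterState
)

// sim is an in-memory stand-in for the three durable pieces:
// the changelog topic, the local store, and the committed offset.
type sim struct {
	input     []uint64 // quantities of one drink, one per input record
	changelog uint64   // latest changelog value for that drink
	state     uint64   // value in the local store
	committed int      // next input offset to read
}

// poll processes every uncommitted record in the order
// changelog -> state -> offset commit and stops early at crashAt.
func (s *sim) poll(crashAt step) {
	cur := s.state // the overlay starts from the local store
	for _, q := range s.input[s.committed:] {
		cur += q
	}
	s.changelog = cur
	if crashAt == afterChangelog {
		return
	}
	s.state = cur
	if crashAt == afterState {
		return
	}
	s.committed = len(s.input)
}

// run kills the first round at crashAt, then restarts and lets
// the second round finish. The same records are fetched again
// because their offsets were never committed.
func run(crashAt step) (state, changelog uint64) {
	s := &sim{input: []uint64{2, 3}}
	s.poll(crashAt)
	s.poll(noCrash)
	return s.state, s.changelog
}

func main() {
	cases := []struct {
		name string
		at   step
	}{
		{"crash after changelog", afterChangelog},
		{"crash after state", afterState},
	}
	for _, c := range cases {
		state, changelog := run(c.at)
		fmt.Printf("%s: state=%d changelog=%d\n", c.name, state, changelog)
	}
}
```

The input holds 5 servings in total. A crash right after the changelog write ends at 5 in both places, because the replay starts from the old store value. A crash after the store write ends at 10: the replay starts from the already-updated store, which is the one-batch inflation described above.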
Output: top-N snapshot
Every flush seconds a background goroutine walks Pebble and emits the current top-N drinks. Print to stdout — for human eyes; write to brew.drink-counts — so the analytics-service dashboard or mart can consume it.
func (d *drinkCounter) flushTopN(ctx context.Context) error {
	rows, err := d.collectAll()
	if err != nil {
		return fmt.Errorf("collect counters: %w", err)
	}
	sortDrinks(rows) // by count descending, ties broken by drink name
	if len(rows) > d.topN {
		rows = rows[:d.topN]
	}
	// print to stdout
	// ProduceSync top-N to outputTopic
	return nil
}
Writing to outputTopic here is a Produce without a transaction, without a combined offset commit. The top-drinks snapshot is published as-is: if it is lost, the next one will arrive in 5 seconds. This is normal semantics for metric snapshots. If downstream cannot handle duplicates (we may have sent top-N and then triggered a new flush before the previous one was acknowledged), add an idempotency key with a timestamp and discard stale entries on the consumer.
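The sorting step inside flushTopN is only named in the snippet. One possible shape for it, as a sketch: drinkRow and topN are hypothetical names, and the real collectAll presumably yields something similar from a Pebble iterator. The ordering rule is the one stated in the code comment: count descending, ties broken by drink name.

```go
package main

import (
	"fmt"
	"sort"
)

// drinkRow is an assumed row type: one drink and its cumulative count.
type drinkRow struct {
	Drink string
	Count uint64
}

// sortDrinks orders rows by count descending; equal counts are
// ordered by drink name so the snapshot is deterministic.
func sortDrinks(rows []drinkRow) {
	sort.Slice(rows, func(i, j int) bool {
		if rows[i].Count != rows[j].Count {
			return rows[i].Count > rows[j].Count
		}
		return rows[i].Drink < rows[j].Drink
	})
}

// topN returns the first n rows after sorting, without mutating the input.
func topN(rows []drinkRow, n int) []drinkRow {
	sorted := append([]drinkRow(nil), rows...)
	sortDrinks(sorted)
	if n < 0 {
		n = 0
	}
	if len(sorted) > n {
		sorted = sorted[:n]
	}
	return sorted
}

func main() {
	rows := []drinkRow{
		{"mocha", 4},
		{"latte", 9},
		{"americano", 4},
		{"flat white", 1},
	}
	for i, r := range topN(rows, 3) {
		fmt.Printf("%d. %s %d\n", i+1, r.Drink, r.Count)
	}
	// 1. latte 9
	// 2. americano 4
	// 3. mocha 4
}
```

The tie-break matters more than it looks: without it, two drinks with equal counts could swap places between consecutive snapshots and make the board flicker.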
Compacted changelog: what and why
brew.drink-count-changelog is a topic with cleanup.policy=compact. What that means. A regular topic retains all records until retention expires. A compacted topic guarantees at least the latest record for every key. Older versions of the same key are eventually removed by compaction (a background process in the broker).
Why we need this. Drink-count has seen latte a thousand times — and written to the changelog a thousand times. After compaction, only the last one or two records remain in the physical log out of that thousand (the exact number depends on timing and min.cleanable.dirty.ratio). The changelog size grows linearly with the number of unique drinks, not with the number of increments.
This is how you keep a "materialized view" of state in Kafka. By analogy with KTable — we have a compacted topic plus a local store, and they agree on the latest value per key.
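The effect of compaction on the changelog can be illustrated with a toy model. This is a sketch of the end state only, under assumptions: record and compact are hypothetical names, and a real broker compacts segment by segment, never touches the active segment, and keeps tombstones for a while, so the physical log usually holds somewhat more than this ideal result.

```go
package main

import "fmt"

// record is a simplified changelog entry; a nil Value would be a tombstone.
type record struct {
	Key   string
	Value []byte
}

// compact keeps only the last record for every key and preserves
// the relative order of the survivors.
func compact(log []record) []record {
	last := make(map[string]int, len(log))
	for i, r := range log {
		last[r.Key] = i // later records overwrite earlier positions
	}
	out := make([]record, 0, len(last))
	for i, r := range log {
		if last[r.Key] == i {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	var log []record
	for i := 1; i <= 1000; i++ {
		// latte is incremented a thousand times, mocha ten times
		log = append(log, record{"latte", []byte(fmt.Sprint(i))})
		if i%100 == 0 {
			log = append(log, record{"mocha", []byte(fmt.Sprint(i / 100))})
		}
	}
	compacted := compact(log)
	fmt.Println(len(log), "->", len(compacted)) // 1010 -> 2
	for _, r := range compacted {
		fmt.Println(r.Key, string(r.Value))
	}
	// latte 1000
	// mocha 10
}
```

The compacted size tracks the number of distinct drinks (2), not the number of increments (1010), which is the property the restore step relies on.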
The topic is created with specific configs:
docker exec kafka-1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-1:9092 --create \
  --topic brew.drink-count-changelog \
  --config cleanup.policy=compact \
  --config segment.ms=60000 \
  --config min.cleanable.dirty.ratio=0.01
segment.ms=60000 plus min.cleanable.dirty.ratio=0.01 are parameters that make compaction happen frequently on small volumes. In production they are typically much larger: compaction is not cheap.
Restore: from scratch via the changelog
The scenario: disk died, Pebble is gone. Run cmd/changelog-restorer. It reads brew.drink-count-changelog from the beginning, puts the pairs into Pebble, and stops at the high-watermark of each partition.
First, determine how far to read:
end, err := admin.ListEndOffsets(rpcCtx, topic)
// ...
end.Each(func(o kadm.ListedOffset) {
	if o.Offset > 0 {
		out[o.Partition] = o.Offset
	}
})
Then read without a consumer group (we do not need a committed offset, we need a snapshot of the entire compacted log), track the maximum offset manually, and compare:
fetches.EachRecord(func(rec *kgo.Record) {
	if rec.Offset+1 > maxOffsets[rec.Partition] {
		maxOffsets[rec.Partition] = rec.Offset + 1
	}
	if len(rec.Value) == 0 {
		// tombstone: key no longer exists
		_ = store.Delete(rec.Key, pebble.NoSync)
		return
	}
	// ... pebble.Set(key, value)
})
if reachedEnd(maxOffsets, endOffsets) {
	break
}
A tombstone is a record with value=nil in the compacted log. It means "delete this key, it no longer exists for me." In our drink-count we never write tombstones (a drink counter can only increase), but the restorer handles them correctly regardless, in case of manual edits or future model changes (for example, a drink pulled from the menu).
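The reachedEnd check and the tombstone handling can be exercised without a broker. The following is a sketch, not the restorer's actual code: changelogRecord and restore are hypothetical names, a plain map stands in for Pebble, and the input slice stands in for what the fetch loop would deliver. Offsets have gaps on purpose, since a compacted log does not have contiguous offsets.

```go
package main

import "fmt"

// reachedEnd reports whether every partition listed in end has been
// read up to its end offset. Partitions absent from end (for example
// empty ones filtered out earlier) do not block completion.
func reachedEnd(read, end map[int32]int64) bool {
	for p, e := range end {
		if read[p] < e {
			return false
		}
	}
	return true
}

// changelogRecord carries just the fields the restore loop looks at.
type changelogRecord struct {
	Partition int32
	Offset    int64
	Key       string
	Value     []byte // empty means tombstone
}

// restore replays records into a map and stops once all end offsets
// are reached. The bool tells the caller whether the replay is complete.
func restore(records []changelogRecord, end map[int32]int64) (map[string][]byte, bool) {
	state := make(map[string][]byte)
	read := make(map[int32]int64)
	for _, rec := range records {
		if rec.Offset+1 > read[rec.Partition] {
			read[rec.Partition] = rec.Offset + 1
		}
		if len(rec.Value) == 0 {
			delete(state, rec.Key) // tombstone removes the key
		} else {
			state[rec.Key] = rec.Value
		}
		if reachedEnd(read, end) {
			break
		}
	}
	return state, reachedEnd(read, end)
}

func main() {
	end := map[int32]int64{0: 10, 1: 5}
	records := []changelogRecord{
		{0, 3, "latte", []byte("12")},
		{1, 4, "mocha", []byte("5")},
		{0, 7, "chai", []byte("2")},
		{0, 9, "chai", nil}, // chai was pulled from the menu
	}
	state, done := restore(records, end)
	fmt.Println(len(state), done) // 2 true
	fmt.Println(string(state["latte"]), string(state["mocha"]))
}
```

Comparing Offset+1 against the end offset works because the end offset is the position of the next record to be written, so the last existing record sits at end-1 even when compaction has removed earlier ones.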
After all partitions are read up to the end offset, call Flush() — Pebble flushes accumulated data to disk. After that you can start drink-count with the standard make run — it will find state in place and continue from the point at which the changelog was at restore time.
One detail: between the restore moment and the drink-count start new records may have already arrived in the changelog (if something else is writing in parallel). That is fine. Drink-count at startup picks up its last committed offset from the consumer group, starts reading brew.orders.v1 from that point — and also catches up with the changelog for any new updates. Self-consistency is preserved.
Running
The sandbox must be running (docker compose up -d from the root).
Create topics once:
make topic-create-all
In one terminal, feed input:
make seed-orders
A loop of a dozen test Brew orders goes into brew.orders.v1 once per second. You can also push a custom order manually via kafka-console-producer.sh; the JSON format is {"order_id":...,"shop_id":...,"items":[{"drink":...,"quantity":...}]}.
In another terminal — drink-count:
make run
Every 5 seconds it prints the top-10 drinks and the current number of processed orders. Watch the counters grow. Kill it (Ctrl+C), start again: counters continue from the same value because Pebble remained on disk.
To see restore — delete the state directory and restore from the changelog:
rm -rf ./state
make restore
make run
After make restore the ./state/ directory is populated again, and drink-count finds its drink counters at startup.
Clean up after the lesson:
make topic-delete-all
rm -rf ./state
Where to go next
What we built is a stateful processing model at minimum viable complexity. Many things are missing, and it is useful to know that each of them is absent here.
- Time windows. Our top-drinks board does not need event-time: it counts "everything over all time." Real streams almost always want windows (for example, "top drinks in the last hour", see Stream processing: concepts). On top of our scheme this looks like: the Pebble key is not drink but <drink>:<window-start>, plus a separate process closes windows by watermark and deletes old keys.
- Joins. Stream-stream and stream-table joins are a large separate topic. Briefly: both sides need to be repartitioned by the join key, then a local cache (KTable-side) must be held in Pebble.
- Backpressure. In our code flushLoop runs independently of processing. If the incoming message rate greatly exceeds the flush rate to Kafka, the buffer grows. For production: cl.PauseFetchPartitions on outputTopic overload (pattern from Delivery to external systems).
- Exactly-once. To eliminate duplicates on crashes, you need producer transactions wrapping the "changelog produce + Pebble update + offset commit" block. In franz-go v1.21.0 the public entry point is kgo.NewGroupTransactSession, see Consume-process-produce.
- State sharding. With a large number of input partitions, a single node with a single Pebble is a bottleneck. Kafka Streams splits state by key partition; each node holds its own shard. Here: one process, one state. Scales via consumer group: each member takes its partitions and holds its own Pebble; the changelog is still shared.
- Metrics and observability. Input topic lag, state size, changelog-publish lag, top-N flush latency. That is Monitoring and metrics.
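As a taste of the first item in the list, the <drink>:<window-start> key could be built like this. It is a sketch under assumptions: windowKey, parseWindowKey and windowClosed are hypothetical names, windows are tumbling and aligned to UTC, the window start is stored as Unix seconds, and the window size is expected to be a whole number of seconds.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// windowKey builds "<drink>:<window-start>" where window-start is the
// Unix time of the tumbling window that contains eventTime.
func windowKey(drink string, eventTime time.Time, size time.Duration) string {
	start := eventTime.UTC().Truncate(size)
	return drink + ":" + strconv.FormatInt(start.Unix(), 10)
}

// parseWindowKey splits on the last colon, so drink names that
// themselves contain a colon still round-trip.
func parseWindowKey(key string) (drink string, start time.Time, err error) {
	i := strings.LastIndexByte(key, ':')
	if i < 0 {
		return "", time.Time{}, fmt.Errorf("no window suffix in %q", key)
	}
	sec, err := strconv.ParseInt(key[i+1:], 10, 64)
	if err != nil {
		return "", time.Time{}, fmt.Errorf("bad window start in %q: %w", key, err)
	}
	return key[:i], time.Unix(sec, 0).UTC(), nil
}

// windowClosed reports whether the watermark has passed the end of
// the window, which is when its key becomes eligible for deletion.
func windowClosed(start time.Time, size time.Duration, watermark time.Time) bool {
	return !start.Add(size).After(watermark)
}

func main() {
	size := time.Hour
	eventTime := time.Date(2024, 5, 1, 10, 42, 7, 0, time.UTC)

	key := windowKey("latte", eventTime, size)
	drink, start, err := parseWindowKey(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(drink, start.Format(time.RFC3339)) // latte 2024-05-01T10:00:00Z

	watermark := time.Date(2024, 5, 1, 10, 59, 0, 0, time.UTC)
	fmt.Println(windowClosed(start, size, watermark))                  // false
	fmt.Println(windowClosed(start, size, watermark.Add(time.Minute))) // true
}
```

The decimal suffix is easy to read but does not sort numerically under Pebble's byte ordering; a fixed-width encoding with the window start placed first would let a single range scan find all expired windows.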
Everything listed is built on the same foundation. Pebble + changelog + the correct order of "changelog → state → commit." The surrounding machinery changes, not the essence.
Key takeaways
- Stateful streams without a state store are an illusion. In-memory works until it crashes; you need either external storage (slow) or embedded + changelog (faster and durable).
- Pebble + compacted changelog topic — a working scheme for Go. Not Kafka Streams, but sufficient for most practical tasks.
- Operation order matters more than it seems. Changelog → state → commit. Any reordering produces bad semantics (lost or inconsistent counter), and you will notice that in production long after the first incident.
- A compacted topic is a materialized snapshot, not a log. All reasoning about retention does not apply to it; size is bounded by the number of unique keys, not the number of records.
In Kafka Connect we go in a different direction — Kafka Connect and declarative ETL without custom code. For cases where Pebble + Go is overkill.