Stream processing на Go (franz-go + Pebble)
В Stream processing: концепции мы говорили про идеи: event-time, окна, watermark, KStream/KTable. Тут пора потрогать. Возьмём живую задачу Brew: analytics-service хочет в реальном времени держать топ напитков по всей сети — что сейчас заказывают чаще всего. Stream processing'у нужно state'е — счётчики напитков где-то живут между заказами. И state этот надо переживать рестарты, иначе топ напитков рассыпается на первой же kill -9.
Беда в том, что для Go нативного Kafka Streams нет. В Java — есть библиотека, прямо от Confluent. В Go — пусто. Самые близкие штуки (Watermill, например) — это про message routing, не про stateful streams. Так что собираем руками: kafka-клиент + локальный embedded KV-store + changelog topic для durability.
В нашем случае это franz-go + Pebble + compacted-топик brew.drink-count-changelog. Получается упрощённая копия модели Kafka Streams: state живёт на диске, обновления параллельно копируются в Kafka, при потере диска state восстанавливается из changelog'а с beginning'а. Без watermark'ов, без окон по времени, без сложной топологии — просто чтобы увидеть три ключевые механики на работающем коде. Топ напитков — кумулятивный: считаем порции по всему потоку заказов, снэпшот top-N эмитим раз в flush секунд.
Зачем нам state
Stateless-обработка: пришла запись, сделал с ней что-то, записал куда-то — забыл. map, filter, flatMap. Перезапустил процесс, ничего не потерял.
Stateful — другое дело. Считаем count, sum, top-N, unique users за час. Вторая запись зависит от того, что мы видели в первой. Память где-то надо держать. Варианты на пальцах.
- In-memory-only. Просто
map[string]intв горутине. Быстро, ноль зависимостей, послеkill -9всё обнулилось. Подходит ровно для демо-скриптов. - Внешняя БД. Postgres, Redis, любой KV. Накладные на каждый инкремент — сетевой round-trip. На потоке 50k msg/sec уже больно.
- Embedded store + changelog. Пишем в локальный LSM (Pebble/RocksDB), параллельно копию изменений отправляем в compacted-топик Kafka. Производительность как у локальной БД (миллисекундные сетевые round-trip'ы пропадают), durability — кафочного уровня. Это и есть «как делает Kafka Streams».
Третий вариант мы и собираем. Pebble тут — потому что чистый Go, без CGo (RocksDB через CGo — отдельная боль на сборках). Pebble — это LSM-движок CockroachDB, на нём же построено их собственное хранилище — для нашего sandbox'а более чем достаточно.
Pebble в двух словах
LSM-дерево, embedded, key-value. API очень простой: Set, Get, Delete, итерация. Хранит на диск (по дефолту в указанную директорию), периодически сбрасывает memtable на диск. По принципам — родственник RocksDB.
Что нам важно из API:
pebble.Open(dir, opts)— открыть/создать БД на диске.db.Set(key, value, sync)— записать.db.Get(key)→(value, closer, err)— прочитать (closer.Close()обязателен после использования).db.NewIter(opts)→ итератор по всему диапазону.db.Flush()— форсировать сброс memtable на диск.
Опция pebble.Sync против pebble.NoSync решает про fsync. В нашем коде мы складываем все Set'ы одного батча в *pebble.Batch без sync'а, а потом коммитим батч с pebble.Sync — fsync один раз на батч, а не на каждую запись. На проде в комбинации с changelog'ом часто берут NoSync даже на коммит батча плюс периодический Flush: durability обеспечивает Kafka, локальный диск нужен только для скорости.
Архитектура нашего drink-count
Три топика и одна локальная директория.
brew.orders.v1— input. Заказы Brew в JSON; для каждой позицииitems[]берёмdrinkиquantityи инкрементим счётчик напитка.brew.drink-count-changelog— compacted-топик. На каждое обновление счётчика пишем(drink, current_count). Compaction в Kafka гарантирует, что для каждого ключа сохранится только последнее значение, размер не растёт линейно.brew.drink-counts— output. Раз вflushсекунд (5 по умолчанию) эмитим текущий top-N напитков как снэпшот.
И директория ./state/ — туда Pebble складывает свой LSM. Удалил директорию — потерял локальный state. Запустил cmd/changelog-restorer — восстановил с changelog'а.
Поток в одну сторону, без петель:
brew.orders.v1 ──> [drink-count] ──┬──> brew.drink-count-changelog (compact)
├──> brew.drink-counts (top-N snapshot)
└──> ./state/ (Pebble)И обратное направление, только для рестарта state'а:
brew.drink-count-changelog ──> [changelog-restorer] ──> ./state/Цикл drink-count'а
Самое важное — порядок трёх долговечных операций в одном цикле polling'а: produce в changelog, batch в Pebble и commit offset'а. Если их перепутать, можно либо потерять инкременты при краше, либо словить дубли при рестарте.
Правильный порядок: changelog → Pebble → commit offset'а. У каждого шага свой смысл.
Сначала накапливаем инкременты в in-memory overlay (без записи в Pebble) и собираем соответствующие changelog-записи. Каждый заказ разбираем чистой функцией countDrinks — она достаёт из JSON напитки и их количество:
overlay := make(map[string]uint64)
var produces []*kgo.Record
fetches.EachRecord(func(rec *kgo.Record) {
deltas, err := countDrinks(rec.Value) // map[drink]quantity из одного заказа
if err != nil {
return // битый заказ пропускаем, pipeline не роняем
}
for drink, delta := range deltas {
cur, ok := overlay[drink]
if !ok {
cur, _ = readUint64(d.store, []byte(drink))
}
cur += delta
overlay[drink] = cur
produces = append(produces, &kgo.Record{
Topic: d.changelogTopic,
Key: []byte(drink),
Value: encodeUint64(cur),
})
}
})Overlay нужен потому, что в одном батче один и тот же напиток может встретиться в нескольких заказах, и каждой changelog-записи нужен текущий бегущий счётчик, а не устаревшее значение из Pebble.
Дальше публикуем changelog одним ProduceSync, фиксируем overlay в Pebble одним batch'ем и только после этого коммитим offset'ы:
if err := d.client.ProduceSync(rpcCtx, produces...).FirstErr(); err != nil {
return fmt.Errorf("changelog produce: %w", err)
}
batch := d.store.NewBatch()
for drink, count := range overlay {
_ = batch.Set([]byte(drink), encodeUint64(count), nil)
}
if err := batch.Commit(pebble.Sync); err != nil {
return fmt.Errorf("pebble batch commit: %w", err)
}
if err := d.client.CommitUncommittedOffsets(commitCtx); err != nil {
return fmt.Errorf("commit offsets: %w", err)
}Почему такой порядок. Если бы мы сначала закоммитили offset'ы, потом писали changelog и в этой щели нас прибило бы — после рестарта drink-count считал бы себя успешно прошедшим этот батч, но в changelog'е изменений нет. Если потом потеряем Pebble и попробуем восстановиться — счётчики уедут вниз. Хуже всего, что эта потеря — тихая: никто не оповестит про счётчик, который молча занижает.
Почему changelog раньше Pebble. Если краш произойдёт между ними, в changelog уже лежат новые значения, а Pebble остался со старыми. На рестарте offset не закоммичен, поэтому тот же входной батч переобрабатывается, в changelog уезжают те же новые значения (compaction схлопнет дубликаты по ключу), и Pebble догоняется. Конечное состояние согласовано. Если бы мы писали Pebble первым и крашнулись до changelog'а, restorer из changelog'а дал бы значения ниже того, что уже лежит в Pebble — а сам Pebble на replay-е переинкрементировался бы, потому что overlay стартует с того, что уже есть в Pebble, и счётчик уехал бы на один батч вперёд.
Вся цепочка всё равно даёт at-least-once, не exactly-once. Краш между commit'ом Pebble и commit'ом offset'а на рестарте приведёт к переобработке батча — Pebble переинкрементирует, потому что overlay видит уже обновлённые значения, и changelog получит завышенные счётчики. Чтобы это убрать, нужны транзакционные семантики продьюсера на весь блок: kgo.NewGroupTransactSession плюс Begin/End(TryCommit) — лекция Consume-process-produce. Для топа напитков завышение на один-два после редкого краша — приемлемая цена: дашборд Brew от этого не врёт.
Output: top-N снэпшот
Раз в flush секунд фоновая горутина проходит по Pebble и эмитит текущий top-N напитков. Печать в stdout — для глаз, запись в brew.drink-counts — чтобы дашборд или витрина analytics-service могли это потреблять.
func (d *drinkCounter) flushTopN(ctx context.Context) error {
rows, err := d.collectAll()
sortDrinks(rows) // по count убыванию, при равенстве — по имени напитка
if len(rows) > d.topN {
rows = rows[:d.topN]
}
// печать в stdout
// ProduceSync top-N в outputTopic
}Запись в outputTopic тут — Produce без транзакции, без commit'а offset'а вместе с ним. Снэпшот топа напитков публикуется «как есть» — если он пропадёт, через 5 секунд будет следующий. Это нормальная семантика для метрических снэпшотов. Если downstream не переваривает дубли (мы могли отправить top-N и успеть сделать новый flush до того, как прошлый дошёл) — клади idempotency-key с timestamp'ом и отбрасывай старьё на consumer'е.
Compacted changelog: что и почему
brew.drink-count-changelog — топик с cleanup.policy=compact. Что это значит. Обычный топик хранит все записи до retention'а. Compacted — для каждого ключа гарантирует наличие как минимум последней записи. Старые версии того же ключа со временем удаляются compaction'ом (фоновая работа в брокере).
Зачем нам это. Drink-count видел latte тысячу раз — и тысячу раз дописал в changelog. После compaction'а в физическом логе из этой тысячи останется только одна-две последних записи (точнее зависит от тайминга и min.cleanable.dirty.ratio). Размер changelog'а растёт линейно с числом уникальных напитков, не с числом инкрементов.
Это и есть способ держать в Kafka «материализованную view» на state. По аналогии с KTable — у нас compacted-топик плюс local store, и они согласованы по последнему значению на ключ.
Топик создаётся со специальными конфигами:
docker exec kafka-1 /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka-1:9092 --create \
--topic brew.drink-count-changelog \
--config cleanup.policy=compact \
--config segment.ms=60000 \
--config min.cleanable.dirty.ratio=0.01segment.ms=60000 плюс min.cleanable.dirty.ratio=0.01 — параметры, чтобы compaction случался часто на маленьком объёме. На проде они обычно сильно больше: compaction не дешёвый.
Restore: с нуля из changelog'а
Сценарий «диск умер, Pebble пропал». Запускаем cmd/changelog-restorer. Он читает brew.drink-count-changelog с beginning'а, кладёт пары в Pebble, останавливается на high-watermark'е каждой партиции.
Сначала узнаём, докуда читать:
end, err := admin.ListEndOffsets(rpcCtx, topic)
// ...
end.Each(func(o kadm.ListedOffset) {
if o.Offset > 0 {
out[o.Partition] = o.Offset
}
})Дальше читаем без consumer-group (нам не нужен committed offset, нужен снэпшот целого compacted-лога), отслеживаем максимальный offset вручную и сравниваем:
fetches.EachRecord(func(rec *kgo.Record) {
if rec.Offset+1 > maxOffsets[rec.Partition] {
maxOffsets[rec.Partition] = rec.Offset + 1
}
if len(rec.Value) == 0 {
// tombstone — ключа больше нет
_ = store.Delete(rec.Key, pebble.NoSync)
return
}
// ... pebble.Set(key, value)
})
if reachedEnd(maxOffsets, endOffsets) {
break
}Tombstone — запись с value=nil в compacted-логе. Семантически означает «удали этот ключ, для меня его больше нет». В нашем drink-count мы tombstone никогда не пишем (счётчик напитка может только расти), но restorer всё равно их корректно обрабатывает — на случай ручных правок или будущих эволюций модели (например, напиток сняли с меню).
После того как все партиции дочитаны до end-offset'а, делаем Flush() — Pebble сбрасывает накопленное на диск. После этого можно стартовать drink-count со стандартным make run — он найдёт state на месте и продолжит с точки, в которой changelog был на момент restore'а.
Один нюанс: между моментом restore'а и моментом старта drink-count'а в changelog могли уже прилететь новые записи (если кто-то параллельно ещё пишет). Это нормально. Drink-count при старте подхватит свой last committed offset из consumer-group'ы, начнёт читать brew.orders.v1 с того же места — и заодно догонит changelog в части новых обновлений. Самосогласованность сохраняется.
Запуск
Стенд должен быть поднят (docker compose up -d из корня).
Один раз создать топики:
make topic-create-allВ одном терминале — заливать input:
make seed-ordersЦикл из десятка тестовых заказов Brew идёт в brew.orders.v1 секунду в секунду. Можно подкинуть свой заказ через kafka-console-producer.sh руками — формат JSON {"order_id":...,"shop_id":...,"items":[{"drink":...,"quantity":...}]}.
В другом терминале — drink-count:
make runКаждые 5 секунд он печатает top-10 напитков и текущее число обработанных заказов. Глянь, как растут счётчики. Поубивай его (Ctrl+C), запусти снова — счётчики продолжаются с того же значения, потому что Pebble остался на диске.
Хочешь увидеть restore — снеси директорию state и восстанови из changelog'а:
rm -rf ./state
make restore
make runПосле make restore директория ./state/ снова заполнена, и drink-count при старте найдёт свои счётчики напитков.
Прибрать после лекции:
make topic-delete-all
rm -rf ./stateКуда расти
То, что мы собрали — модель stateful processing'а на минималках. Не хватает массы вещей, и про каждую полезно знать, что её здесь нет.
- Time windows. Нашему топу напитков не нужен event-time — он считает «всё за всё время». Реальные стримы почти всегда хотят окна (например, «топ напитков за последний час», см. Stream processing: концепции). На основе нашей схемы это делается так: ключ Pebble не
drink, а<drink>:<window-start>, плюс отдельный процесс закрывает окна по watermark'у и удаляет старые ключи. - Joins. Stream-stream и stream-table join'ы — отдельная большая тема. Базово: нужно репартиционировать обе стороны по join-ключу, потом держать local cache (KTable-side) в Pebble.
- Backpressure. В нашем коде
flushLoopидёт независимо от обработки. Если поток входящих сообщений сильно опережает скорость flush'а в Kafka — буфер растёт. Для production'а:cl.PauseFetchPartitionsпри перегрузе outputTopic'а (паттерн из Доставка во внешние системы). - Exactly-once. Чтобы избавиться от дублей при крашах, нужны транзакции producer'а вокруг блока «changelog produce + Pebble update + offset commit». В franz-go v1.21.0 публичная точка входа —
kgo.NewGroupTransactSession, паттерн из Consume-process-produce. - Шардинг state'а. При большом числе партиций input'а одна нода с одним Pebble — bottleneck. Kafka Streams делит state по партициям ключа, каждая нода держит свой shard. Тут — один процесс, один state. Расширяется через consumer-group: каждый member берёт свои партиции, держит свой Pebble; changelog'ом всё равно делятся.
- Метрики и наблюдаемость. Lag входного топика, размер state'а, lag changelog-publish'а, latency flush'а top-N. Это Мониторинг и метрики.
Всё перечисленное — поверх той же базы. Pebble + changelog + грамотный порядок «changelog → state → commit». Меняется обвязка, не суть.
Что унести
- Stateful streams без state store'а — это иллюзия. В памяти всё работает, пока не упадёт; нужно либо внешнее хранилище (медленно), либо embedded + changelog (быстрее и durable).
- Pebble + compacted changelog topic — рабочая схема для Go. Не Kafka Streams, но достаточно для большинства практических задач.
- Порядок операций важнее, чем кажется. Changelog → state → commit. Любая перестановка даёт неприятную семантику (потеря или несогласованный счётчик), и эту неприятность ты заметишь сильно позже первого продакшн-инцидента.
- Compacted topic — это материализованный snapshot, не лог. Все рассуждения про retention к нему не применимы; размер ограничен числом уникальных ключей, не числом записей.
В Kafka Connect уйдём в другую сторону — Kafka Connect и декларативный ETL без своего кода. Для тех случаев, где Pebble + Go — overkill.