Конкурентность и lag
В прошлой лекции мы научились ловить ошибки и разносить их по DLQ и retry-цепочкам. Бизнес-обработчик там был мокнутый, в нём time.Sleep(50ms) — и каждое сообщение обходилось примерно в эту цену. Один консьюмер, один поток, один for r := range batch. Пока сообщений мало — ок. Дальше начинается вот что.
Допустим, в промо-пятницу в brew.orders.v1 хлынуло 5 тысяч заказов в секунду. Готовка одного — 10 миллисекунд. Один поток kitchen-service вытянет 100 в секунду. Lag растёт линейно. Через минуту кухня отстаёт на 5 минут, через час — на час. Pipeline не вышел из строя; он просто медленный. С точки зрения мониторинга это «здоровый, но безнадёжно отстающий» консьюмер. Хуже бывает только «лежит».
Параллелить обработку. Звучит просто. На практике сразу всплывает ограничение, про которое легко забыть.
Единственная гарантия порядка — per-partition
Внутри одной партиции записи уложены в журнал и читаются строго в том порядке, в котором были записаны. Между партициями — порядок не гарантирован вообще никак. Хочешь, чтобы события одного заказа обрабатывались последовательно? Клади его order_id ключом — все записи этого ключа лягут в одну партицию. Других гарантий тут нет.
Это значит вот что. Если просто запустить пул из 16 goroutine'ов и швырять туда записи в круговую — события одного заказа могут попасть в разные goroutine'ы, и какая из них закончит первой, заранее не известно. Статус READY обработается раньше PAID — заказ уезжает курьеру неоплаченным, поддержка получает тикет. Pipeline стал быстрым и сломанным.
Два чистых способа масштабироваться, без жертвы ordering:
- Больше партиций + больше консьюмеров в группе. Каждый консьюмер берёт несколько партиций, внутри каждой партиции обрабатывает последовательно. Простой и надёжный путь, но потолок задаётся числом партиций — а его задирать нельзя бесконечно (метаданные топика растут, ребалансы тяжелеют). И partition'ы нельзя уменьшать.
- Per-key worker pool внутри одного консьюмера. Пул горутин, hash от key вычисляет worker'а. Один и тот же ключ всегда у одного worker'а, разные ключи параллелятся. Один консьюмер обслуживает ту же партицию, но внутри неё — параллельно по ключам.
Эта лекция — про второй путь. Первый штатный, у него нет подвохов кроме «не задирай партиции до бесконечности». А вот второй — с подвохами.
Per-key worker pool
Идея: PollFetches возвращает батч записей. Сами их обрабатывать перестаём — раскладываем по worker-каналам. Worker — горутина с собственным chan *kgo.Record. Выбор worker'а — hash(record.Key) % N, где N — размер пула.
func workerFor(key []byte, n int) int {
if len(key) == 0 {
return 0
}
h := fnv.New32a()
_, _ = h.Write(key)
return int(h.Sum32() % uint32(n))
}fnv32a хорош тем, что детерминированный и равномерный для типичных order_id и uuid'ов. С пустым ключом сложнее — порядок для безключевых записей и так не определён, поэтому отправляем их в worker 0; если хочется размазать нагрузку, можно вместо этого делать round-robin (для этой лекции тонкость не принципиальна).
Дальше worker'ы крутятся в простом цикле:
for r := range ch {
time.Sleep(o.workDelay)
tr.markDone(r.Topic, r.Partition, r.Offset)
processed.Add(1)
}И вот тут ловушка.
Out-of-order completion
Worker'ы заканчивают записи в произвольном порядке. Worker[3] может справиться с offset=12 быстрее, чем worker[1] — с offset=10 (записи в одной партиции с разными ключами уехали к разным worker'ам). Если в такой момент закоммитить «на верхнюю границу того, что обработали» — а это offset=12 — и упасть, на рестарте Kafka отдаст нам всё, начиная с offset=13. Запись 10 потеряна.
Поэтому commit'ить можно только «по нижней непрерывной границе»: максимальный offset O такой, что все записи <= O уже обработаны. Между ним и тем, что обработали out-of-order, — окошко «висящих» offset'ов, которые ждут, пока придут отстающие.
Делается это per-partition tracker'ом. Смысл — два поля:
next— следующий offset, который мы вернёмся читать после рестарта (то есть «не обработан»);done— set offset'ов вышеnext, которые уже готовы.
Логика markDone — добавили offset в done, потом крутим цикл: пока done содержит next, удаляем из done и инкрементируем next. Так склеиваем непрерывную полосу.
pt.done[offset] = struct{}{}
for {
if _, ok := pt.done[pt.next]; !ok {
break
}
delete(pt.done, pt.next)
pt.next++
}Если done = {10, 11, 13}, а next = 10 — после прохода next станет 12 (12 ещё не пришёл, цикл встал). Закоммитить можно offset=12. Когда придёт 12 — next прыгнет до 14. Запись 13 не потеряется и не закоммитится преждевременно.
Эту структуру держим под мьютексом. Worker'ы пишут параллельно, commit-loop читает snapshot. Лок короткий, шум на throughput незначительный.
Commit отдельно от обработки
Раз worker'ы и tracker живут в разных goroutine'ах, ручной commit логично вынести в свой тикер. Раз в commit-interval (у нас по умолчанию 500 мс) tracker отдаёт snapshot — что-то вроде {topic: {partition: EpochOffset{...}}} — и мы дёргаем cl.CommitOffsetsSync. Если за этот интервал ничего не двинулось — повторно закоммитим тот же offset, ничего страшного.
cl.CommitOffsetsSync(ctx, snap, func(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "commit error: %v\n", err)
}
})При выходе по Ctrl+C нельзя просто закрыть клиент. Порядок такой:
- Останавливаем диспетчер — больше не дозакидываем в worker-каналы.
- Закрываем worker-каналы.
- Ждём, пока worker'ы добьют то, что у них уже на руках.
- Останавливаем тикер коммитов и делаем последний sync-commit.
- И только после этого
cl.Close().
Иначе закоммитим offset раньше, чем worker'ы успели отработать — те же дубли при следующем старте.
Lag
Lag консьюмера — это разница между log end offset (LEO) партиции и committed offset группы. Сколько записей лежит в Kafka и ждёт, пока мы их прочитаем.
lag = LEO - committedЕсли lag = 0 — мы догнали голову лога. Если lag растёт — обработка не успевает за продьюсером. Если lag растёт стабильно (по графику видно прямую линию вверх) — значит throughput consumer'а меньше throughput producer'а; масштабироваться надо именно тут.
Lag меряют per-partition, а потом агрегируют. Перекос говорит о hot key или hot partition: общий lag в норме, а одна партиция отстала на миллион. Тут добавление worker'ов не поможет — лечится в ключах (см. лекцию Ключи и партиционирование и runbook в Troubleshooting runbook).
kadm.Lag в Go
В franz-go всё уже собрано: kadm.Client.Lag(ctx, group). Возвращает DescribedGroupLags — map по имени группы, внутри per-topic per-partition GroupMemberLag. Удобно, потому что под капотом считается: DescribeGroups + FetchOffsets + ListEndOffsets + сравнение. Самим городить три запроса не надо.
Наш lag-watcher — короткий цикл по тикеру:
lags, err := admin.Lag(reqCtx, group)
if err != nil {
return fmt.Errorf("admin.Lag: %w", err)
}
dl, ok := lags[group]
if !ok {
fmt.Printf("[%s] group=%q: не описана\n", now, group)
return nil
}Дальше идёт перебор dl.Lag.Sorted() — это []GroupMemberLag с полями Topic, Partition, Commit, End, Lag. Печатаем строкой вида lecture-03-05-events/2=147 (commit=8500 end=8647) — видно partition, текущий lag и пара чисел, из которых он посчитан.
Что делает наш код
Три бинарника плюс seed.
cmd/sequential/main.go — baseline. Один поток, time.Sleep(work-delay) на запись, ручной commit после батча. Печатает rate раз в секунду. Этот тот самый «здоровый, но медленный» консьюмер, на котором будет видно рост lag'а.
cmd/concurrent-pool/main.go — пул worker'ов с tracker'ом. Стартует N горутин, каждая на своём канале. Диспетчер раскладывает по hash от key. Async-commit раз в 500 мс. На том же топике с тем же work-delay должен показать N× throughput при условии, что ключей хотя бы N (иначе worker'ы упрутся в один канал).
cmd/lag-watcher/main.go — отдельный процесс. Опрашивает kadm.Lag раз в 2 секунды и печатает per-partition разрыв. Группа выбирается флагом — -group=lecture-03-05-sequential либо -group=lecture-03-05-pool.
cmd/seed/main.go — генератор нагрузки. Льёт N записей с K различными ключами через franz-go async producer с lz4-сжатием. На локальном стенде даёт десятки тысяч сообщений в секунду — больше, чем sequential успевает съесть, и достаточно, чтобы pool мог разогнаться.
Сам commit-snapshot из tracker'а — то самое, что отделяет out-of-order пул от привычного CommitRecords:
out := make(map[string]map[int32]kgo.EpochOffset, len(t.parts))
for tp, pt := range t.parts {
topic := out[tp.topic]
if topic == nil {
topic = make(map[int32]kgo.EpochOffset)
out[tp.topic] = topic
}
topic[tp.partition] = kgo.EpochOffset{Epoch: pt.epoch, Offset: pt.next}
}pt.next — это и есть «нижняя непрерывная граница плюс один», то самое значение, которое Kafka хочет видеть в OffsetCommitRequest как «отсюда продолжать после рестарта». Epoch берём из последнего виденного record'а в партиции — нужен для fence на смену лидера; если поставить -1, Kafka примет, но пропустит проверку.
Tradeoffs
Pool с tracker'ом — мощный инструмент, но не бесплатный.
Memory. Если придёт батч из 10 тысяч записей и worker'ы лагают, в done-set'ах полежит до 10 тысяч offset'ов. Пока tracker.next не догнал — занимает память. На длительных стопах одного worker'а (например, медленный downstream) done может разрастись неприлично. Защита — buffered channel с разумным размером (у нас 1024 на worker) и backpressure через PollFetches: пока канал забит, диспетчер блокируется на отправке, фетчи естественно тормозятся.
Rebalance. Когда партицию отзывают, в done для неё могут лежать обработанные offset'ы выше next. Если next ниже — мы их обработали, но не закоммитили; на новом владельце эта партиция начнёт читать с committed = next, и обработанные offset'ы выше будут перечитаны. Это at-least-once в чистом виде, но на ребалансах окно потенциальных дублей побольше, чем у sequential (там окно = 1 батч). В OnPartitionsRevoked мы дропаем tracker для уехавших партиций, чтобы не путаться при возврате; финальный commit перед drop делать опционально и отдельной процедурой.
Ordering — только per-key. Разные ключи в одной партиции внутри пула обработаются как угодно. Если бизнес-логика опирается не только на per-key, но и на «события в одной партиции вообще» — pool не подойдёт. Тогда либо больше партиций (откатываемся к sequential per-partition), либо более жёсткий пул, где запись идёт по (partition, hash(key)) — чуть дороже, но сохраняет per-partition ordering на хвостах.
Hot key. Если один ключ активнее всех других — его worker станет узким местом, остальные будут простаивать. Симптом — lag растёт, а CPU на 90% свободен. Лечится composite key (см. runbook Troubleshooting runbook) или per-record sharding c sub-key'ями.
Async commit может проиграть гонку с упадом процесса. Раз в 500 мс — это от 0 до 500 мс необработанной (но обработанной workers!) истории, которую перечитают на рестарте. Хочется меньше окно — коммиты чаще; чаще коммиты — выше нагрузка на coordinator. Стандартный tradeoff.
Прогон руками
Терминалов нужно несколько. Один — на seed (льёт записи). Второй — на sequential или pool (обработка). Третий — на lag-watcher (показывает разрыв).
make topic-create # 6 партиций (потолок параллелизма sequential = 6)
make seed-fast # SEED_MESSAGES=100000 SEED_KEYS=32 по умолчаниюДальше в один терминал:
make run-seq # work-delay=10ms, один поток → ~100 msg/secВ другой:
make run-lag # LAG_GROUP=lecture-03-05-sequential по умолчаниюЧерез минуту в lag-watcher'е увидишь per-partition lag — растёт линейно, total в десятках тысяч. Stop seq (Ctrl+C), запусти pool:
make group-delete-all # чтобы pool читал с earliest и не наследовал sequential's commits
make seed-fast # дольём ещё, если хочется
make run-pool # workers=8 по умолчаниюPool с тем же work-delay должен дать ~800 msg/sec (8 worker'ов × 10ms). Lag-watcher для группы pool'а:
make run-lag LAG_GROUP=lecture-03-05-poolLag сначала растёт (накопилось до старта), потом начинает падать. Если при ваших настройках seed льёт быстрее, чем pool обрабатывает — lag всё равно растёт, но медленнее, чем у sequential.
Чисто для проверки tracker'а — поставь WORKERS=1 пулу. Получится sequential, но через канал. Лаг будет тот же. Поставь WORKERS=16 и WORK_DELAY=50ms — в идеале 320 msg/sec, в реальности упрётся в number of unique keys × CPU; если ключей мало, многие worker'ы простаивают.
Что ещё попробовать
- Поставь
SEED_KEYS=4иWORKERS=16. Большинство worker'ов будут простаивать — параллелизм ограничен числом уникальных ключей. На метрике это будет «total throughput почти как у sequential, хоть worker'ов в 16 раз больше». - Сделай
SEED_KEYS=1. Pool превратится в sequential — все записи у одного worker'а. На графике — одна и та же линия как уcmd/sequential. - Подними
WORK_DELAY=100msобоим. Sequential: 10 msg/sec, pool с 8 worker'ами: 80 msg/sec. Видна формула throughput = workers / work-delay, ограниченная только распределением ключей. - Запусти pool в двух копиях с одной и той же группой
lecture-03-05-pool. Ребаланс отдаст партиции пополам. Каждый процесс держит свой пул из 8 worker'ов. Всего параллелизм — до 16 (если ключей хватает). - Убей пул
Ctrl+Cпосередине. Перезапусти. Черезmake group-describe-poolвидно committed offset'ы; должны быть строго ниже LEO. В худшем случае — между committed и LEO «зависшие done» — те самые перечитываемые при рестарте. - Поменяй
COMMIT_INTERVAL=2s. Чаще будет окно потерь при kill, но ниже нагрузка на coordinator. На сильно загруженных группах коммит-нагрузка реальная.
Дальше
Out-of-order tracker — это то, что в Kafka Streams и Flink делается под капотом и называется «watermark / barrier». Идея та же — продвигаем коммитный курсор по нижней непрерывной границе. В Транзакции и EOS этот же tracker превращается в часть consume-process-produce: коммит offset'а и produce'ы участвуют в одной транзакции, и тогда дублей при рестарте нет вообще. Дороже, не для каждого случая.
Лекции Consume-process-produce и Retry и DLQ deep dive опираются на то, что мы тут отделили обработку от commit'а. С транзакциями добавится ещё один шаг — SendOffsetsToTransaction, который атомарно увязывает «обработали и записали в downstream» с «закоммитили исходный offset».
Если pool с per-key sharding'ом перестаёт справляться — возвращаемся к первому варианту: больше партиций. Это нормально. Pool — оптимизация поверх Kafka-партиционирования, а не его замена.