Kafka CookbookКонсьюмерКонкурентность и lag
0 / 42 (0%)

Конкурентность и lag

В прошлой лекции мы научились ловить ошибки и разносить их по DLQ и retry-цепочкам. Бизнес-обработчик там был мокнутый, в нём time.Sleep(50ms) — и каждое сообщение обходилось примерно в эту цену. Один консьюмер, один поток, один for r := range batch. Пока сообщений мало — ок. Дальше начинается вот что.

Допустим, в промо-пятницу в brew.orders.v1 хлынуло 5 тысяч заказов в секунду. Готовка одного — 10 миллисекунд. Один поток kitchen-service вытянет 100 в секунду. Lag растёт линейно. Через минуту кухня отстаёт на 5 минут, через час — на час. Pipeline не вышел из строя; он просто медленный. С точки зрения мониторинга это «здоровый, но безнадёжно отстающий» консьюмер. Хуже бывает только «лежит».

Параллелить обработку. Звучит просто. На практике сразу всплывает ограничение, про которое легко забыть.

Единственная гарантия порядка — per-partition

Внутри одной партиции записи уложены в журнал и читаются строго в том порядке, в котором были записаны. Между партициями — порядок не гарантирован вообще никак. Хочешь, чтобы события одного заказа обрабатывались последовательно? Клади его order_id ключом — все записи этого ключа лягут в одну партицию. Других гарантий тут нет.

Это значит вот что. Если просто запустить пул из 16 goroutine'ов и швырять туда записи в круговую — события одного заказа могут попасть в разные goroutine'ы, и какая из них закончит первой, заранее не известно. Статус READY обработается раньше PAID — заказ уезжает курьеру неоплаченным, поддержка получает тикет. Pipeline стал быстрым и сломанным.

Два чистых способа масштабироваться, без жертвы ordering:

  1. Больше партиций + больше консьюмеров в группе. Каждый консьюмер берёт несколько партиций, внутри каждой партиции обрабатывает последовательно. Простой и надёжный путь, но потолок задаётся числом партиций — а его задирать нельзя бесконечно (метаданные топика растут, ребалансы тяжелеют). И partition'ы нельзя уменьшать.
  2. Per-key worker pool внутри одного консьюмера. Пул горутин, hash от key вычисляет worker'а. Один и тот же ключ всегда у одного worker'а, разные ключи параллелятся. Один консьюмер обслуживает ту же партицию, но внутри неё — параллельно по ключам.

Эта лекция — про второй путь. Первый штатный, у него нет подвохов кроме «не задирай партиции до бесконечности». А вот второй — с подвохами.

Per-key worker pool

Идея: PollFetches возвращает батч записей. Сами их обрабатывать перестаём — раскладываем по worker-каналам. Worker — горутина с собственным chan *kgo.Record. Выбор worker'а — hash(record.Key) % N, где N — размер пула.

go
func workerFor(key []byte, n int) int {
    if len(key) == 0 {
        return 0
    }
    h := fnv.New32a()
    _, _ = h.Write(key)
    return int(h.Sum32() % uint32(n))
}

fnv32a хорош тем, что детерминированный и равномерный для типичных order_id и uuid'ов. С пустым ключом сложнее — порядок для безключевых записей и так не определён, поэтому отправляем их в worker 0; если хочется размазать нагрузку, можно вместо этого делать round-robin (для этой лекции тонкость не принципиальна).

Дальше worker'ы крутятся в простом цикле:

go
for r := range ch {
    time.Sleep(o.workDelay)
    tr.markDone(r.Topic, r.Partition, r.Offset)
    processed.Add(1)
}

И вот тут ловушка.

Out-of-order completion

Worker'ы заканчивают записи в произвольном порядке. Worker[3] может справиться с offset=12 быстрее, чем worker[1] — с offset=10 (записи в одной партиции с разными ключами уехали к разным worker'ам). Если в такой момент закоммитить «на верхнюю границу того, что обработали» — а это offset=12 — и упасть, на рестарте Kafka отдаст нам всё, начиная с offset=13. Запись 10 потеряна.

Поэтому commit'ить можно только «по нижней непрерывной границе»: максимальный offset O такой, что все записи <= O уже обработаны. Между ним и тем, что обработали out-of-order, — окошко «висящих» offset'ов, которые ждут, пока придут отстающие.

Делается это per-partition tracker'ом. Смысл — два поля:

  • next — следующий offset, который мы вернёмся читать после рестарта (то есть «не обработан»);
  • done — set offset'ов выше next, которые уже готовы.

Логика markDone — добавили offset в done, потом крутим цикл: пока done содержит next, удаляем из done и инкрементируем next. Так склеиваем непрерывную полосу.

go
pt.done[offset] = struct{}{}
for {
    if _, ok := pt.done[pt.next]; !ok {
        break
    }
    delete(pt.done, pt.next)
    pt.next++
}

Если done = {10, 11, 13}, а next = 10 — после прохода next станет 12 (12 ещё не пришёл, цикл встал). Закоммитить можно offset=12. Когда придёт 12 — next прыгнет до 14. Запись 13 не потеряется и не закоммитится преждевременно.

Эту структуру держим под мьютексом. Worker'ы пишут параллельно, commit-loop читает snapshot. Лок короткий, шум на throughput незначительный.

Commit отдельно от обработки

Раз worker'ы и tracker живут в разных goroutine'ах, ручной commit логично вынести в свой тикер. Раз в commit-interval (у нас по умолчанию 500 мс) tracker отдаёт snapshot — что-то вроде {topic: {partition: EpochOffset{...}}} — и мы дёргаем cl.CommitOffsetsSync. Если за этот интервал ничего не двинулось — повторно закоммитим тот же offset, ничего страшного.

go
cl.CommitOffsetsSync(ctx, snap, func(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
    if err != nil && !errors.Is(err, context.Canceled) {
        fmt.Fprintf(os.Stderr, "commit error: %v\n", err)
    }
})

При выходе по Ctrl+C нельзя просто закрыть клиент. Порядок такой:

  1. Останавливаем диспетчер — больше не дозакидываем в worker-каналы.
  2. Закрываем worker-каналы.
  3. Ждём, пока worker'ы добьют то, что у них уже на руках.
  4. Останавливаем тикер коммитов и делаем последний sync-commit.
  5. И только после этого cl.Close().

Иначе закоммитим offset раньше, чем worker'ы успели отработать — те же дубли при следующем старте.

Lag

Lag консьюмера — это разница между log end offset (LEO) партиции и committed offset группы. Сколько записей лежит в Kafka и ждёт, пока мы их прочитаем.

plaintext
lag = LEO - committed

Если lag = 0 — мы догнали голову лога. Если lag растёт — обработка не успевает за продьюсером. Если lag растёт стабильно (по графику видно прямую линию вверх) — значит throughput consumer'а меньше throughput producer'а; масштабироваться надо именно тут.

Lag меряют per-partition, а потом агрегируют. Перекос говорит о hot key или hot partition: общий lag в норме, а одна партиция отстала на миллион. Тут добавление worker'ов не поможет — лечится в ключах (см. лекцию Ключи и партиционирование и runbook в Troubleshooting runbook).

kadm.Lag в Go

В franz-go всё уже собрано: kadm.Client.Lag(ctx, group). Возвращает DescribedGroupLags — map по имени группы, внутри per-topic per-partition GroupMemberLag. Удобно, потому что под капотом считается: DescribeGroups + FetchOffsets + ListEndOffsets + сравнение. Самим городить три запроса не надо.

Наш lag-watcher — короткий цикл по тикеру:

go
lags, err := admin.Lag(reqCtx, group)
if err != nil {
    return fmt.Errorf("admin.Lag: %w", err)
}
dl, ok := lags[group]
if !ok {
    fmt.Printf("[%s] group=%q: не описана\n", now, group)
    return nil
}

Дальше идёт перебор dl.Lag.Sorted() — это []GroupMemberLag с полями Topic, Partition, Commit, End, Lag. Печатаем строкой вида lecture-03-05-events/2=147 (commit=8500 end=8647) — видно partition, текущий lag и пара чисел, из которых он посчитан.

Что делает наш код

Три бинарника плюс seed.

cmd/sequential/main.go — baseline. Один поток, time.Sleep(work-delay) на запись, ручной commit после батча. Печатает rate раз в секунду. Этот тот самый «здоровый, но медленный» консьюмер, на котором будет видно рост lag'а.

cmd/concurrent-pool/main.go — пул worker'ов с tracker'ом. Стартует N горутин, каждая на своём канале. Диспетчер раскладывает по hash от key. Async-commit раз в 500 мс. На том же топике с тем же work-delay должен показать throughput при условии, что ключей хотя бы N (иначе worker'ы упрутся в один канал).

cmd/lag-watcher/main.go — отдельный процесс. Опрашивает kadm.Lag раз в 2 секунды и печатает per-partition разрыв. Группа выбирается флагом — -group=lecture-03-05-sequential либо -group=lecture-03-05-pool.

cmd/seed/main.go — генератор нагрузки. Льёт N записей с K различными ключами через franz-go async producer с lz4-сжатием. На локальном стенде даёт десятки тысяч сообщений в секунду — больше, чем sequential успевает съесть, и достаточно, чтобы pool мог разогнаться.

Сам commit-snapshot из tracker'а — то самое, что отделяет out-of-order пул от привычного CommitRecords:

go
out := make(map[string]map[int32]kgo.EpochOffset, len(t.parts))
for tp, pt := range t.parts {
    topic := out[tp.topic]
    if topic == nil {
        topic = make(map[int32]kgo.EpochOffset)
        out[tp.topic] = topic
    }
    topic[tp.partition] = kgo.EpochOffset{Epoch: pt.epoch, Offset: pt.next}
}

pt.next — это и есть «нижняя непрерывная граница плюс один», то самое значение, которое Kafka хочет видеть в OffsetCommitRequest как «отсюда продолжать после рестарта». Epoch берём из последнего виденного record'а в партиции — нужен для fence на смену лидера; если поставить -1, Kafka примет, но пропустит проверку.

Tradeoffs

Pool с tracker'ом — мощный инструмент, но не бесплатный.

Memory. Если придёт батч из 10 тысяч записей и worker'ы лагают, в done-set'ах полежит до 10 тысяч offset'ов. Пока tracker.next не догнал — занимает память. На длительных стопах одного worker'а (например, медленный downstream) done может разрастись неприлично. Защита — buffered channel с разумным размером (у нас 1024 на worker) и backpressure через PollFetches: пока канал забит, диспетчер блокируется на отправке, фетчи естественно тормозятся.

Rebalance. Когда партицию отзывают, в done для неё могут лежать обработанные offset'ы выше next. Если next ниже — мы их обработали, но не закоммитили; на новом владельце эта партиция начнёт читать с committed = next, и обработанные offset'ы выше будут перечитаны. Это at-least-once в чистом виде, но на ребалансах окно потенциальных дублей побольше, чем у sequential (там окно = 1 батч). В OnPartitionsRevoked мы дропаем tracker для уехавших партиций, чтобы не путаться при возврате; финальный commit перед drop делать опционально и отдельной процедурой.

Ordering — только per-key. Разные ключи в одной партиции внутри пула обработаются как угодно. Если бизнес-логика опирается не только на per-key, но и на «события в одной партиции вообще» — pool не подойдёт. Тогда либо больше партиций (откатываемся к sequential per-partition), либо более жёсткий пул, где запись идёт по (partition, hash(key)) — чуть дороже, но сохраняет per-partition ordering на хвостах.

Hot key. Если один ключ активнее всех других — его worker станет узким местом, остальные будут простаивать. Симптом — lag растёт, а CPU на 90% свободен. Лечится composite key (см. runbook Troubleshooting runbook) или per-record sharding c sub-key'ями.

Async commit может проиграть гонку с упадом процесса. Раз в 500 мс — это от 0 до 500 мс необработанной (но обработанной workers!) истории, которую перечитают на рестарте. Хочется меньше окно — коммиты чаще; чаще коммиты — выше нагрузка на coordinator. Стандартный tradeoff.

Прогон руками

Терминалов нужно несколько. Один — на seed (льёт записи). Второй — на sequential или pool (обработка). Третий — на lag-watcher (показывает разрыв).

sh
make topic-create          # 6 партиций (потолок параллелизма sequential = 6)
make seed-fast             # SEED_MESSAGES=100000 SEED_KEYS=32 по умолчанию

Дальше в один терминал:

sh
make run-seq               # work-delay=10ms, один поток → ~100 msg/sec

В другой:

sh
make run-lag               # LAG_GROUP=lecture-03-05-sequential по умолчанию

Через минуту в lag-watcher'е увидишь per-partition lag — растёт линейно, total в десятках тысяч. Stop seq (Ctrl+C), запусти pool:

sh
make group-delete-all      # чтобы pool читал с earliest и не наследовал sequential's commits
make seed-fast             # дольём ещё, если хочется
make run-pool              # workers=8 по умолчанию

Pool с тем же work-delay должен дать ~800 msg/sec (8 worker'ов × 10ms). Lag-watcher для группы pool'а:

sh
make run-lag LAG_GROUP=lecture-03-05-pool

Lag сначала растёт (накопилось до старта), потом начинает падать. Если при ваших настройках seed льёт быстрее, чем pool обрабатывает — lag всё равно растёт, но медленнее, чем у sequential.

Чисто для проверки tracker'а — поставь WORKERS=1 пулу. Получится sequential, но через канал. Лаг будет тот же. Поставь WORKERS=16 и WORK_DELAY=50ms — в идеале 320 msg/sec, в реальности упрётся в number of unique keys × CPU; если ключей мало, многие worker'ы простаивают.

Что ещё попробовать

  • Поставь SEED_KEYS=4 и WORKERS=16. Большинство worker'ов будут простаивать — параллелизм ограничен числом уникальных ключей. На метрике это будет «total throughput почти как у sequential, хоть worker'ов в 16 раз больше».
  • Сделай SEED_KEYS=1. Pool превратится в sequential — все записи у одного worker'а. На графике — одна и та же линия как у cmd/sequential.
  • Подними WORK_DELAY=100ms обоим. Sequential: 10 msg/sec, pool с 8 worker'ами: 80 msg/sec. Видна формула throughput = workers / work-delay, ограниченная только распределением ключей.
  • Запусти pool в двух копиях с одной и той же группой lecture-03-05-pool. Ребаланс отдаст партиции пополам. Каждый процесс держит свой пул из 8 worker'ов. Всего параллелизм — до 16 (если ключей хватает).
  • Убей пул Ctrl+C посередине. Перезапусти. Через make group-describe-pool видно committed offset'ы; должны быть строго ниже LEO. В худшем случае — между committed и LEO «зависшие done» — те самые перечитываемые при рестарте.
  • Поменяй COMMIT_INTERVAL=2s. Чаще будет окно потерь при kill, но ниже нагрузка на coordinator. На сильно загруженных группах коммит-нагрузка реальная.

Дальше

Out-of-order tracker — это то, что в Kafka Streams и Flink делается под капотом и называется «watermark / barrier». Идея та же — продвигаем коммитный курсор по нижней непрерывной границе. В Транзакции и EOS этот же tracker превращается в часть consume-process-produce: коммит offset'а и produce'ы участвуют в одной транзакции, и тогда дублей при рестарте нет вообще. Дороже, не для каждого случая.

Лекции Consume-process-produce и Retry и DLQ deep dive опираются на то, что мы тут отделили обработку от commit'а. С транзакциями добавится ещё один шаг — SendOffsetsToTransaction, который атомарно увязывает «обработали и записали в downstream» с «закоммитили исходный offset».

Если pool с per-key sharding'ом перестаёт справляться — возвращаемся к первому варианту: больше партиций. Это нормально. Pool — оптимизация поверх Kafka-партиционирования, а не его замена.

·Модуль 03

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Консьюмер / Конкурентность и lag