Kafka CookbookКонсьюмерКоммиты offset'ов
0 / 42 (0%)

Коммиты offset'ов

В прошлой лекции мы запустили несколько копий консьюмера в одной группе и смотрели, как Kafka делит партиции. Кода обработки сообщений там почти не было — только PollFetches и printf. Тут пойдём глубже: что значит «прочитать сообщение», когда консьюмер сообщает Kafka, что «всё, я с этим offset'ом разобрался», и как это незаметно становится главным источником багов. Для kitchen-service это вопрос денег: ошибётся с моментом коммита offset'а по brew.orders.v1 — либо потеряет заказы (приготовить забыли), либо приготовит дубли (одно и то же дважды).

Что такое committed offset

У каждой группы внутри Kafka хранится таблица: «для группы G, топика T, партиции P последний прочитанный offset — N». Эта таблица — обычный compact-топик __consumer_offsets (compact, потому что нам нужна только последняя запись на ключ (group, topic, partition)). Когда консьюмер запускается, он спрашивает у coordinator: «с какого offset мне начать?». Coordinator смотрит в __consumer_offsets и отвечает.

Committed offset N означает простое: «всё, что лежит до offset N (не включая) — для группы G обработано». Следующий poll отдаст N. Если консьюмер падает, его поднимают заново, и он стартует с того же N. Чем точнее этот N отражает реально сделанную работу — тем меньше у тебя дублей или потерь после рестарта.

Самое неочевидное здесь — N это не «получил по сети», и не «положил в локальный буфер». Это твоё обещание брокеру. Дашь обещание раньше, чем реально обработал — потеряешь сообщения. Дашь позже, чем обработал — получишь дубли при следующем рестарте. Третьего нет.

Auto-commit и почему он lying-by-default

Дефолтное поведение franz-go (как и большинства клиентов) — auto-commit включён. Каждые 5 секунд (AutoCommitInterval) фоновая горутина забирает offset, который вернул последний PollFetches, и шлёт его на брокер. Звучит удобно: ничего вызывать не надо, всё само.

Тонкость в формулировке. Auto-commit коммитит то, что получил твой код через PollFetches, а не то, что обработал. Это разные вещи. Между «получил» и «реально обработал» лежит твоя бизнес-логика, которая может занимать секунды, минуты, часы. И если auto-commit пнул брокер «прочитано до 200» в момент, когда приложение реально дошло только до 150 — а потом приложение упало — после рестарта консьюмер получит offset 200 как стартовую точку. Записи 150–199 потеряны. Это at-most-once с тихой потерей данных, и ты её даже не увидишь без специального теста.

Обратная история. Auto-commit ещё не успел сработать, ты обработал 150 записей и упал. Committed offset так и остался на 0 (или где он был при старте). Рестарт — и группа тебе отдаёт всё с самого начала. Дубли. Это тот самый «at-least-once с дублями», про который много пишут.

Какой из двух вариантов случится — зависит от тайминга. Запустился, поллил 200 → начал обрабатывать → через 5 секунд auto-commit (committed=200) → упал на 150 → рестарт → потери. Или: запустился → поллил → упал на 50 → committed остался на 0 → рестарт → дубли с 0 по 49. Лотерея.

В этой лекции мы воспроизводим именно второй сценарий (дубли) — он показательнее. Демка пишет каждое обработанное сообщение в файл processed-auto.log, потом мы запускаем консьюмер с crash-after, он валится без коммита, потом мы перезапускаем и видим, что часть offset'ов в логе появилась дважды. Если хочешь увидеть first сценарий с потерями — увеличь WORK_DELAY так, чтобы цикл точно пережил 5+ секунд auto-commit'а, и крашни уже после. Поведение симметричное и одинаково плохое.

Manual sync — гарантии в обмен на латенси

Лечение очевидно: коммитить тогда, когда обработка реально закончилась. Самое прямое — kgo.DisableAutoCommit() плюс ручной cl.CommitRecords(ctx, records...) после батча.

Семантика чёткая. Поллим батч. Обрабатываем все записи в нём. Если обработка дошла до конца — коммитим весь батч одним вызовом. Если упали в середине — коммита не было, на рестарте получим тот же батч заново. Дубли возможны (это всё ещё at-least-once), но окно ровно один батч, не вся сессия с момента предыдущего auto-commit'а.

Цена — CommitRecords ходит на брокер и ждёт ack. Это блокирующий вызов внутри poll-цикла. На каждом батче +5–20 мс, иногда больше. Если батчи мелкие (пара записей), оверхед заметный. Если батчи нормальные (сотни), цена размазана.

Ещё одна тонкость — порядок. CommitRecords берёт максимальный offset на партицию из переданных записей и коммитит его. Если ты вызываешь его не последовательно по растущим offset'ам — можешь откатить commit назад. На практике это означает: коммить весь батч сразу одним вызовом и не разбивай.

Manual async — компромисс через mark + flush

Sync-commit на каждый батч — это блокировка цикла. Можно дешевле, если не ждать ack синхронно.

Идея — kgo.AutoCommitMarks(). Этот режим говорит franz-go: «обычный auto-commit отключён; коммить только то, что я пометил как обработанное через MarkCommitRecords». Получается гибрид: ты сам отмечаешь каждую обработанную запись, а фоновая горутина периодически синкает помеченные offset'ы на брокер.

Гарантия другая. Mark — это локальный state в клиенте: «эту запись я считаю обработанной, можно коммитить». Flush этого state на брокер происходит либо асинхронно по таймеру (AutoCommitInterval), либо явно через cl.CommitMarkedOffsets(ctx). Между MarkCommitRecords и реальным flush'ем — окно потерь. Если процесс упадёт в этом окне, на рестарте получишь дубли в размере окна.

Размер окна задаётся AutoCommitInterval. Поставил 200 мс — окно 200 мс работы, обычно это десятки сообщений на низком/среднем рейте. Поставил 5 секунд — окно сильно больше. Чтобы окно было предсказуемым на выходе из батча, имеет смысл руками вызывать CommitMarkedOffsets между батчами — это и быстро (если фоновый flush уже всё разлил, вызов почти ничего не делает), и гарантирует «по выходу из батча всё помечённое ушло».

В нашей демке именно так и сделано: MarkCommitRecords на каждой записи + CommitMarkedOffsets после батча. Между этими двумя точками работает фоновый таймер AutoCommitInterval, поэтому на длинных батчах он успевает ослабить нагрузку и часть commit'ов происходит в фоне, а в конце батча у нас всегда честный sync-flush.

Как это выглядит в коде

Дизайн всех трёх главных циклов одинаковый: открыть лог-файл для записи processed-сообщений, запустить poll-цикл, на каждое сообщение «работать» (sleep + лог), при crash-afteros.Exit(1) без Close. Различается только то, что мы делаем с offset'ами.

auto-commit/main.go — вообще ничего: дефолтный auto-commit делает работу за нас.

go
opts := []kgo.Opt{
    kgo.ConsumerGroup(o.group),
    kgo.ConsumeTopics(o.topic),
    kgo.AutoCommitInterval(o.commitEvery),  // на демке = 2s
    // ...
}

manual-sync/main.go — выключаем auto-commit и явно коммитим батч.

go
opts := []kgo.Opt{
    kgo.ConsumerGroup(o.group),
    kgo.ConsumeTopics(o.topic),
    kgo.DisableAutoCommit(),
    // ...
}
// ...
batch := make([]*kgo.Record, 0)
fetches.EachRecord(func(r *kgo.Record) { batch = append(batch, r) })
// обработать каждую запись из batch...
err := cl.CommitRecords(commitCtx, batch...)

Обрати внимание: батч сначала собирается целиком, потом обрабатывается, потом одним вызовом коммитится. Если упали в середине — CommitRecords не вызвался, committed offset остался на начале батча.

manual-async/main.goAutoCommitMarks плюс mark на каждой записи плюс ручной flush между батчами.

go
opts := []kgo.Opt{
    kgo.ConsumerGroup(o.group),
    kgo.ConsumeTopics(o.topic),
    kgo.AutoCommitMarks(),
    kgo.AutoCommitInterval(o.commitEvery),  // 500ms — окно потерь
    // ...
}
// ...
fetches.EachRecord(func(r *kgo.Record) {
    // обработка...
    cl.MarkCommitRecords(r)
})
// между батчами — явный flush, чтобы не зависеть только от фонового таймера:
err := cl.CommitMarkedOffsets(flushCtx)

Каждый MarkCommitRecords — это локальный mark, не сетевой вызов. Дёшево. Сетевая часть случается раз в AutoCommitInterval или в явный CommitMarkedOffsets.

Демка: считаем дубли по log-файлу

В каждом из трёх главных циклов на каждое реально обработанное сообщение пишется строчка в файл вида partition,offset,key,value. После того как мы прогнали цикл несколько раз с crash-after и без, у нас остаётся лог. По нему легко считать:

  • общее число строк = «сколько раз приложение что-либо обработало»
  • уникальных пар partition,offset = «сколько разных сообщений прошло обработку»
  • разница = дубли

Цель make count-autocount-sync, count-async) делает ровно это:

sh
$ make count-auto
processed-auto.log: total=30 unique-offsets=20 duplicates=10

Это означает: 30 раз приложение что-то обработало, но реально уникальных сообщений было только 20 — десять обработано дважды. Это и есть цена auto-commit'а с искусственным крашем перед 5-секундным интервалом.

Полный сценарий демки выглядит так. Сначала готовим стенд:

sh
make topic-create
make topic-load                # 30 сообщений в топик
make group-delete-all          # очистить committed offset'ы всех трёх групп
make clean-logs                # удалить старые processed-*.log

Дальше первый «крашевый» прогон auto-commit:

sh
make run-auto CRASH=10         # обработает 10, упадёт без commit'а

И второй прогон, который должен дочитать остаток:

sh
make run-auto CRASH=0          # читает оставшиеся; Ctrl+C когда лог перестанет расти
make count-auto

Если auto-commit за время первой обработки не успел сработать — увидишь 10 дублей (offset'ы 0..9 обработаны дважды) плюс 20 уникальных дочитанных. То же самое повторяешь для run-sync и run-async. На manual-sync дубли тоже будут — ровно один батч, который не успели закоммитить. На manual-async дубли тоже возможны, но окно меньше — то, что было помечено, но ещё не успело flush'нуться.

Что значит «commit offset N» и куда оно физически едет

Внутренне все три варианта делают одно и то же: шлют брокеру OffsetCommit запрос с парой (topic, partition) → offset. Брокер, который выполняет роль coordinator'а группы, пишет это в compact-топик __consumer_offsets ключом (group, topic, partition). Compact гарантирует, что для одной группы и одной партиции в логе хранится только последняя запись (предыдущие схлопываются compaction'ом).

Это значит две практические штуки. Первая — committed offset'ы переживают рестарт брокера и переезд coordinator'а: они физически на диске, как обычные сообщения. Вторая — __consumer_offsets сам по себе обычный топик; его можно описать через kafka-topics.sh --describe, посмотреть размер, увидеть какой brоker сейчас coordinator. Группы не магия, они построены поверх обычного механизма Kafka.

kafka-consumer-groups.sh --describe --group <name> (он же make group-describe-auto в нашей лекции) читает именно эти committed offset'ы и сравнивает с current LEO — так получается lag. Lag = LEO − committed. Большой lag = группа отстаёт. И теперь видно, откуда он берётся: это просто две числа, одно из commit'а группы, другое из лога партиции.

Что выбрать в реальном проекте

Если коротко: дефолтный auto-commit — только в демках, тестах и одноразовых утилитах. Для production-кода используй ручной commit.

Sync-commit на каждый батч — выбор по умолчанию, когда обработка идёт хоть сколько-то значимыми пачками (десятки-сотни записей). Латенси commit'а добавит миллисекунды, и это нормальная цена. Понимать, что окно дублей — один батч.

AutoCommitMarks + mark на каждой записи + явный flush между батчами — выбор, когда каждый лишний sync-commit заметен на throughput'е (мелкие батчи, очень высокий рейт, или бизнес-логика быстрая и любая блокировка цикла дорога). Окно дублей — AutoCommitInterval, его явно настраивай под свой риск-профиль.

Если нужны вообще нулевые дубли — одной commit-стратегией не обойтись. Тут уже подключают exactly-once через транзакции (Транзакции и EOS) или идемпотентный обработчик (Гарантии обработки — следующая лекция). Голый offset commit на стороне консьюмера никогда не даст exactly-once, как ни настраивай.

Что попробовать руками

  • запусти make run-auto CRASH=15, не дождавшись 2 секунд auto-commit'а — потом make run-auto CRASH=0, увидишь дубли в processed-auto.log;
  • увеличь WORK_DELAY=400ms и CRASH=20 — обработка растянется на 8 секунд, auto-commit успеет сработать; потом второй прогон покажет, что часть offset'ов потеряна (потеря = total < SEED_MESSAGES);
  • сделай тот же эксперимент на run-sync и run-async — на sync дубли будут ровно размером батча; на async — размером commit-окна;
  • посмотри make group-describe-auto после каждого прогона: видно committed offset на каждую партицию;
  • удали committed offset группы (make group-delete-auto) и снова запусти — увидишь, что группа стартует с earliest, и весь топик читается заново, как при первом запуске.

Дальше

Эта лекция закрыла механику commit'ов. Следующая (Гарантии обработки) — про то, как одной commit-стратегией exactly-once не сделаешь, и зачем в обработчике нужна идемпотентность плюс dedup-таблица. Там Postgres, INSERT ... ON CONFLICT DO NOTHING и kill -9 в середине обработки уже не дают дублей в БД — потому что обработчик защищён, а не консьюмер.

·Модуль 03

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Консьюмер / Коммиты offset'ов