Первый консьюмер на franz-go
В предыдущей лекции order-service записал в brew.orders.v1 десять OrderPlaced. Брокер вернул на каждую (partition, offset), сумма latest по партициям сошлась в 10. Эти десять записей сейчас лежат в логе и ждут читателя. Теперь смотрим на них с другой стороны - со стороны kitchen-service. В Brew этот сервис живёт на каждой кофейне; задача простая - вытаскивать новые заказы из brew.orders.v1 и показывать бариста, что сейчас готовить.
Это лекция про consumer. Программу, которая читает партицию и помнит, до куда дочитала. После брокерской стороны (топики, партиции, ISR, retention) и стороны записи (ProduceSync, ключ, acks) меняем угол ещё раз: код, который читает. Цель скромная. Прочитать те же 10 OrderPlaced, увидеть пары (partition, offset) в выводе и собрать в голове опорную модель консьюмер-группы. Сложные параметры - manual commit, гарантии, ребалансы - проговорим словами и оставим для модуля 03.
kgo.Client со стороны чтения - тот же долгоживущий объект
Один и тот же kgo.NewClient обслуживает и продьюсер, и консьюмер - разница в опциях. kitchen-service создаёт клиент один раз на старте процесса, держит до shutdown'а, через него и читает. Пул соединений до брокеров, кэш metadata, фоновые горутины - всё то же, что было у order-service в 01-05. Аналогия с *sql.DB остаётся в силе: один пул на весь процесс, никакого «открыл-закрыл на каждую операцию».
В franz-go есть два режима потребления. Первый - direct consumer, без группы. Ты сам перечисляешь топики и партиции, сам помнишь offset'ы и сам же их сохраняешь куда-нибудь сбоку. Полезно для админских утилит, для бэкапов и для случаев, когда позицию хранит внешняя БД (например, в Kafka Streams-подобных пайплайнах). В нашем курсе direct-режим встречается в утилитах вроде inspect и load-and-watch из 01-02 и 01-04; для бизнес-сервисов он редкий гость. Развёрнутый разбор - Коммиты offset'ов.
Второй режим - consumer-группа. Это то, что в 99% продакшен-кода и что включает kitchen-service в Brew. Группа берёт на себя:
- распределение партиций между членами (один топик
brew.orders.v1с 3 партициями плюс 3 инстансаkitchen-service= по одной партиции каждому); - хранение committed offset'ов в
__consumer_offsets(этот топик разбирали в Offsets и retention); - ребалансы - когда член группы добавляется или уходит, партиции перераспределяются автоматически;
- координацию через group coordinator на брокере (один из брокеров кластера выбирается координатором конкретной группы).
Аналогия для бэкендера. Consumer-группа похожа на пул воркеров поверх очереди задач, только источник задач это лог Kafka, а позицию каждого воркера группа помнит сама. PostgreSQL даёт похожую модель в LISTEN/NOTIFY плюс advisory locks, но там вся координация поверх таблиц; в Kafka она встроена в брокер и доступна одной опцией клиента.
Включается одной строчкой:
cl, _ := kgo.NewClient(
kgo.SeedBrokers(...),
kgo.ConsumerGroup("brew.kitchen-service"),
kgo.ConsumeTopics("brew.orders.v1"),
)ConsumerGroup("...") это group.id. По нему Kafka отличает один логический консьюмер (возможно, собранный из нескольких процессов) от другого. Два процесса с одинаковым group.id это одна группа, они делят партиции между собой. Два процесса с разными group.id это две независимые группы; у каждой свой собственный committed offset, и читают они одни и те же сообщения параллельно, не мешая друг другу. В Brew, например, kitchen-service пишется в группу brew.kitchen-service, а analytics-service читает тот же топик в группе brew.analytics; каждая группа держит свой темп, своё отставание и свою позицию.
ConsumeTopics("...") это какие топики подписать. Можно несколько; список фиксированный, можно ещё ConsumeRegex с pattern'ом (удобно, когда сервис читает все brew.*.events, не зная заранее точные имена).
PollFetches - как мы получаем сообщения
Сама работа консьюмера в цикле:
for {
fetches := cl.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
// обработка записи
})
}PollFetches это блокирующий вызов. Он ждёт, пока придёт хоть что-то от брокера, и возвращает kgo.Fetches. Это сразу список ответов от брокеров, внутри каждого список топиков, внутри каждого топика список партиций, внутри каждой партиции список записей. Многоуровневая логика, потому что один Fetch может прийти сразу от нескольких партиций нескольких топиков - так удобнее с точки зрения сетевого протокола.
Для прикладного кода это лестница, по которой обычно не лазают руками. Есть удобные обёртки:
fetches.EachRecord(fn)- пройтись по всем записям, не разбираясь, из какой партиции какая.fetches.EachPartition(fn)- пройтись пакетами по партициям. Полезно, если хочешь собрать batch и обработать всё из одной партиции одной транзакцией.fetches.Records()- собрать все записи в плоский срез.fetches.Errors()- список ошибок per-партиция.
Структура одной записи это kgo.Record. Там Topic, Partition, Offset, Key, Value, Headers, Timestamp - то самое, что отдавал брокер на produce. Со стороны консьюмера видно ровно то, что ушло в лог: Key="order-7", Value="OrderPlaced order_id=order-7", Partition=2, Offset=2 - те же координаты, которые ProduceSync вернул в 01-05.
Один важный момент про PollFetches - это не блокировка «навсегда». Он завершается, как только истекает контекст или клиент закрывается. Поэтому корневой ctx у нас runctx.New() - он отменится по SIGINT, PollFetches вернётся с context.Canceled в Errors(), мы ловим это, выходим из цикла. Никаких отдельных каналов для shutdown не нужно.
Одна партиция - один консьюмер группы
Главное правило группы. Внутри одной consumer-группы у одной партиции может быть максимум один читатель. Это не настройка, это устройство Kafka: координатор распределяет каждую партицию топика ровно одному члену группы и не позволяет двум членам читать одну и ту же партицию параллельно. Иначе offset поплыл бы непредсказуемо, и committed позицию было бы невозможно интерпретировать.
Из правила следует арифметика, которая определяет, как kitchen-service будет масштабироваться. Топик brew.orders.v1 создан с тремя партициями. Возможные раскладки:
- 1 инстанс
kitchen-service- все 3 партиции достаются ему. Параллелизма по чтению нет, всё в одной горутине-poller'е. - 2 инстанса - типичная раскладка 2:1, один тянет две партиции, другой одну. Cooperative-sticky assignor старается не двигать партиции без нужды.
- 3 инстанса - идеальный баланс, каждому по одной партиции. Это максимум полезной параллельности для этого топика.
- 4 и больше - один (или несколько) инстансов будут простаивать: они в группе, координатор про них знает, но партиций для них нет. Heartbeat'ы шлют, ничего не читают.
Отсюда правило, которое надо запомнить с первого раза: число партиций топика - это потолок параллельного чтения внутри одной группы. Хочется ускорить kitchen-service за счёт пятого инстанса - пятый инстанс ничего не даст, пока не увеличишь партиций в brew.orders.v1. И наоборот - на трёх партициях нет смысла держать пять реплик kitchen-service, два будут просто гореть в RAM.
Между разными группами правило не действует. analytics-service читает тот же brew.orders.v1 в группе brew.analytics, и ему «никто не мешает», даже если kitchen-service уже забрал все три партиции себе. Кафка раздаёт партиции внутри группы, а не глобально по кластеру.
Auto-commit и две ловушки
Тут начинается нюанс, ради которого половина модуля 03 написана.
В franz-go (и в Kafka в целом) по умолчанию включён auto-commit. Раз в auto.commit.interval.ms (5 секунд по дефолту) клиент берёт текущую позицию, до которой он точно дочитал, и коммитит её в __consumer_offsets. Удобно. Сам ничего не пишешь, оно работает.
И это - ловушка. Точнее, две.
Первая ловушка: коммит фиксирует то, что прочитано. Не то, что обработано. Когда PollFetches вернул OrderPlaced, запись уже считается «прочитанной» с точки зрения auto-commit'а. Если kitchen-service сейчас упадёт посередине обработки этой записи (пишет в локальную БД готовки, БД отвалилась, процесс умер) - committed offset уже мог уйти на эту запись или дальше. На рестарте мы её не перечитаем. Заказ потерян для бизнес-смысла кухни; в Kafka он по-прежнему лежит, но бариста его не увидит.
Вторая ловушка: интервал. Между двумя авто-коммитами 5 секунд. Если упадёшь в этом окне, на рестарте можешь получить дубль - записи, которые мы успели обработать, но Kafka не успела про это услышать, придут заново. Это at-least-once в плохом смысле - без идемпотентного хендлера дублирующий OrderPlaced отправит бариста готовить тот же капучино ещё раз.
Кратко: auto-commit-by-default не даёт ни at-most-once, ни at-least-once - возможны и потеря (первая ловушка), и дубль (вторая ловушка), в зависимости от того, где упадёшь относительно 5-секундного окна. Так себе гарантия. Лекции Коммиты offset'ов и Гарантии обработки разбирают это подробно: там и manual commit, и MarkCommitRecords + CommitMarkedOffsets, и dedup в БД, и идемпотентные хендлеры.
В нашем учебном коде auto-commit оставлен включённым специально, чтобы было о чём поговорить и было что чинить дальше. Сейчас работает он так:
PollFetchesвернул нам пакетOrderPlaced.- Мы их напечатали (никакой реальной обработки, бариста условный).
- Параллельно фоновой горутиной franz-go раз в 5 секунд шлёт
OffsetCommitс текущей позицией. - На SIGINT мы делаем
cl.Close(). Он останавливает auto-commit-горутину и корректно покидает группу. Финального sync-commit'а в этом сценарии не происходит: мы override-нулиOnPartitionsRevoked(чтобы видетьrevoked: ...в stderr), а это по docs franz-go отключает дефолтный commit-on-revoke. До пяти секунд последних чтений могут остаться незакоммиченными - на рестарте те жеOrderPlacedприлетят заново. Это и есть вторая ловушка во плоти; в модуле 03 чиним ручнымCommitUncommittedOffsets.
Поэтому в выводе будет видна строчка «kitchen-service остановлен по сигналу». Никаких обещаний про финальный коммит - честно говорим, что часть позиции может потеряться.
Корректное завершение
Шаблон такой:
ctx, cancel := runctx.New() // SIGINT/SIGTERM → ctx.Done()
defer cancel()
cl, _ := kafka.NewClient(...)
defer cl.Close() // покинуть группу и закрыть соединения (финальный commit добавим в модуле 03)
for {
fetches := cl.PollFetches(ctx)
if fetches.IsClientClosed() {
return
}
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if errors.Is(e.Err, context.Canceled) { return nil }
return fmt.Errorf("fetch %s/%d: %w", e.Topic, e.Partition, e.Err)
}
}
// обработка
}В шаблоне три обязательные детали. defer cl.Close() нужен, иначе клиент не покинет группу корректно (координатор узнает о смерти только по session.timeout.ms, и до тех пор партиции не достанутся другим членам). Сам по себе Close() финального commit'а не делает при кастомном OnPartitionsRevoked - это явно сказано в docs franz-go; до пяти секунд last reads остаются незакоммиченными. Проверка ошибок нужна, иначе контекст-cancel приведёт к бесконечному циклу с тихими ошибками (PollFetches возвращает пустой fetches.Records() и тут же снова блокируется). Передача ctx именно в PollFetches (без подмены на context.Background()) и есть точка, через которую SIGINT доходит до клиента. В лекциях 03-х к этому добавится manual commit; шаблон останется тот же, перед cl.Close() появится cl.CommitUncommittedOffsets(ctx).
Что показывает наш код
cmd/consumer/main.go делает четыре вещи:
- Создаёт
kgo.Clientв режиме consumer-группыbrew.kitchen-service, подписан на топикbrew.orders.v1. - На свежей группе (когда committed offset ещё нет) явно сбрасывается на earliest через
ConsumeResetOffset(...AtStart()). По умолчанию franz-go v1.21.0 уже выставляетConsumeResetOffset(NewOffset().AtStart())(см. godoc), так что эта опция дублирует дефолт - но фиксирует поведение явно и развязывает с возможным изменением дефолта в будущих версиях. Kafka-брокер, в отличие от клиента, вauto.offset.resetпо умолчанию ставитlatest- но это уже неважно, потому что клиентский reset разрешается на стороне franz-go. - В цикле читает
PollFetchesи печатает таблицуmember/partition/offset/key/value/broker-ts. Корректно завершается по SIGINT. - Печатает в stderr хуки
OnPartitionsAssignedиOnPartitionsRevoked- видно, какие партиции достались этому процессу. Полезно при наблюдении ребаланса (см. ниже проmake run-2nd).
Сборка опций клиента это пять строк:
opts := []kgo.Opt{
kgo.ConsumerGroup(o.group),
kgo.ConsumeTopics(o.topic),
kgo.ClientID(fmt.Sprintf("kitchen-service-%s", o.memberID)),
kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
fmt.Fprintf(os.Stderr, "[member=%s] assigned: %v\n", o.memberID, m)
}),
kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
fmt.Fprintf(os.Stderr, "[member=%s] revoked: %v\n", o.memberID, m)
}),
}
if o.fromStart {
opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()))
} else {
opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
}
cl, err := kafka.NewClient(opts...)ConsumeResetOffset(...AtStart()) это «при первом старте группы читай с earliest»; AtEnd() это «читай только то, что придёт после старта». На втором запуске уже есть committed offset в __consumer_offsets, и ResetOffset ни на что не влияет - он отрабатывает только при первом подключении группы к партиции или при OffsetOutOfRange.
Основной цикл это PollFetches плюс печать через EachRecord. По дороге проверяем ошибки: context.Canceled это SIGINT, всё остальное настоящий fail на fetch'е:
for {
fetches := cl.PollFetches(ctx)
if fetches.IsClientClosed() {
return nil
}
if errs := fetches.Errors(); len(errs) > 0 {
for _, e := range errs {
if errors.Is(e.Err, context.Canceled) {
fmt.Println("kitchen-service остановлен по сигналу.")
return nil
}
return fmt.Errorf("fetch %s/%d: %w", e.Topic, e.Partition, e.Err)
}
}
fetches.EachRecord(func(r *kgo.Record) {
fmt.Fprintf(tw, "%s\t%d\t%d\t%s\t%s\t%s\n",
o.memberID, r.Partition, r.Offset,
string(r.Key), string(r.Value),
r.Timestamp.Format("15:04:05.000"),
)
})
_ = tw.Flush()
}defer cl.Close() корректно покидает группу и закрывает соединения, но финального коммита не делает - с кастомным OnPartitionsRevoked дефолтный commit-on-revoke отключён. До пяти секунд reads могут не уйти в __consumer_offsets, и при рестарте те же OrderPlaced прилетят заново. В модуле 03 чиним явным cl.CommitUncommittedOffsets(ctx) перед Close().
Ожидаемый вывод после make run на свежесозданной группе с 10 OrderPlaced в трёх партициях:
kitchen-service запущен: brew-topic="brew.orders.v1" group="brew.kitchen-service" member=1 from-start=true
читаем brew-orders; Ctrl+C - выход.
[member=1] assigned: map[brew.orders.v1:[0 1 2]]
MEMBER PARTITION OFFSET KEY VALUE BROKER-TS
1 1 0 order-0 OrderPlaced order_id=order-0 16:55:01.234
1 1 1 order-1 OrderPlaced order_id=order-1 16:55:01.241
1 1 2 order-7 OrderPlaced order_id=order-7 16:55:01.277
1 1 3 order-9 OrderPlaced order_id=order-9 16:55:01.289
1 0 0 order-2 OrderPlaced order_id=order-2 16:55:01.247
1 0 1 order-3 OrderPlaced order_id=order-3 16:55:01.253
1 0 2 order-6 OrderPlaced order_id=order-6 16:55:01.271
1 2 0 order-4 OrderPlaced order_id=order-4 16:55:01.259
1 2 1 order-5 OrderPlaced order_id=order-5 16:55:01.265
1 2 2 order-8 OrderPlaced order_id=order-8 16:55:01.283Несколько наблюдений по выводу.
Записи внутри одной партиции идут в порядке offset'ов: 0, 1, 2, 3. Это гарантия Kafka, она держится всегда. А между партициями порядок не определён вообще: что-то из 0, что-то из 1, что-то из 2 - клиент вычитывает их параллельно, и сборка в общем потоке зависит от тайминга PollFetches. Если запустить второй раз, конкретный порядок строк может быть другим. Ordering-per-partition это единственная гарантия порядка, которую Kafka даёт. Глобального порядка по топику не бывает.
Раскладка (order-id → partition) тут совпадает с той, что вернул ProduceSync в лекции 01-05: order-0/1/7/9 в партиции 1, order-2/3/6 в партиции 0, order-4/5/8 в партиции 2. Это и есть детерминированный partitioner: дефолтный хеш franz-go это murmur2, и murmur2(order_id) mod 3 даёт одну и ту же партицию для одного и того же ключа. Кухня видит ровно ту же группировку, что положил order-service.
Member везде «1», потому что у нас один процесс. Все три партиции достались ему - assigned map это и показывает.
Если запустить make run ещё раз с тем же group.id - таблица будет пустой. Committed offset уже стоит на 3/4/3 (по партициям 0/1/2), новых сообщений нет, консьюмер просто висит в PollFetches и ждёт. Это и есть «committed offset работает» - никакого бага здесь нет. Хочется перечитать всё с начала - есть make run-fresh, она дописывает к group.id случайный суффикс и получает свежую группу с пустыми offset'ами.
Два инстанса в одной группе - наблюдаем rebalance
Запусти в первом терминале:
make runВидишь, что member=1 получил все три партиции (assigned: map[...:[0 1 2]]). Дочитал, ждёт.
Во втором терминале:
make run-2ndПроисходит ребаланс - короткая пауза, в течение которой координатор группы пересобирает раскладку партиций между членами. У первого процесса в stderr появится строка [member=1] revoked: ... (часть партиций уходит) и сразу [member=1] assigned: ... с уже сокращённым списком. У второго процесса будет [member=2] assigned: ... с теми партициями, которые ему отдали. Стандартное распределение для cooperative-sticky (дефолт franz-go в свежих версиях) это две партиции одному, одну другому; какие именно зависит от реализации.
Это и весь ребаланс, который мы хотим увидеть в 01-06. Четыре assignor'а (range, round-robin, sticky, cooperative-sticky), разница между eager и incremental cooperative, downtime при разных стратегиях, тюнинг session.timeout.ms и heartbeat.interval.ms - всё это разворачивает Группы и ребалансы. Тут просто фиксируем: координатор автоматически перераспределяет нагрузку между членами, и для приложения это видно через два callback'а.
Закрой второй процесс (Ctrl+C). У первого опять revoked плюс assigned со всеми тремя партициями. Так и должно быть: партиции вернулись единственному оставшемуся члену.
Запуск
Стенд должен быть поднят (docker compose up -d из корня репозитория). До запуска консьюмера продьюсер Первый продьюсер должен уже отработать и положить в brew.orders.v1 хотя бы что-то.
# в 01-foundations/01-05-first-producer
make run
# в 01-foundations/01-06-first-consumer
make runЧтобы посмотреть на committed offset группы со стороны Kafka:
make group-describeЭто kafka-consumer-groups.sh --describe. Он печатает per-partition committed offset, lag (разницу между latest и committed), member-id, host клиента. После полного прочтения 10 OrderPlaced lag=0 на всех партициях. Если убить процесс посередине обработки и тут же дёрнуть group-describe, увидишь lag - auto-commit просто не успел.
Сбросить committed offset'ы группы (например, чтобы перечитать те же 10 сообщений с начала):
make group-deleteУдаление группы не трогает данные в логе - оно стирает запись в __consumer_offsets. На следующем запуске консьюмера группа считается «новой», от earliest (при from-start=true). В учебных лекциях это удобный приём; в проде такой сброс делать только понимая, что обнуляешь и какая именно группа.
Что забрать с собой
Опорная модель консьюмера, на которой стоит весь дальнейший модуль 03:
- Группа это логический консьюмер. group.id её имя. Один и тот же group.id на нескольких процессах = делёжка партиций. Разные group.id = независимые читатели одного и того же топика.
- Внутри группы одна партиция = один читатель. Параллелизм чтения упирается в число партиций топика. Лишние инстансы простаивают.
- PollFetches возвращает Fetches → Topics → Partitions → Records. На уровне приложения почти всегда работаешь через
EachRecordилиEachPartition. - Auto-commit лжёт по дефолту. Он коммитит позицию чтения, никак не связанную с фактом обработки в коде. В модуле 03 будем чинить.
- Завершение через
cl.Close()плюс ctx из runctx.cl.Close()корректно покидает группу и закрывает соединения; при override-нутомOnPartitionsRevokedфинального commit'а он не делает - до пяти секунд last reads могут потеряться. Чиним явнымCommitUncommittedOffsetsв модуле 03. БезctxвPollFetchesне вылезешь из цикла на SIGINT.
Дальше модуль 02. Возвращаемся к продьюсеру и копаем, что там было «по умолчанию»: Ключи и партиционирование, Acks и durability, Идемпотентный продьюсер, Батчинг и пропускная способность, Ошибки, retries и headers. А там и модуль 03 рядом - именно он сделает из этого «голого» консьюмера производственный.