Группы и ребалансы
С этой лекции начинаем модуль про консьюмера. В первой лекции (Первый консьюмер на franz-go) мы запустили один консьюмер и читали топик в цикле — этого хватило, чтобы понять, как устроен PollFetches и как корректно завершаться. Тут смотрим, что происходит, когда консьюмеров становится несколько и Kafka должна как-то поделить между ними партиции. В нарративе Brew это kitchen-service: в промо-пятницу одного экземпляра не хватает, кухню масштабируют до нескольких копий в одной группе, и партиции brew.orders.v1 делятся между ними.
Группа — это и есть тот механизм деления.
Что такое consumer group
У каждого консьюмера в Kafka есть group.id — короткая строка, которая склеивает несколько отдельных процессов в одну логическую единицу. Брокер видит запрос JoinGroup от клиента и по group.id понимает: «эти подписаны на один и тот же топик и зовутся одной группой — раздам им партиции пополам». Один и тот же топик, но разные group.id — это две независимые группы, каждая получает все сообщения целиком.
Главное правило одно: внутри одной группы каждая партиция в любой момент времени читается ровно одним консьюмером. Не двумя сразу. Если консьюмеров больше, чем партиций — лишние сидят без работы (idle members). Если меньше — кто-то держит сразу несколько партиций. Это и даёт горизонтальное масштабирование чтения: добавил ноду в группе — нагрузка по партициям перераспределилась.
За координацию отвечает один из брокеров — group coordinator. Он выбирается по hash(group.id) % num_partitions(__consumer_offsets), и именно он обрабатывает heartbeat'ы, JoinGroup, SyncGroup, OffsetCommit. Когда падает coordinator (а это просто один из брокеров) — группа автоматически переезжает к другому.
Rebalance — что это и когда случается
Rebalance — это перерасклад партиций по членам группы. Триггеры:
- член зашёл в группу (новый консьюмер стартанул);
- член вышел — Ctrl+C, или kill -9, или упал по OOM, или контейнер ребутнули;
- член пропустил session.timeout (см. ниже) — coordinator считает его мёртвым;
- топик расширили — появились новые партиции, их надо кому-то отдать;
- админ позвал ребаланс руками через kafka-consumer-groups.sh.
В любом из этих случаев coordinator открывает фазу ребаланса: сначала собирает заявки членов (JoinGroup), потом раздаёт новый ассайнмент (SyncGroup) и говорит «работайте». До этого — все стоят. Это и есть боль ребаланса: пока он идёт, группа простаивает, лаг растёт.
Стратегии распределения
Решение «кому какие партиции» принимает не coordinator, а лидер группы — один из членов, которому brokers делегировали роль assignor'а. Локально это значит: ты сам выбираешь стратегию через kgo.Balancers(...) на клиенте. Для всех членов одной группы она должна совпадать, иначе coordinator не сможет выбрать общий протокол.
Стратегий несколько, и они принципиально разные:
Range — самая старая. Партиции каждого топика делятся на непрерывные диапазоны и раздаются членам по алфавиту member_id. Просто, но плохо балансируется при нескольких топиках: один член может получить partition 0 двух разных топиков, а другой — никто.
Round-robin — партиции всех топиков выкладываются в один список и раздаются по кругу. Лучше распределяет, но при изменении состава группы перетасовывает почти всё — каждый ребаланс утягивает все партиции, даже те, что могли бы остаться у тех же владельцев.
Sticky — пытается сохранить «прилипчивость»: при ребалансе старается оставить партиции у тех же членов, у кого они уже были, и переместить только то, что нужно для балансировки. Главный плюс — меньше работы по восстановлению локального состояния (кэши, in-flight'ы) после ребаланса.
Cooperative-sticky — sticky плюс другой ребаланс-протокол. Про протокол ниже.
В этой лекции мы используем sticky и cooperative-sticky — старые range/round-robin интересны только как историческая точка. На современных кластерах ставь cooperative-sticky по умолчанию и переключайся на sticky, только если есть конкретная причина.
Eager vs cooperative — это разные ребалансы, не разные стратегии
Тут есть штука, которую часто путают. Стратегия (sticky, round-robin, range, cooperative-sticky) — это алгоритм распределения партиций. Eager и cooperative — это протокол ребаланса, то есть то, как именно coordinator и члены группы общаются во время перераспределения.
Eager (он же «stop-the-world»):
- Триггер ребаланса — новый член зашёл, например.
- Coordinator говорит всем: «сдайте все свои партиции». Каждый член вызывает
OnPartitionsRevokedдля всех своих партиций. - Все участвуют в JoinGroup → SyncGroup, получают новый ассайнмент.
- Каждый член вызывает
OnPartitionsAssignedдля нового набора.
Между шагами 2 и 4 группа стоит. Если у тебя на члене было 100 партиций, и реально переезжает только 1 из них — ты всё равно сдашь все 100 и заберёшь обратно 99 тех же самых. Полная остановка.
Cooperative (incremental):
- Триггер ребаланса.
- Coordinator считает новый план распределения и говорит каждому члену, какие конкретно его партиции должны переехать. Член вызывает
OnPartitionsRevokedтолько для них. - Coordinator завершает первый раунд, отдаёт уже сдавшиеся партиции тем, кому они теперь принадлежат.
- Если план ещё не финализирован — второй раунд.
Партиции, которые остаются у того же члена, не отзываются вообще. Если у тебя на члене 100 партиций и переезжает 1 — ты потеряешь 1 и отдашь её другому, остальные 99 продолжат читаться без перерыва. Лаг по ним не вырастает.
Цена кооперативного протокола — два round-trip'а вместо одного и сложнее реализация на стороне клиента. На практике это окупается: на больших группах с долгой обработкой eager-ребаланс убивает SLO.
В одной группе нельзя смешивать eager и cooperative. Это разные SyncGroup-протоколы; coordinator выберет тот, который заявили все, и если хоть один член заявил eager — все откатываются на eager. Поэтому миграция со старого протокола на cooperative делается через rolling restart с временным двойным набором (sticky + cooperative-sticky одновременно, потом убираешь sticky).
Тайминги: heartbeat, session timeout, max poll interval
Координатор должен как-то понимать, что член ещё жив. Для этого член отправляет heartbeat каждые HeartbeatInterval (на проводе — heartbeat.interval.ms). franz-go делает это автоматически в фоне, отдельной горутиной — приложению ничего вызывать не нужно.
Если coordinator не получил heartbeat'а от члена дольше, чем SessionTimeout (session.timeout.ms), он объявляет члена мёртвым и начинает ребаланс без него. Дефолты franz-go v1.21.0: SessionTimeout = 45 секунд (Kafka 3.0+ нормализовался на этом значении после KIP-735), HeartbeatInterval = 3 секунды (pkg/kgo/config.go:641-643). То есть надо пропустить ~15 heartbeat'ов подряд, чтобы тебя выкинули. Это запас на сетевые задержки и паузы GC — короткий блип сети не должен приводить к ребалансу.
Третий тайминг — RebalanceTimeout (rebalance.timeout.ms). Дефолт franz-go — 60 секунд (config.go:642). Это окно, в течение которого все члены группы должны успеть отдать партиции, докоммитить offset'ы и переподписаться после JoinGroup. Если член не успел рейджойнить за это время, coordinator считает его выпавшим и продолжает ребаланс без него.
В Java-клиенте есть отдельная ручка max.poll.interval.ms (дефолт 5 минут), которая на клиентской стороне тоже отслеживает паузу между poll() и при превышении заставляет консьюмера самостоятельно выйти из группы. На проводе она едет в том же поле JoinGroupRequest, что и rebalance.timeout.ms. franz-go такой клиентской автоэвакуации не делает: heartbeat-горутина продолжает стучать, даже если обработка одной записи занимает полчаса. Получишь проблему только если в этот момент произойдёт ребаланс — тогда из-за зависшего обработчика клиент не успеет рейджойнить за RebalanceTimeout, и брокер исключит его из группы.
Соотношение трёх таймеров на словах:
HeartbeatInterval— как часто стучусь, что я живой;SessionTimeout— насколько долго брокер ждёт стук;RebalanceTimeout— насколько долго брокер ждёт, что я закончу свою часть ребаланса и переподпишусь.
Если боишься, что зомбированный процесс будет долго висеть в группе и не отдавать партиции — снижай SessionTimeout. Если боишься false positives на сетевых блипах — поднимай SessionTimeout. Если у тебя обработчик может реально удерживать партицию минутами и при этом ты хочешь, чтобы ребаланс прошёл чисто — поднимай RebalanceTimeout (тут уже франза-специфика: клиент ничего не оборвёт, но коммит в OnPartitionsRevoked обязан успеть).
Что показывает наш код
Бинарник один — cmd/loud-member. Запускаешь несколько копий в одной группе, каждая печатает событие ребаланса и текущий список ассайнов. Никакой полезной работы — только наблюдение за тем, как Kafka двигает партиции по копиям.
Идентификация копий — через переменную окружения MEMBER_ID. Она попадает в ClientID, в InstanceID, в каждый printEvent, в стартовый банер процесса:
opts := []kgo.Opt{
kgo.ConsumerGroup(o.group),
kgo.ConsumeTopics(o.topic),
kgo.Balancers(balancer),
kgo.HeartbeatInterval(o.heartbeat),
kgo.SessionTimeout(o.sessionTimeout),
kgo.ClientID(fmt.Sprintf("lecture-03-01-loud-%s", o.memberID)),
kgo.InstanceID(fmt.Sprintf("loud-member-%s", o.memberID)),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
printEvent(o.memberID, "ASSIGNED", m)
}),
kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
printEvent(o.memberID, "REVOKED", m)
}),
kgo.OnPartitionsLost(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
printEvent(o.memberID, "LOST", m)
}),
}Тут пара мест, на которые стоит посмотреть. InstanceID — опциональный «статический» member id (group.instance.id в спеке). Если он задан, при кратковременном отвале (рестарт пода в k8s, например) coordinator не будет торопиться объявлять члена мёртвым: партиции вернутся ему же, как только он переподключится. Без InstanceID каждый рестарт = новый член = ребаланс. Для учебной демки сюда же забили имя по MEMBER_ID — оно совпадает между запусками, и coordinator это видит.
Хука три:
OnPartitionsAssigned— coordinator выдал тебе новый набор партиций.OnPartitionsRevoked— coordinator штатно забирает у тебя часть (или все) партиции в рамках планового ребаланса. До завершения ребаланса ты ещё член группы и можешь докоммитить offset'ы — это безопасное место для финального commit'а.OnPartitionsLost— отличается отRevokedровно одним: его дёргают на «фатальных» групповых ошибках (IllegalGeneration,UnknownMemberID, провал аутентификации, истёкший session.timeout). Партиции уже потеряны, ты больше не их владелец. Коммит здесь почти наверняка отвергнет coordinator — на это и завязана разница сOnPartitionsRevoked, где коммитить ещё можно и нужно.
Стратегия выбирается флагом -strategy:
func pickBalancer(name string) (kgo.GroupBalancer, error) {
switch strings.ToLower(strings.TrimSpace(name)) {
case "sticky":
return kgo.StickyBalancer(), nil
case "cooperative-sticky", "cooperative", "coop":
return kgo.CooperativeStickyBalancer(), nil
default:
return nil, fmt.Errorf("unknown strategy %q (поддерживаем sticky | cooperative-sticky)", name)
}
}Каждый запуск — один балансер. Несколько одновременно в одной группе нельзя (см. eager vs cooperative выше). По умолчанию cooperative-sticky — потому что хочется наблюдать incremental-ребаланс, в нём виднее разница между «эту партицию сдаём» и «эту оставляем».
Сам цикл чтения тривиальный — PollFetches, печать партиции/ключа/значения. Запись — для антуража: чтобы было видно, что член не просто стоит и слушает, но реально качает данные.
Что наблюдать
Топик у нас на 6 партиций (make topic-create-6p). Запусков три, в трёх терминалах:
-
Терминал 1:
make run-1. Стартует первая копия, MEMBER_ID=1. JoinGroup → SyncGroup, всё за пару сотен миллисекунд. В выводе видноASSIGNED: lecture-03-01-groups=[0 1 2 3 4 5]— все 6 партиций его. Если до этого былmake seed, начинает читать. -
Терминал 2:
make run-2. Вторая копия. В терминале 1 —REVOKED: lecture-03-01-groups=[3 4 5], в терминале 2 —ASSIGNED: lecture-03-01-groups=[3 4 5]. Группа балансируется пополам. На cooperative первый член отзывает только то, что переезжает, не все 6. На eager (sticky) — отзовёт все 6 и получит обратно [0 1 2]. -
Терминал 3:
make run-3. Третья копия. Итог — 2/2/2 распределение. На cooperative первые два терминала видятREVOKEDтолько на ту партицию, которая уехала к третьему члену. -
Ctrl+C на терминале 2. В терминале 1 —
ASSIGNEDдополнительно, в терминале 3 —ASSIGNEDдополнительно. Партиции ушедшего второго распределились между оставшимися. Cooperative гладко: терминалы 1 и 3 не теряют свои текущие партиции, только добавляют новые. -
kill -9 второго (без Ctrl+C). Тут интересно: процесс убит резко, ничего не успевает корректно сдать. Coordinator ждёт session.timeout (по дефолту в нашем коде — 30 секунд), потом начинает ребаланс. До истечения таймера группа в нормальном состоянии, партиции убитого никто не читает, лаг по ним растёт. Это и есть «цена внезапной смерти» — секунды простоя на эти партиции.
Чтобы убить ровно нужный MEMBER_ID без угадывания PID:
pgrep -f 'loud-member.*MEMBER_ID=2' | xargs kill -9Что взять с собой
- Consumer group — это просто общий group.id. Внутри группы каждая партиция читается ровно одним консьюмером.
- Rebalance — перерасклад партиций. Триггеры: вход/выход члена, расширение топика, session timeout, ручной триггер админа.
- Стратегий несколько, по умолчанию ставь
cooperative-sticky. Sticky без cooperative — eager-протокол с stop-the-world ребалансом. - В одной группе все члены должны заявить совместимый ребаланс-протокол. Cooperative + eager → группа схлопнется до eager.
- Тайминги ребаланса в franz-go — три независимые ручки:
HeartbeatInterval(дефолт 3 с, «как часто стучусь»),SessionTimeout(45 с, «сколько брокер ждёт стук»),RebalanceTimeout(60 с, «сколько брокер ждёт, что я закончу свою часть ребаланса»). Java-клиентскийmax.poll.interval.msс автоэвакуацией клиента из группы в franz-go отсутствует — длинная обработка не приведёт к самоэвикции, проблема всплывёт только при одновременном ребалансе. - InstanceID (
group.instance.id) даёт статическую идентификацию: при рестарте партиции вернутся тому же логическому члену, без ребаланса. - OnPartitionsLost — отдельный хук на «выкинут coordinator'ом по таймауту». В нём нельзя коммитить offset'ы, в OnPartitionsRevoked — можно.
В следующей лекции (Коммиты offset'ов) спустимся на уровень offset-коммитов: разберём auto-commit и его дубли при рестарте, manual sync/async, MarkCommitRecords + CommitMarkedOffsets и пару других ручек. Тут мы offset'ы трогали по касательной — пора разобраться предметно.
Запуск
Стенд из корня репозитория должен быть поднят (docker compose up -d).
make topic-create-6p # создать топик с 6 партициями
make seed # залить 60 сообщений (опционально, чтобы было что читать)Дальше в трёх терминалах:
# терминал 1
make run-1
# терминал 2 (запускаешь, когда первый уже работает)
make run-2
# терминал 3
make run-3Поэкспериментировать с eager-протоколом:
make run-eager # MEMBER_ID=1 + sticky
# во втором терминале:
STRATEGY=sticky make run-2Посмотреть состояние группы со стороны брокера (полезно сравнить с тем, что напечатали хуки):
make group-describeСбросить committed offset'ы (после этого следующий run-* опять читает с earliest):
make group-delete