Kafka CookbookКонсьюмерГруппы и ребалансы
0 / 42 (0%)

Группы и ребалансы

С этой лекции начинаем модуль про консьюмера. В первой лекции (Первый консьюмер на franz-go) мы запустили один консьюмер и читали топик в цикле — этого хватило, чтобы понять, как устроен PollFetches и как корректно завершаться. Тут смотрим, что происходит, когда консьюмеров становится несколько и Kafka должна как-то поделить между ними партиции. В нарративе Brew это kitchen-service: в промо-пятницу одного экземпляра не хватает, кухню масштабируют до нескольких копий в одной группе, и партиции brew.orders.v1 делятся между ними.

Группа — это и есть тот механизм деления.

Что такое consumer group

У каждого консьюмера в Kafka есть group.id — короткая строка, которая склеивает несколько отдельных процессов в одну логическую единицу. Брокер видит запрос JoinGroup от клиента и по group.id понимает: «эти подписаны на один и тот же топик и зовутся одной группой — раздам им партиции пополам». Один и тот же топик, но разные group.id — это две независимые группы, каждая получает все сообщения целиком.

Главное правило одно: внутри одной группы каждая партиция в любой момент времени читается ровно одним консьюмером. Не двумя сразу. Если консьюмеров больше, чем партиций — лишние сидят без работы (idle members). Если меньше — кто-то держит сразу несколько партиций. Это и даёт горизонтальное масштабирование чтения: добавил ноду в группе — нагрузка по партициям перераспределилась.

За координацию отвечает один из брокеров — group coordinator. Он выбирается по hash(group.id) % num_partitions(__consumer_offsets), и именно он обрабатывает heartbeat'ы, JoinGroup, SyncGroup, OffsetCommit. Когда падает coordinator (а это просто один из брокеров) — группа автоматически переезжает к другому.

Rebalance — что это и когда случается

Rebalance — это перерасклад партиций по членам группы. Триггеры:

  • член зашёл в группу (новый консьюмер стартанул);
  • член вышел — Ctrl+C, или kill -9, или упал по OOM, или контейнер ребутнули;
  • член пропустил session.timeout (см. ниже) — coordinator считает его мёртвым;
  • топик расширили — появились новые партиции, их надо кому-то отдать;
  • админ позвал ребаланс руками через kafka-consumer-groups.sh.

В любом из этих случаев coordinator открывает фазу ребаланса: сначала собирает заявки членов (JoinGroup), потом раздаёт новый ассайнмент (SyncGroup) и говорит «работайте». До этого — все стоят. Это и есть боль ребаланса: пока он идёт, группа простаивает, лаг растёт.

Стратегии распределения

Решение «кому какие партиции» принимает не coordinator, а лидер группы — один из членов, которому brokers делегировали роль assignor'а. Локально это значит: ты сам выбираешь стратегию через kgo.Balancers(...) на клиенте. Для всех членов одной группы она должна совпадать, иначе coordinator не сможет выбрать общий протокол.

Стратегий несколько, и они принципиально разные:

Range — самая старая. Партиции каждого топика делятся на непрерывные диапазоны и раздаются членам по алфавиту member_id. Просто, но плохо балансируется при нескольких топиках: один член может получить partition 0 двух разных топиков, а другой — никто.

Round-robin — партиции всех топиков выкладываются в один список и раздаются по кругу. Лучше распределяет, но при изменении состава группы перетасовывает почти всё — каждый ребаланс утягивает все партиции, даже те, что могли бы остаться у тех же владельцев.

Sticky — пытается сохранить «прилипчивость»: при ребалансе старается оставить партиции у тех же членов, у кого они уже были, и переместить только то, что нужно для балансировки. Главный плюс — меньше работы по восстановлению локального состояния (кэши, in-flight'ы) после ребаланса.

Cooperative-sticky — sticky плюс другой ребаланс-протокол. Про протокол ниже.

В этой лекции мы используем sticky и cooperative-sticky — старые range/round-robin интересны только как историческая точка. На современных кластерах ставь cooperative-sticky по умолчанию и переключайся на sticky, только если есть конкретная причина.

Eager vs cooperative — это разные ребалансы, не разные стратегии

Тут есть штука, которую часто путают. Стратегия (sticky, round-robin, range, cooperative-sticky) — это алгоритм распределения партиций. Eager и cooperative — это протокол ребаланса, то есть то, как именно coordinator и члены группы общаются во время перераспределения.

Eager (он же «stop-the-world»):

  1. Триггер ребаланса — новый член зашёл, например.
  2. Coordinator говорит всем: «сдайте все свои партиции». Каждый член вызывает OnPartitionsRevoked для всех своих партиций.
  3. Все участвуют в JoinGroup → SyncGroup, получают новый ассайнмент.
  4. Каждый член вызывает OnPartitionsAssigned для нового набора.

Между шагами 2 и 4 группа стоит. Если у тебя на члене было 100 партиций, и реально переезжает только 1 из них — ты всё равно сдашь все 100 и заберёшь обратно 99 тех же самых. Полная остановка.

Cooperative (incremental):

  1. Триггер ребаланса.
  2. Coordinator считает новый план распределения и говорит каждому члену, какие конкретно его партиции должны переехать. Член вызывает OnPartitionsRevoked только для них.
  3. Coordinator завершает первый раунд, отдаёт уже сдавшиеся партиции тем, кому они теперь принадлежат.
  4. Если план ещё не финализирован — второй раунд.

Партиции, которые остаются у того же члена, не отзываются вообще. Если у тебя на члене 100 партиций и переезжает 1 — ты потеряешь 1 и отдашь её другому, остальные 99 продолжат читаться без перерыва. Лаг по ним не вырастает.

Цена кооперативного протокола — два round-trip'а вместо одного и сложнее реализация на стороне клиента. На практике это окупается: на больших группах с долгой обработкой eager-ребаланс убивает SLO.

В одной группе нельзя смешивать eager и cooperative. Это разные SyncGroup-протоколы; coordinator выберет тот, который заявили все, и если хоть один член заявил eager — все откатываются на eager. Поэтому миграция со старого протокола на cooperative делается через rolling restart с временным двойным набором (sticky + cooperative-sticky одновременно, потом убираешь sticky).

Тайминги: heartbeat, session timeout, max poll interval

Координатор должен как-то понимать, что член ещё жив. Для этого член отправляет heartbeat каждые HeartbeatInterval (на проводе — heartbeat.interval.ms). franz-go делает это автоматически в фоне, отдельной горутиной — приложению ничего вызывать не нужно.

Если coordinator не получил heartbeat'а от члена дольше, чем SessionTimeout (session.timeout.ms), он объявляет члена мёртвым и начинает ребаланс без него. Дефолты franz-go v1.21.0: SessionTimeout = 45 секунд (Kafka 3.0+ нормализовался на этом значении после KIP-735), HeartbeatInterval = 3 секунды (pkg/kgo/config.go:641-643). То есть надо пропустить ~15 heartbeat'ов подряд, чтобы тебя выкинули. Это запас на сетевые задержки и паузы GC — короткий блип сети не должен приводить к ребалансу.

Третий тайминг — RebalanceTimeout (rebalance.timeout.ms). Дефолт franz-go — 60 секунд (config.go:642). Это окно, в течение которого все члены группы должны успеть отдать партиции, докоммитить offset'ы и переподписаться после JoinGroup. Если член не успел рейджойнить за это время, coordinator считает его выпавшим и продолжает ребаланс без него.

В Java-клиенте есть отдельная ручка max.poll.interval.ms (дефолт 5 минут), которая на клиентской стороне тоже отслеживает паузу между poll() и при превышении заставляет консьюмера самостоятельно выйти из группы. На проводе она едет в том же поле JoinGroupRequest, что и rebalance.timeout.ms. franz-go такой клиентской автоэвакуации не делает: heartbeat-горутина продолжает стучать, даже если обработка одной записи занимает полчаса. Получишь проблему только если в этот момент произойдёт ребаланс — тогда из-за зависшего обработчика клиент не успеет рейджойнить за RebalanceTimeout, и брокер исключит его из группы.

Соотношение трёх таймеров на словах:

  • HeartbeatInterval — как часто стучусь, что я живой;
  • SessionTimeout — насколько долго брокер ждёт стук;
  • RebalanceTimeout — насколько долго брокер ждёт, что я закончу свою часть ребаланса и переподпишусь.

Если боишься, что зомбированный процесс будет долго висеть в группе и не отдавать партиции — снижай SessionTimeout. Если боишься false positives на сетевых блипах — поднимай SessionTimeout. Если у тебя обработчик может реально удерживать партицию минутами и при этом ты хочешь, чтобы ребаланс прошёл чисто — поднимай RebalanceTimeout (тут уже франза-специфика: клиент ничего не оборвёт, но коммит в OnPartitionsRevoked обязан успеть).

Что показывает наш код

Бинарник один — cmd/loud-member. Запускаешь несколько копий в одной группе, каждая печатает событие ребаланса и текущий список ассайнов. Никакой полезной работы — только наблюдение за тем, как Kafka двигает партиции по копиям.

Идентификация копий — через переменную окружения MEMBER_ID. Она попадает в ClientID, в InstanceID, в каждый printEvent, в стартовый банер процесса:

go
opts := []kgo.Opt{
    kgo.ConsumerGroup(o.group),
    kgo.ConsumeTopics(o.topic),
    kgo.Balancers(balancer),
    kgo.HeartbeatInterval(o.heartbeat),
    kgo.SessionTimeout(o.sessionTimeout),
    kgo.ClientID(fmt.Sprintf("lecture-03-01-loud-%s", o.memberID)),
    kgo.InstanceID(fmt.Sprintf("loud-member-%s", o.memberID)),
    kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
    kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
        printEvent(o.memberID, "ASSIGNED", m)
    }),
    kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
        printEvent(o.memberID, "REVOKED", m)
    }),
    kgo.OnPartitionsLost(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
        printEvent(o.memberID, "LOST", m)
    }),
}

Тут пара мест, на которые стоит посмотреть. InstanceID — опциональный «статический» member id (group.instance.id в спеке). Если он задан, при кратковременном отвале (рестарт пода в k8s, например) coordinator не будет торопиться объявлять члена мёртвым: партиции вернутся ему же, как только он переподключится. Без InstanceID каждый рестарт = новый член = ребаланс. Для учебной демки сюда же забили имя по MEMBER_ID — оно совпадает между запусками, и coordinator это видит.

Хука три:

  1. OnPartitionsAssigned — coordinator выдал тебе новый набор партиций.
  2. OnPartitionsRevoked — coordinator штатно забирает у тебя часть (или все) партиции в рамках планового ребаланса. До завершения ребаланса ты ещё член группы и можешь докоммитить offset'ы — это безопасное место для финального commit'а.
  3. OnPartitionsLost — отличается от Revoked ровно одним: его дёргают на «фатальных» групповых ошибках (IllegalGeneration, UnknownMemberID, провал аутентификации, истёкший session.timeout). Партиции уже потеряны, ты больше не их владелец. Коммит здесь почти наверняка отвергнет coordinator — на это и завязана разница с OnPartitionsRevoked, где коммитить ещё можно и нужно.

Стратегия выбирается флагом -strategy:

go
func pickBalancer(name string) (kgo.GroupBalancer, error) {
    switch strings.ToLower(strings.TrimSpace(name)) {
    case "sticky":
        return kgo.StickyBalancer(), nil
    case "cooperative-sticky", "cooperative", "coop":
        return kgo.CooperativeStickyBalancer(), nil
    default:
        return nil, fmt.Errorf("unknown strategy %q (поддерживаем sticky | cooperative-sticky)", name)
    }
}

Каждый запуск — один балансер. Несколько одновременно в одной группе нельзя (см. eager vs cooperative выше). По умолчанию cooperative-sticky — потому что хочется наблюдать incremental-ребаланс, в нём виднее разница между «эту партицию сдаём» и «эту оставляем».

Сам цикл чтения тривиальный — PollFetches, печать партиции/ключа/значения. Запись — для антуража: чтобы было видно, что член не просто стоит и слушает, но реально качает данные.

Что наблюдать

Топик у нас на 6 партиций (make topic-create-6p). Запусков три, в трёх терминалах:

  • Терминал 1: make run-1. Стартует первая копия, MEMBER_ID=1. JoinGroup → SyncGroup, всё за пару сотен миллисекунд. В выводе видно ASSIGNED: lecture-03-01-groups=[0 1 2 3 4 5] — все 6 партиций его. Если до этого был make seed, начинает читать.

  • Терминал 2: make run-2. Вторая копия. В терминале 1 — REVOKED: lecture-03-01-groups=[3 4 5], в терминале 2 — ASSIGNED: lecture-03-01-groups=[3 4 5]. Группа балансируется пополам. На cooperative первый член отзывает только то, что переезжает, не все 6. На eager (sticky) — отзовёт все 6 и получит обратно [0 1 2].

  • Терминал 3: make run-3. Третья копия. Итог — 2/2/2 распределение. На cooperative первые два терминала видят REVOKED только на ту партицию, которая уехала к третьему члену.

  • Ctrl+C на терминале 2. В терминале 1 — ASSIGNED дополнительно, в терминале 3 — ASSIGNED дополнительно. Партиции ушедшего второго распределились между оставшимися. Cooperative гладко: терминалы 1 и 3 не теряют свои текущие партиции, только добавляют новые.

  • kill -9 второго (без Ctrl+C). Тут интересно: процесс убит резко, ничего не успевает корректно сдать. Coordinator ждёт session.timeout (по дефолту в нашем коде — 30 секунд), потом начинает ребаланс. До истечения таймера группа в нормальном состоянии, партиции убитого никто не читает, лаг по ним растёт. Это и есть «цена внезапной смерти» — секунды простоя на эти партиции.

Чтобы убить ровно нужный MEMBER_ID без угадывания PID:

sh
pgrep -f 'loud-member.*MEMBER_ID=2' | xargs kill -9

Что взять с собой

  • Consumer group — это просто общий group.id. Внутри группы каждая партиция читается ровно одним консьюмером.
  • Rebalance — перерасклад партиций. Триггеры: вход/выход члена, расширение топика, session timeout, ручной триггер админа.
  • Стратегий несколько, по умолчанию ставь cooperative-sticky. Sticky без cooperative — eager-протокол с stop-the-world ребалансом.
  • В одной группе все члены должны заявить совместимый ребаланс-протокол. Cooperative + eager → группа схлопнется до eager.
  • Тайминги ребаланса в franz-go — три независимые ручки: HeartbeatInterval (дефолт 3 с, «как часто стучусь»), SessionTimeout (45 с, «сколько брокер ждёт стук»), RebalanceTimeout (60 с, «сколько брокер ждёт, что я закончу свою часть ребаланса»). Java-клиентский max.poll.interval.ms с автоэвакуацией клиента из группы в franz-go отсутствует — длинная обработка не приведёт к самоэвикции, проблема всплывёт только при одновременном ребалансе.
  • InstanceID (group.instance.id) даёт статическую идентификацию: при рестарте партиции вернутся тому же логическому члену, без ребаланса.
  • OnPartitionsLost — отдельный хук на «выкинут coordinator'ом по таймауту». В нём нельзя коммитить offset'ы, в OnPartitionsRevoked — можно.

В следующей лекции (Коммиты offset'ов) спустимся на уровень offset-коммитов: разберём auto-commit и его дубли при рестарте, manual sync/async, MarkCommitRecords + CommitMarkedOffsets и пару других ручек. Тут мы offset'ы трогали по касательной — пора разобраться предметно.

Запуск

Стенд из корня репозитория должен быть поднят (docker compose up -d).

sh
make topic-create-6p     # создать топик с 6 партициями
make seed                # залить 60 сообщений (опционально, чтобы было что читать)

Дальше в трёх терминалах:

sh
# терминал 1
make run-1
 
# терминал 2 (запускаешь, когда первый уже работает)
make run-2
 
# терминал 3
make run-3

Поэкспериментировать с eager-протоколом:

sh
make run-eager           # MEMBER_ID=1 + sticky
# во втором терминале:
STRATEGY=sticky make run-2

Посмотреть состояние группы со стороны брокера (полезно сравнить с тем, что напечатали хуки):

sh
make group-describe

Сбросить committed offset'ы (после этого следующий run-* опять читает с earliest):

sh
make group-delete
·Модуль 03

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Консьюмер / Группы и ребалансы