Kafka CookbookОсновыРепликация и ISR
0 / 42 (0%)

Репликация и ISR

В предыдущей лекции Brew пересоздал топик brew.orders.v1 с тремя партициями и запустил inspect. На каждой строке вывода были три колонки:

plaintext
PARTITION  LEADER  REPLICAS  ISR      OFFLINE
0          2       [2 3 1]   [2 3 1]  -
1          3       [3 1 2]   [3 1 2]  -
...

LEADER тогда разобрали - это тот брокер, через который идёт запись в партицию. REPLICAS и ISR я махнул рукой: «про это будет 01-03». Сейчас как раз 01-03.

Тема разделится надвое. Сначала разберёмся, зачем вообще нужны эти три колонки и что они защищают. Потом запустим утилиту watch-isr и руками уроним брокер - увидим, как ISR сжимается, как восстанавливается и в какой момент acks=all падает с NotEnoughReplicas.

Зачем Brew репликация

Партиция - это файл на диске одного брокера. Если этот брокер упал, файла нет. Сообщения, которые в нём лежали, прочитать нельзя. Для brew.orders.v1 это значит: пропали заказы за период от последнего бэкапа до момента падения. На промо «бесплатный кофе по пятницам» под нагрузкой 8000 заказов в минуту - это тысячи потерянных платежей и злых клиентов в чате поддержки.

Решение очевидное: держать копии. Каждая партиция хранится сразу на нескольких брокерах. Это и есть репликация. Сколько копий держать - задаётся параметром replication.factor (RF) на уровне топика. На стенде Brew дефолт KAFKA_DEFAULT_REPLICATION_FACTOR=3, три ноды как раз позволяют хранить три копии каждой партиции - по одной на ноду.

Из RF растут две гарантии:

  • Доступность. При RF=3 можно потерять один-два брокера и продолжать читать и писать (зависит от настроек).
  • Сохранность. Сообщение записано - значит, оно уже на нескольких дисках, а не висит в RAM одной ноды.

Репликация не работает бесплатно. Каждое сообщение по сети летит RF раз, на дисках занимает RF×размер. На прод-кластере с трафиком Brew это уже считаемые деньги: при RF=3 счёт на диски и сеть утраивается. Поэтому RF=3 это здравый дефолт. RF=5 берут под критичные топики типа brew.payments.v1 в банковских системах. Выше - почти никогда не нужно.

Для Brew с тремя нодами RF=3 это потолок: пятая реплика просто негде разместиться. Если бы кластер расширили до пяти нод, можно было бы поднять RF до 5 для платёжного топика, оставив brew.orders.v1 на RF=3 - на топиках разные значения уживаются спокойно.

Leader, follower, replica

У каждой партиции есть один LEADER и остальные - followers. Все они одинаково лежат на дисках своих брокеров; разница только в роли.

Leader делает всю работу:

  1. Принимает запись от продьюсера (продьюсер всегда пишет в leader, не во follower).
  2. Записывает себе в локальный лог.
  3. Раздаёт followers по сети.
  4. Отвечает на запросы консьюмеров.

Followers просто реплицируют - тянут от leader'а свежие записи и сохраняют у себя. Они не отвечают на запросы продьюсеров и обычно не отвечают на запросы консьюмеров (для multi-DC есть Fetch From Follower, но это отдельная история - для одного дата-центра она не пригодится).

Leader выбирается контроллером (этого парня встретили в Архитектуре и KRaft, он живёт на одной из нод KRaft-кворума). Если leader партиции упал, контроллер выбирает нового из живых followers, и роль переходит. Об этом узнают и продьюсер, и консьюмер - оба перевыбирают leader'а на лету через metadata-refresh, без перезапуска приложения.

Когда order-service Brew пишет OrderPlaced в партицию 0 топика brew.orders.v1, под капотом происходит вот что. Клиент франца смотрит свой metadata-кеш, видит «партиция 0 - leader=2», открывает соединение с kafka-2, отправляет туда Produce. Если kafka-2 неожиданно упала, первый запрос вернёт ошибку «not leader», клиент сходит за свежими metadata, узнает нового leader'а (например, kafka-3) и повторит запрос туда. Для приложения это две лишних миллисекунды задержки на одном сообщении, не больше.

ISR - кто из followers «в синхроне»

Тут начинается мякотка. Followers тянут данные от leader'а асинхронно. Один follower может отстать на сотни миллисекунд, другой - лежит и не тянет ничего, пока его не починят. Какие из них считать живыми, а какие - отставшими?

Для этого есть множество ISR - In-Sync Replicas. Это подмножество REPLICAS, в котором лежат те, кто:

  • читал у leader'а свежие данные не дольше, чем replica.lag.time.max.ms (дефолт 30 секунд);
  • успевал догнать leader'а до этой границы по offset'у.

Если follower завис, отстал по сети, или его процесс рестартует - он выпадает из ISR. Это происходит с задержкой replica.lag.time.max.ms: пока таймер тикает, follower считается живым; когда таймер вышел, отчисляется. Когда вернётся в строй, догонит leader'а до конца лога и снова войдёт в ISR. Это естественное состояние: ISR живёт и дышит, постоянно пересчитывается контроллером.

Аналогия для бэкендера. ISR это что-то вроде healthcheck-пула в load balancer'е. У ноды есть таймаут на heartbeat: пропустил два подряд - вылетел из пула, перестал получать трафик. Вернулся, прошёл проверку - вошёл обратно. С тем отличием, что в Kafka «healthcheck» проверяет одно: успеваешь ли ты копировать данные за replica.lag.time.max.ms. Пинги тут роли не играют, доступность по сети без копирования - повод вылететь из ISR.

Важная штука. Только реплики из ISR могут стать новым leader'ом при failover'е (если выключен unclean.leader.election, а в Kafka 4.x он выключен по умолчанию - стенд Brew этот дефолт не переопределяет). Это значит - данные не потеряются: новый leader точно знает все сообщения, которые подтвердил старый leader. Ровно поэтому продьюсер с acks=all ждёт записи именно в ISR, реплики за его границами в кворуме не учитываются.

Кстати про admin-чат Brew. На прошлой неделе kafka-2 ночью ушла в обслуживание, бариста утром заметили только потому, что пришло уведомление «under-replicated partitions = 12, ISR=2/3». Кластер при этом работал: ISR=2 при min.insync.replicas=2 это всё ещё штатная работа. Восстановили kafka-2 за десять минут, реплики догнались - и under-replicated обнулилось. Никто из клиентов кофеен ничего не заметил.

min.insync.replicas - порог записи

Без этого параметра RF только половина гарантии. Он отвечает на вопрос: сколько ISR-реплик должно подтвердить запись, чтобы продьюсер с acks=all получил OK.

На стенде Brew KAFKA_MIN_INSYNC_REPLICAS=2 - это значит:

  • ISR=3 - запись с acks=all нормально подтверждается, всё хорошо.
  • ISR=2 - тоже подтверждается, кластер работает в «уменьшенном» режиме, но запись идёт.
  • ISR=1 - запись с acks=all падает с NotEnoughReplicas. Продьюсер ретраит (вдруг ISR вернётся в норму) - в franz-go по умолчанию RecordRetries и RecordDeliveryTimeout не ограничены, без явных лимитов попытки уходят вверх по экспоненте без потолка. Чтение при этом всё ещё работает.
  • ISR=0 - партиция офлайн целиком: ни записи, ни чтения. Этот случай редкий, обычно означает, что упал весь кластер; для Brew это уровень «звонят CTO».

Сочетание RF=3 + min.insync.replicas=2 + acks=all это стандартная durable-конфигурация. Можно потерять один брокер и продолжать писать. Двух - уже нельзя писать с durability-гарантией (только без неё, через acks=1 или acks=0, но тогда привет потери). Про сам параметр acks будет подробно в First producer, пока достаточно знать: acks=all означает «дождись записи на все реплики из ISR».

min.insync.replicas хранится на топике (или на брокере как дефолт). Под разные топики Brew может ставить разное:

  • brew.orders.v1 - min.insync=2, потому что потерять заказ это потерять выручку.
  • brew.payments.v1 - min.insync=2 (или даже 3, если хочется блокировать запись при любой деградации - финансовая совесть).
  • brew.kitchen.v1 - min.insync=2, кухонные события важны для оперативной работы баристы, потеря приводит к «забытым» заказам.
  • brew.telemetry.v1 - min.insync=1, чтобы метрики продолжали течь даже при двух упавших нодах. Кому нужна метрика об упавшем кластере, если сама метрика тоже не пишется?

Как это выглядит на стенде

plaintext
              partition: brew.orders.v1-0
              RF=3, min.insync.replicas=2
 
   |- kafka-1 (id=1) --- replica  -|
   |                               |
   |- kafka-2 (id=2) --- LEADER  --|-- ISR={1,2,3}  acks=all OK
   |                               |
   |- kafka-3 (id=3) --- replica  -|
 
 
   stop kafka-2 -> leader сваливает на kafka-3 (новый leader)
   replica id=2 выпадает из ISR через ~30s
 
   |- kafka-1 (id=1) --- replica  -|
   |                               |
   |- kafka-2 (down)               |-- ISR={1,3}    acks=all OK (2 из 3)
   |                               |
   |- kafka-3 (id=3) --- LEADER ---|   under-replicated = yes
 
 
   start kafka-2 -> догоняет, через ~5-30s возвращается в ISR
 
   |- kafka-1 (id=1) --- replica  -|
   |                               |
   |- kafka-2 (id=2) --- replica  -|-- ISR={1,2,3}  восстановлено
   |                               |
   |- kafka-3 (id=3) --- LEADER ---|

Кто именно стал новым leader'ом - зависит от того, какая из реплик в ISR была первой в Replicas-списке (preferred leader logic). Точные числа у тебя получатся другие. Главное, что схема та же.

Сценарий, который воспроизведём

Запускаем make run - программа создаёт топик brew.orders.v1 с RF=3 идемпотентно, дальше каждые 2 секунды печатает таблицу:

plaintext
[16:42:11]
PARTITION  LEADER  REPLICAS  ISR      UNDER-REPLICATED
0          2       [1 2 3]   [1 2 3]  no
1          3       [1 2 3]   [1 2 3]  no
2          1       [1 2 3]   [1 2 3]  no
---

ISR полный, leader на каждой ноде свой (контроллер раскидал лидерство по preferred-replica), под-репликации нет. Состояние «всё хорошо».

В соседнем терминале:

sh
make kill-broker

Это docker stop kafka-2. Первые тики watch-isr ещё покажут ISR=[1 2 3] на всех трёх партициях - брокер уже не отвечает, но лидер ещё держит его в ISR. Через replica.lag.time.max.ms (дефолт 30 секунд) лидер каждой партиции замечает, что id=2 давно не присылал fetch, и убирает его из ISR. Срабатывает для всех партиций примерно одновременно - таймер общий, потому что kafka-2 перестал слать fetch'и сразу для всех. Через ~30 секунд после docker stop watch-isr покажет:

plaintext
[16:42:51]
PARTITION  LEADER  REPLICAS  ISR    UNDER-REPLICATED
0          1       [1 2 3]   [1 3]  yes (missing [2])
1          3       [1 2 3]   [1 3]  yes (missing [2])
2          1       [1 2 3]   [1 3]  yes (missing [2])
---

Что произошло. Партиция 0 раньше имела leader=2 - а 2 упал. Контроллер выбрал нового leader'а из ISR (id=1), запись в неё продолжилась без даунтайма. Партиция 1 сразу была на leader=3, ей вообще ничего не нужно было делать. Партиция 2 жила на leader=1 - тоже без переключения. Под капотом успели произойти leader-election'ы и metadata-refresh у всех клиентов; в наших логах это не видно, но колонка LEADER показывает текущую правду.

Под-репликации видно у всех трёх партиций. Кластер всё ещё рабочий: min.insync.replicas=2, ISR=2 - порог достигнут, order-service пишет заказы как ни в чём не бывало. Но запас прочности съеден. Ещё одна нода вниз - и acks=all начнёт возвращать NotEnoughReplicas.

Возвращаем брокера:

sh
make restore-broker

Через несколько секунд видно, как id=2 догоняет leader'а и возвращается в ISR. Если ничего не писалось во время downtime, догон мгновенный (нечего догонять). Если писалось - пропорционально объёму. После полного догона:

plaintext
[16:43:25]
PARTITION  LEADER  REPLICAS  ISR      UNDER-REPLICATED
0          1       [1 2 3]   [1 2 3]  no
1          3       [1 2 3]   [1 2 3]  no
2          1       [1 2 3]   [1 2 3]  no
---

Внимание - leader'ы остались такие, какие были после failover'а. По умолчанию контроллер не перевыбирает leader обратно на «исторически правильную» ноду; для этого есть auto.leader.rebalance.enable и периодический leader rebalance, но выполняется он с задержкой. Поведение намеренное - экономия ресурсов на лишнем переключении. На прод-кластерах админы запускают kafka-leader-election.sh --election-type preferred руками или ждут авто-балансировки.

Когда ISR теряется полностью

Допустим, у Brew одновременно лягут kafka-2 и kafka-3. Останется один брокер. Сам по себе один узел держать durability с min.insync=2 не может - упёрлись.

plaintext
ISR={1}     min.insync.replicas=2     ->     запись acks=all -> NotEnoughReplicas

Что произойдёт:

  • Запись acks=all упирается в NOT_ENOUGH_REPLICAS. В franz-go это retryable-ошибка, и по умолчанию kgo.RecordRetries и kgo.RecordDeliveryTimeout равны бесконечности - продьюсер будет ретраить, пока ISR не вернётся в норму, и клиент order-service зависнет на отправке. Чтобы получить «не удалось принять заказ, попробуйте ещё раз» с понятным таймаутом, нужно явно ограничить попытки или дедлайн (например, kgo.RecordDeliveryTimeout(5*time.Second) на проде). kgo.RequestRetries тут не работает - его godoc прямо оговаривает, что он не применяется к produce-запросам.
  • Запись acks=1 всё ещё работает, но теряется durability на случай падения последнего leader'а.
  • Запись acks=0 улетает в одну сторону без подтверждения - продьюсер не узнает о потерянных сообщениях; для платёжного топика так писать нельзя.
  • Чтение работает, leader жив. kitchen-service может дочитать всё, что было раньше зафиксировано, и ничего не сломается.

Это и есть смысл min.insync.replicas. Kafka не делает вид, что всё хорошо, когда оно не хорошо. Ты явно говоришь: мне нужно минимум столько-то реплик. Меньше - стоп, не пишем. Лучше получить ошибку и алерт, чем потерять данные при следующем сбое.

В реальных инцидентах Brew логика такая. Если упала одна нода - алерт «under-replicated», on-call идёт чинить без спешки, клиенты не страдают. Если упали две - алерт «producers failing acks=all», on-call идёт чинить срочно, потому что order-service уже начал возвращать 503. Третий уровень («кластер вообще лёг») за пределами этой лекции - там уже план DR, переключение трафика на резервный регион и прочая инфраструктурная история.

Что показывает наш код

cmd/watch-isr/main.go делает три вещи. Создаёт топик идемпотентно через admin.CreateTopic (если он уже есть - использует его, не пересоздаёт; в этой лекции пересоздание мешает наблюдать состояние). Запускает таймер с заданным -interval. На каждом тике дёргает admin.ListTopics(ctx, topic) и печатает Partitions.Sorted().

Колонка UNDER-REPLICATED - это len(p.ISR) < len(p.Replicas). Когда yes - мы знаем, что какие-то реплики выпали; функция missing находит конкретные id'ы для подсказки.

Сам цикл наблюдения - обычный тикер с проверкой контекста:

go
t := time.NewTicker(interval)
defer t.Stop()
 
if err := tick(ctx, admin, topic); err != nil { ... }
for {
    select {
    case <-ctx.Done():
        return nil
    case <-t.C:
        if err := tick(ctx, admin, topic); err != nil {
            // Не выходим по разовой ошибке metadata: если упал брокер,
            // клиент сам переключится на живого. Просто логируем и
            // продолжаем - иначе watch-isr теряет смысл при failover'е.
            fmt.Fprintf(os.Stderr, "tick failed: %v\n", err)
        }
    }
}

Один тик - это ListTopics плюс печать. Логика under-replicated буквально сравнение длин:

go
for _, p := range td.Partitions.Sorted() {
    under := "no"
    if len(p.ISR) < len(p.Replicas) {
        under = fmt.Sprintf("yes (missing %v)", missing(p.Replicas, p.ISR))
    }
    fmt.Fprintf(tw, "%d\t%d\t%v\t%v\t%s\n",
        p.Partition, p.Leader, p.Replicas, p.ISR, under)
}

Функция missing ищет реплики, которые есть в Replicas, но отсутствуют в ISR - это и есть отставшие узлы:

go
func missing(replicas, isr []int32) []int32 {
    in := make(map[int32]struct{}, len(isr))
    for _, id := range isr {
        in[id] = struct{}{}
    }
    out := make([]int32, 0, len(replicas)-len(isr))
    for _, id := range replicas {
        if _, ok := in[id]; !ok {
            out = append(out, id)
        }
    }
    return out
}

Одна важная деталь. Ошибка ListTopics на тике не валит цикл. Если упадёт брокер, к которому был подключён клиент, franz-go сам перевыберет seed broker - но один-два запроса между этим могут не пройти. Если бы мы выходили по первой же ошибке, watch-isr закрывался бы ровно в момент падения брокера, то есть в самый интересный момент. Поэтому ошибки логируются и цикл продолжается.

Запуск

Стенд должен быть поднят (docker compose up -d из корня репо).

sh
make run

В отдельном терминале:

sh
make kill-broker     # положили kafka-2
make restore-broker  # подняли обратно

Можно положить любую другую ноду:

sh
make kill-broker BROKER=kafka-3
make restore-broker BROKER=kafka-3

Сравнить с CLI:

sh
make topic-describe

Получится тот же ISR, что в watch-isr, только в нативном для shell-скриптов виде - Leader: 1 Replicas: 1,2,3 Isr: 1,3. Идея та же, что в Топики и партиции: admin.ListTopics отдаёт всё, что нужно, без вызовов shell.

Убрать топик:

sh
make topic-delete

Что узнал

  • Репликация это копии партиции на нескольких брокерах. RF задаётся на топике. На стенде Brew по умолчанию RF=3.
  • У каждой партиции один leader; продьюсеры пишут только в leader, followers тянут от него.
  • ISR это followers, которые «в синхроне» (отстают не больше replica.lag.time.max.ms). Только из ISR можно выбрать нового leader'а при failover'е (с выключенным unclean.leader.election).
  • min.insync.replicas задаёт порог: сколько ISR-реплик должны подтвердить запись с acks=all. На стенде Brew это 2.
  • RF=3 + min.insync.replicas=2 + acks=all стандартная durable-конфигурация. Терпит падение одной ноды. На двух ISR падает в 1, и acks=all начинает возвращать NotEnoughReplicas.
  • admin.ListTopics показывает всё, что нужно для наблюдения за ISR. Никаких shell-скриптов для этого не нужно.

Дальше (Offsets и retention) посмотрим, как сообщения вообще живут во времени. Разберём offset, log end offset, HWM и retention. Заодно поймём, почему «у нас сообщения хранятся ровно N дней» - фраза с двойным дном.

·Модуль 01

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Основы / Репликация и ISR