Репликация и ISR
В предыдущей лекции Brew пересоздал топик brew.orders.v1 с тремя партициями и запустил inspect. На каждой строке вывода были три колонки:
PARTITION LEADER REPLICAS ISR OFFLINE
0 2 [2 3 1] [2 3 1] -
1 3 [3 1 2] [3 1 2] -
...LEADER тогда разобрали - это тот брокер, через который идёт запись в партицию. REPLICAS и ISR я махнул рукой: «про это будет 01-03». Сейчас как раз 01-03.
Тема разделится надвое. Сначала разберёмся, зачем вообще нужны эти три колонки и что они защищают. Потом запустим утилиту watch-isr и руками уроним брокер - увидим, как ISR сжимается, как восстанавливается и в какой момент acks=all падает с NotEnoughReplicas.
Зачем Brew репликация
Партиция - это файл на диске одного брокера. Если этот брокер упал, файла нет. Сообщения, которые в нём лежали, прочитать нельзя. Для brew.orders.v1 это значит: пропали заказы за период от последнего бэкапа до момента падения. На промо «бесплатный кофе по пятницам» под нагрузкой 8000 заказов в минуту - это тысячи потерянных платежей и злых клиентов в чате поддержки.
Решение очевидное: держать копии. Каждая партиция хранится сразу на нескольких брокерах. Это и есть репликация. Сколько копий держать - задаётся параметром replication.factor (RF) на уровне топика. На стенде Brew дефолт KAFKA_DEFAULT_REPLICATION_FACTOR=3, три ноды как раз позволяют хранить три копии каждой партиции - по одной на ноду.
Из RF растут две гарантии:
- Доступность. При RF=3 можно потерять один-два брокера и продолжать читать и писать (зависит от настроек).
- Сохранность. Сообщение записано - значит, оно уже на нескольких дисках, а не висит в RAM одной ноды.
Репликация не работает бесплатно. Каждое сообщение по сети летит RF раз, на дисках занимает RF×размер. На прод-кластере с трафиком Brew это уже считаемые деньги: при RF=3 счёт на диски и сеть утраивается. Поэтому RF=3 это здравый дефолт. RF=5 берут под критичные топики типа brew.payments.v1 в банковских системах. Выше - почти никогда не нужно.
Для Brew с тремя нодами RF=3 это потолок: пятая реплика просто негде разместиться. Если бы кластер расширили до пяти нод, можно было бы поднять RF до 5 для платёжного топика, оставив brew.orders.v1 на RF=3 - на топиках разные значения уживаются спокойно.
Leader, follower, replica
У каждой партиции есть один LEADER и остальные - followers. Все они одинаково лежат на дисках своих брокеров; разница только в роли.
Leader делает всю работу:
- Принимает запись от продьюсера (продьюсер всегда пишет в leader, не во follower).
- Записывает себе в локальный лог.
- Раздаёт followers по сети.
- Отвечает на запросы консьюмеров.
Followers просто реплицируют - тянут от leader'а свежие записи и сохраняют у себя. Они не отвечают на запросы продьюсеров и обычно не отвечают на запросы консьюмеров (для multi-DC есть Fetch From Follower, но это отдельная история - для одного дата-центра она не пригодится).
Leader выбирается контроллером (этого парня встретили в Архитектуре и KRaft, он живёт на одной из нод KRaft-кворума). Если leader партиции упал, контроллер выбирает нового из живых followers, и роль переходит. Об этом узнают и продьюсер, и консьюмер - оба перевыбирают leader'а на лету через metadata-refresh, без перезапуска приложения.
Когда order-service Brew пишет OrderPlaced в партицию 0 топика brew.orders.v1, под капотом происходит вот что. Клиент франца смотрит свой metadata-кеш, видит «партиция 0 - leader=2», открывает соединение с kafka-2, отправляет туда Produce. Если kafka-2 неожиданно упала, первый запрос вернёт ошибку «not leader», клиент сходит за свежими metadata, узнает нового leader'а (например, kafka-3) и повторит запрос туда. Для приложения это две лишних миллисекунды задержки на одном сообщении, не больше.
ISR - кто из followers «в синхроне»
Тут начинается мякотка. Followers тянут данные от leader'а асинхронно. Один follower может отстать на сотни миллисекунд, другой - лежит и не тянет ничего, пока его не починят. Какие из них считать живыми, а какие - отставшими?
Для этого есть множество ISR - In-Sync Replicas. Это подмножество REPLICAS, в котором лежат те, кто:
- читал у leader'а свежие данные не дольше, чем
replica.lag.time.max.ms(дефолт 30 секунд); - успевал догнать leader'а до этой границы по offset'у.
Если follower завис, отстал по сети, или его процесс рестартует - он выпадает из ISR. Это происходит с задержкой replica.lag.time.max.ms: пока таймер тикает, follower считается живым; когда таймер вышел, отчисляется. Когда вернётся в строй, догонит leader'а до конца лога и снова войдёт в ISR. Это естественное состояние: ISR живёт и дышит, постоянно пересчитывается контроллером.
Аналогия для бэкендера. ISR это что-то вроде healthcheck-пула в load balancer'е. У ноды есть таймаут на heartbeat: пропустил два подряд - вылетел из пула, перестал получать трафик. Вернулся, прошёл проверку - вошёл обратно. С тем отличием, что в Kafka «healthcheck» проверяет одно: успеваешь ли ты копировать данные за replica.lag.time.max.ms. Пинги тут роли не играют, доступность по сети без копирования - повод вылететь из ISR.
Важная штука. Только реплики из ISR могут стать новым leader'ом при failover'е (если выключен unclean.leader.election, а в Kafka 4.x он выключен по умолчанию - стенд Brew этот дефолт не переопределяет). Это значит - данные не потеряются: новый leader точно знает все сообщения, которые подтвердил старый leader. Ровно поэтому продьюсер с acks=all ждёт записи именно в ISR, реплики за его границами в кворуме не учитываются.
Кстати про admin-чат Brew. На прошлой неделе kafka-2 ночью ушла в обслуживание, бариста утром заметили только потому, что пришло уведомление «under-replicated partitions = 12, ISR=2/3». Кластер при этом работал: ISR=2 при min.insync.replicas=2 это всё ещё штатная работа. Восстановили kafka-2 за десять минут, реплики догнались - и under-replicated обнулилось. Никто из клиентов кофеен ничего не заметил.
min.insync.replicas - порог записи
Без этого параметра RF только половина гарантии. Он отвечает на вопрос: сколько ISR-реплик должно подтвердить запись, чтобы продьюсер с acks=all получил OK.
На стенде Brew KAFKA_MIN_INSYNC_REPLICAS=2 - это значит:
- ISR=3 - запись с
acks=allнормально подтверждается, всё хорошо. - ISR=2 - тоже подтверждается, кластер работает в «уменьшенном» режиме, но запись идёт.
- ISR=1 - запись с
acks=allпадает сNotEnoughReplicas. Продьюсер ретраит (вдруг ISR вернётся в норму) - в franz-go по умолчаниюRecordRetriesиRecordDeliveryTimeoutне ограничены, без явных лимитов попытки уходят вверх по экспоненте без потолка. Чтение при этом всё ещё работает. - ISR=0 - партиция офлайн целиком: ни записи, ни чтения. Этот случай редкий, обычно означает, что упал весь кластер; для Brew это уровень «звонят CTO».
Сочетание RF=3 + min.insync.replicas=2 + acks=all это стандартная durable-конфигурация. Можно потерять один брокер и продолжать писать. Двух - уже нельзя писать с durability-гарантией (только без неё, через acks=1 или acks=0, но тогда привет потери). Про сам параметр acks будет подробно в First producer, пока достаточно знать: acks=all означает «дождись записи на все реплики из ISR».
min.insync.replicas хранится на топике (или на брокере как дефолт). Под разные топики Brew может ставить разное:
brew.orders.v1- min.insync=2, потому что потерять заказ это потерять выручку.brew.payments.v1- min.insync=2 (или даже 3, если хочется блокировать запись при любой деградации - финансовая совесть).brew.kitchen.v1- min.insync=2, кухонные события важны для оперативной работы баристы, потеря приводит к «забытым» заказам.brew.telemetry.v1- min.insync=1, чтобы метрики продолжали течь даже при двух упавших нодах. Кому нужна метрика об упавшем кластере, если сама метрика тоже не пишется?
Как это выглядит на стенде
partition: brew.orders.v1-0
RF=3, min.insync.replicas=2
|- kafka-1 (id=1) --- replica -|
| |
|- kafka-2 (id=2) --- LEADER --|-- ISR={1,2,3} acks=all OK
| |
|- kafka-3 (id=3) --- replica -|
stop kafka-2 -> leader сваливает на kafka-3 (новый leader)
replica id=2 выпадает из ISR через ~30s
|- kafka-1 (id=1) --- replica -|
| |
|- kafka-2 (down) |-- ISR={1,3} acks=all OK (2 из 3)
| |
|- kafka-3 (id=3) --- LEADER ---| under-replicated = yes
start kafka-2 -> догоняет, через ~5-30s возвращается в ISR
|- kafka-1 (id=1) --- replica -|
| |
|- kafka-2 (id=2) --- replica -|-- ISR={1,2,3} восстановлено
| |
|- kafka-3 (id=3) --- LEADER ---|Кто именно стал новым leader'ом - зависит от того, какая из реплик в ISR была первой в Replicas-списке (preferred leader logic). Точные числа у тебя получатся другие. Главное, что схема та же.
Сценарий, который воспроизведём
Запускаем make run - программа создаёт топик brew.orders.v1 с RF=3 идемпотентно, дальше каждые 2 секунды печатает таблицу:
[16:42:11]
PARTITION LEADER REPLICAS ISR UNDER-REPLICATED
0 2 [1 2 3] [1 2 3] no
1 3 [1 2 3] [1 2 3] no
2 1 [1 2 3] [1 2 3] no
---ISR полный, leader на каждой ноде свой (контроллер раскидал лидерство по preferred-replica), под-репликации нет. Состояние «всё хорошо».
В соседнем терминале:
make kill-brokerЭто docker stop kafka-2. Первые тики watch-isr ещё покажут ISR=[1 2 3] на всех трёх партициях - брокер уже не отвечает, но лидер ещё держит его в ISR. Через replica.lag.time.max.ms (дефолт 30 секунд) лидер каждой партиции замечает, что id=2 давно не присылал fetch, и убирает его из ISR. Срабатывает для всех партиций примерно одновременно - таймер общий, потому что kafka-2 перестал слать fetch'и сразу для всех. Через ~30 секунд после docker stop watch-isr покажет:
[16:42:51]
PARTITION LEADER REPLICAS ISR UNDER-REPLICATED
0 1 [1 2 3] [1 3] yes (missing [2])
1 3 [1 2 3] [1 3] yes (missing [2])
2 1 [1 2 3] [1 3] yes (missing [2])
---Что произошло. Партиция 0 раньше имела leader=2 - а 2 упал. Контроллер выбрал нового leader'а из ISR (id=1), запись в неё продолжилась без даунтайма. Партиция 1 сразу была на leader=3, ей вообще ничего не нужно было делать. Партиция 2 жила на leader=1 - тоже без переключения. Под капотом успели произойти leader-election'ы и metadata-refresh у всех клиентов; в наших логах это не видно, но колонка LEADER показывает текущую правду.
Под-репликации видно у всех трёх партиций. Кластер всё ещё рабочий: min.insync.replicas=2, ISR=2 - порог достигнут, order-service пишет заказы как ни в чём не бывало. Но запас прочности съеден. Ещё одна нода вниз - и acks=all начнёт возвращать NotEnoughReplicas.
Возвращаем брокера:
make restore-brokerЧерез несколько секунд видно, как id=2 догоняет leader'а и возвращается в ISR. Если ничего не писалось во время downtime, догон мгновенный (нечего догонять). Если писалось - пропорционально объёму. После полного догона:
[16:43:25]
PARTITION LEADER REPLICAS ISR UNDER-REPLICATED
0 1 [1 2 3] [1 2 3] no
1 3 [1 2 3] [1 2 3] no
2 1 [1 2 3] [1 2 3] no
---Внимание - leader'ы остались такие, какие были после failover'а. По умолчанию контроллер не перевыбирает leader обратно на «исторически правильную» ноду; для этого есть auto.leader.rebalance.enable и периодический leader rebalance, но выполняется он с задержкой. Поведение намеренное - экономия ресурсов на лишнем переключении. На прод-кластерах админы запускают kafka-leader-election.sh --election-type preferred руками или ждут авто-балансировки.
Когда ISR теряется полностью
Допустим, у Brew одновременно лягут kafka-2 и kafka-3. Останется один брокер. Сам по себе один узел держать durability с min.insync=2 не может - упёрлись.
ISR={1} min.insync.replicas=2 -> запись acks=all -> NotEnoughReplicasЧто произойдёт:
- Запись
acks=allупирается вNOT_ENOUGH_REPLICAS. В franz-go это retryable-ошибка, и по умолчаниюkgo.RecordRetriesиkgo.RecordDeliveryTimeoutравны бесконечности - продьюсер будет ретраить, пока ISR не вернётся в норму, и клиентorder-serviceзависнет на отправке. Чтобы получить «не удалось принять заказ, попробуйте ещё раз» с понятным таймаутом, нужно явно ограничить попытки или дедлайн (например,kgo.RecordDeliveryTimeout(5*time.Second)на проде).kgo.RequestRetriesтут не работает - его godoc прямо оговаривает, что он не применяется к produce-запросам. - Запись
acks=1всё ещё работает, но теряется durability на случай падения последнего leader'а. - Запись
acks=0улетает в одну сторону без подтверждения - продьюсер не узнает о потерянных сообщениях; для платёжного топика так писать нельзя. - Чтение работает, leader жив.
kitchen-serviceможет дочитать всё, что было раньше зафиксировано, и ничего не сломается.
Это и есть смысл min.insync.replicas. Kafka не делает вид, что всё хорошо, когда оно не хорошо. Ты явно говоришь: мне нужно минимум столько-то реплик. Меньше - стоп, не пишем. Лучше получить ошибку и алерт, чем потерять данные при следующем сбое.
В реальных инцидентах Brew логика такая. Если упала одна нода - алерт «under-replicated», on-call идёт чинить без спешки, клиенты не страдают. Если упали две - алерт «producers failing acks=all», on-call идёт чинить срочно, потому что order-service уже начал возвращать 503. Третий уровень («кластер вообще лёг») за пределами этой лекции - там уже план DR, переключение трафика на резервный регион и прочая инфраструктурная история.
Что показывает наш код
cmd/watch-isr/main.go делает три вещи. Создаёт топик идемпотентно через admin.CreateTopic (если он уже есть - использует его, не пересоздаёт; в этой лекции пересоздание мешает наблюдать состояние). Запускает таймер с заданным -interval. На каждом тике дёргает admin.ListTopics(ctx, topic) и печатает Partitions.Sorted().
Колонка UNDER-REPLICATED - это len(p.ISR) < len(p.Replicas). Когда yes - мы знаем, что какие-то реплики выпали; функция missing находит конкретные id'ы для подсказки.
Сам цикл наблюдения - обычный тикер с проверкой контекста:
t := time.NewTicker(interval)
defer t.Stop()
if err := tick(ctx, admin, topic); err != nil { ... }
for {
select {
case <-ctx.Done():
return nil
case <-t.C:
if err := tick(ctx, admin, topic); err != nil {
// Не выходим по разовой ошибке metadata: если упал брокер,
// клиент сам переключится на живого. Просто логируем и
// продолжаем - иначе watch-isr теряет смысл при failover'е.
fmt.Fprintf(os.Stderr, "tick failed: %v\n", err)
}
}
}Один тик - это ListTopics плюс печать. Логика under-replicated буквально сравнение длин:
for _, p := range td.Partitions.Sorted() {
under := "no"
if len(p.ISR) < len(p.Replicas) {
under = fmt.Sprintf("yes (missing %v)", missing(p.Replicas, p.ISR))
}
fmt.Fprintf(tw, "%d\t%d\t%v\t%v\t%s\n",
p.Partition, p.Leader, p.Replicas, p.ISR, under)
}Функция missing ищет реплики, которые есть в Replicas, но отсутствуют в ISR - это и есть отставшие узлы:
func missing(replicas, isr []int32) []int32 {
in := make(map[int32]struct{}, len(isr))
for _, id := range isr {
in[id] = struct{}{}
}
out := make([]int32, 0, len(replicas)-len(isr))
for _, id := range replicas {
if _, ok := in[id]; !ok {
out = append(out, id)
}
}
return out
}Одна важная деталь. Ошибка ListTopics на тике не валит цикл. Если упадёт брокер, к которому был подключён клиент, franz-go сам перевыберет seed broker - но один-два запроса между этим могут не пройти. Если бы мы выходили по первой же ошибке, watch-isr закрывался бы ровно в момент падения брокера, то есть в самый интересный момент. Поэтому ошибки логируются и цикл продолжается.
Запуск
Стенд должен быть поднят (docker compose up -d из корня репо).
make runВ отдельном терминале:
make kill-broker # положили kafka-2
make restore-broker # подняли обратноМожно положить любую другую ноду:
make kill-broker BROKER=kafka-3
make restore-broker BROKER=kafka-3Сравнить с CLI:
make topic-describeПолучится тот же ISR, что в watch-isr, только в нативном для shell-скриптов виде - Leader: 1 Replicas: 1,2,3 Isr: 1,3. Идея та же, что в Топики и партиции: admin.ListTopics отдаёт всё, что нужно, без вызовов shell.
Убрать топик:
make topic-deleteЧто узнал
- Репликация это копии партиции на нескольких брокерах. RF задаётся на топике. На стенде Brew по умолчанию RF=3.
- У каждой партиции один leader; продьюсеры пишут только в leader, followers тянут от него.
ISRэто followers, которые «в синхроне» (отстают не большеreplica.lag.time.max.ms). Только из ISR можно выбрать нового leader'а при failover'е (с выключеннымunclean.leader.election).min.insync.replicasзадаёт порог: сколько ISR-реплик должны подтвердить запись сacks=all. На стенде Brew это 2.RF=3 + min.insync.replicas=2 + acks=allстандартная durable-конфигурация. Терпит падение одной ноды. На двух ISR падает в 1, иacks=allначинает возвращатьNotEnoughReplicas.admin.ListTopicsпоказывает всё, что нужно для наблюдения за ISR. Никаких shell-скриптов для этого не нужно.
Дальше (Offsets и retention) посмотрим, как сообщения вообще живут во времени. Разберём offset, log end offset, HWM и retention. Заодно поймём, почему «у нас сообщения хранятся ровно N дней» - фраза с двойным дном.