Acks и durability
В прошлой лекции (Ключи и партиционирование) ключ определял, куда ляжет запись. Сейчас разберём другую крошечную опцию — acks. Она определяет совсем другой вопрос: когда продьюсер считает, что запись «уже всё, записана». И от ответа на этот вопрос зависит, потеряешь ты платёж при падении брокера или нет.
В Brew это не абстракция. payment-service пишет PaymentReceived в brew.payments.v1 — каждое событие здесь это деньги, списанные с клиента. Потерять такое событие значит: банк списал, а в Brew нет следа платежа, заказ не оплачен, клиент платит дважды или жалуется в поддержку. Уже был постмортем acks=0 — кто-то поставил acks=0 на платежах, и за неделю «потерялось» 200 транзакций. С тех пор на brew.payments.v1 стоит acks=all без обсуждения, и эта лекция объясняет, почему именно так.
Опция простая на вид. Три значения, число. Но за этим числом стоит разная модель консистентности и разный потолок durability. Если перепутать — обнаружишь это либо на первом серьёзном инциденте с платежами, либо на ревью продакшен-инцидента у соседней команды.
Что вообще такое «запись принята»
Записать PaymentReceived в Kafka — это не один атомарный шаг. На уровне продьюсера это последовательность.
- Сериализовать payload, сложить в локальный буфер.
- Отправить батч на лидера партиции.
- Лидер пишет батч в свой лог (на диск или хотя бы в page cache).
- Followers тянут батч с лидера, тоже пишут себе.
- Лидер видит, что N реплик подтвердили запись, и отвечает продьюсеру.
acks отвечает на вопрос: на каком из этих шагов мы считаем запись успешной и возвращаем control приложению.
Уровней три, и каждый — компромисс между latency и durability:
acks=0— продьюсер пишет в сокет и сразу считает, что готово. Лидер вообще не отвечает на запрос. Самый быстрый режим. Самый ненадёжный: если лидер упал между приёмом и записью на диск, никто никогда не узнает, что платёж потерялся. Метрики будут показывать, чтоpayment-service«всё отправил».acks=1— лидер ответит, как только записал батч себе. Followers могут ещё не получить копию. Если лидер сразу после ack'а упал, и реплики отстали, новый лидер из ISR может не знать про эти записи — потеря тихая, но реальная. Дляbrew.payments.v1это и есть тот самый тихо пропавший платёж.acks=all(он жеacks=-1) — лидер ждёт, пока все ISR-реплики подтвердят запись, и только тогда отвечает. Самый дорогой режим по latency. Самый устойчивый — теряем только если падает кластер целиком, или если ISR схлопывается нижеmin.insync.replicasи продьюсер сам видит ошибку.
«Запись на диск» в шагах выше — обычно в page cache, не в синхронный fsync. Kafka по умолчанию полагается на репликацию, а не на fsync (см. flush.messages / flush.ms). Это другое измерение durability — про него отдельно в модуле 08.
Как сюда вписывается min.insync.replicas
Это парный параметр на стороне топика (или брокера), и он работает только при acks=all. При других уровнях он попросту игнорируется — продьюсер не запросил у лидера ждать подтверждения от ISR, и лидер не ждёт.
Логика такая. Лидер при acks=all смотрит на текущий ISR (синхронные реплики, не отстающие) и проверяет: их количество ≥ min.insync.replicas? Если да — пишет, ждёт подтверждений от ISR, отвечает. Если нет — отвечает ошибкой NOT_ENOUGH_REPLICAS (или NOT_ENOUGH_REPLICAS_AFTER_APPEND, если уже принял), не дожидаясь подтверждений.
Зачем нужен min.insync.replicas отдельно. Без него acks=all означает «жду все ISR» — а ISR может схлопнуться до 1 (только лидер). И тогда acks=all превращается в acks=1 фактически. min.insync.replicas ставит нижнюю границу: «либо у меня есть N синхронных реплик, либо я отказываюсь принимать запись». Это предохранитель от тихой деградации durability.
Стандартная формула в проде: RF=3, min.insync.replicas=2 — ровно так настроен brew.payments.v1. RF=3 даёт три копии. min.ISR=2 говорит: «я готов потерять один брокер из ISR — платежи продолжают идти. Если потеряю двух — лучше отвалюсь с ошибкой, чем тихо запишу PaymentReceived в одну реплику и потеряю при её падении». Для платежей «отвалиться с ошибкой» честнее, чем «тихо записать и потерять»: ошибку payment-service увидит и обработает, потерю — нет. На стенде мы для демонстрационной части ставим min.insync.replicas=3 на учебных топиках — этот паттерн нужен только в нашей лекции, чтобы поймать NOT_ENOUGH_REPLICAS при остановке одного брокера; в реальном проде на платежах ставь 2.
Что теряется в каждом случае
Сценарий — вылетает leader сразу после того, как продьюсер получил ack.
При acks=0 сценарий ещё проще: продьюсер уже считает платёж записанным. Лидер мог даже не получить пакет (ECONNRESET после write — это валидно: ядро приняло данные в буфер сокета, обещало отправить, но не успело). PaymentReceived потеряно, payment-service этого не знает. Ровно так в Brew пропали те 200 транзакций.
При acks=1 пакет точно дошёл и лидер записал себе. Followers могли ещё не успеть. Если лидер падает до того, как кто-то из ISR подтянул эту запись, новый лидер из ISR её не видит. Платёж потерян, payment-service думает, что всё ок.
При acks=all лидер ответил только после того, как все ISR подтвердили. Если в ISR было >=2 реплик, и лидер падает — другая реплика из ISR уже имеет это PaymentReceived, она становится новым лидером, ничего не теряется. Это и есть «выдержит падение брокера». Граница — min.insync.replicas: если ISR упадёт ниже, продьюсер увидит ошибку и не получит ack — payment-service сам решит, что делать (ретрай, DLQ, 5xx клиенту), но не зачтёт списание молча.
Короткое правило: acks=all плюс min.insync.replicas >= 2 плюс RF=3 — это точка, где Kafka про durability не врёт. Для денег Brew это нижняя планка. Всё остальное — компромиссы с осознанной потерей.
Идемпотентность — побочное правило
Тут проскакивает важная деталь franz-go (и Java-клиента вместе с ним). По дефолту продьюсер идемпотентный — он умеет дедуплицировать ретраи. И идемпотентность требует acks=all. Если ставишь acks=0 или acks=1 — нужно явно отключить идемпотентность опцией kgo.DisableIdempotentWrite(). Иначе клиент при инициализации ругнётся idempotency requires acks=all.
Подробнее про идемпотентный продьюсер — в следующей лекции (Идемпотентный продьюсер). Тут просто запомни: дефолт franz-go = идемпотентный продьюсер с acks=all. Чтобы ослабить acks, нужно дополнительно отключить идемпотентность.
Что делает наш код
Бинарник один: cmd/bench-acks. Он гоняет три продьюсера с тремя разными acks и сравнивает latency и throughput на одинаковом профиле нагрузки.
Это учебный бенч, не сам payment-service: payload здесь — синтетические байты фиксированного размера, чтобы мерить latency честно, без сериализации PaymentReceived. Цифры из него — это потолок durability для каждого режима; на платежах Brew выбран только один из трёх.
Топиков три, потому что мы хотим видеть изолированный effect каждого режима — один продьюсер не должен мешать другому. Каждый топик создаётся идемпотентно с partitions=3, replication.factor=3, min.insync.replicas=3.
Сама конфигурация режимов — три записи в массиве:
deliveryTimeout := kgo.RecordDeliveryTimeout(5 * time.Second)
modes := []ackMode{
{"acks=0", "0", []kgo.Opt{kgo.RequiredAcks(kgo.NoAck()), kgo.DisableIdempotentWrite(), deliveryTimeout}},
{"acks=1", "1", []kgo.Opt{kgo.RequiredAcks(kgo.LeaderAck()), kgo.DisableIdempotentWrite(), deliveryTimeout}},
{"acks=all", "all", []kgo.Opt{deliveryTimeout}}, // дефолт franz-go: idempotent + AllISRAcks
}DisableIdempotentWrite() обязателен для acks=0 и acks=1 — без этого клиент не запустится. Для acks=all ничего не нужно, дефолт уже правильный. RecordDeliveryTimeout(5s) ставит верхнюю границу на полную доставку одного record'а — без неё franz-go под degraded ISR будет ретраить NOT_ENOUGH_REPLICAS пока не упрётся в общий context, и это плохо видно в выводе.
Для измерения latency я взял синхронный ProduceSync per-record — у клиента всегда не больше одного сообщения в полёте. Это занижает throughput по сравнению с реальным async-pipeline'ом (где батчинг даёт x10–x50), зато даёт честную per-record latency. P50/P99 показывают, сколько занимает один полный round-trip с конкретными acks, а не «время отдачи целого батча, размазанное по всем сообщениям внутри».
Сам цикл записи — голый ProduceSync плюс таймстемп вокруг:
for i := 0; i < msgs; i++ {
if err := ctx.Err(); err != nil {
break
}
rec := &kgo.Record{Topic: topic, Value: payload}
rpcCtx, rpcCancel := context.WithTimeout(ctx, 15*time.Second)
sendAt := time.Now()
out := cl.ProduceSync(rpcCtx, rec)
took := time.Since(sendAt)
rpcCancel()
if err := out.FirstErr(); err != nil {
res.failed++
res.errs[classifyErr(err)]++
continue
}
res.sent++
res.latencies = append(res.latencies, took)
}После прогона все три goroutine ждут друг друга, потом печатается общая таблица — отсортированные latency сводятся в percentile через простое:
func percentile(sorted []time.Duration, p float64) time.Duration {
if len(sorted) == 0 {
return 0
}
idx := int(float64(len(sorted)-1) * p)
return sorted[idx]
}В конце — отдельным шагом — kadm.ListEndOffsets по каждому топику, чтобы видеть, что в логе действительно осело столько же, сколько мы насчитали как SENT.
Как читать вывод
Под здоровым стендом (make run без kill-broker) типичный результат:
параллельно пишем 1000 сообщений по 1024 B на каждый режим acks (partitions=3, rf=3, min.insync.replicas=3)
результаты:
MODE SENT FAILED ELAPSED THROUGHPUT P50 P99 P99.9 MAX
acks=0 1000 0 42.37ms 23603 msg/s 16.0µs 78.0µs 1.91ms 18.32ms
acks=1 1000 0 680.24ms 1470 msg/s 582.0µs 1.80ms 14.47ms 19.78ms
acks=all 1000 0 1.22s 819 msg/s 753.0µs 2.70ms 14.36ms 268.45msЧто важно. P50 у acks=0 — десятки микросекунд. Это просто время отдать пакет в сокет ядра, никакого broker round-trip нет. У acks=1 уже миллисекунды — лидер записал и ответил. У acks=all чуть больше — лидер дополнительно ждал ISR-followers. Разница acks=1 vs acks=all обычно небольшая на здоровом кластере с быстрой сетью — 30–50% overhead. На медленной сети или нагруженных follower'ах разрыв резко растёт.
Throughput идёт обратно: 23k/1.4k/0.8k msg/s. Цифры выглядят пугающе низкими, но мы синхронные — без батчинга и параллелизма. С нормальным async-продьюсером и линджером — другая история, в Батчинг и пропускная способность будем мерить именно throughput.
MAX колонка — отдельная история. Иногда там вылетают сотни миллисекунд (в моём прогоне 268ms на acks=all). Это типичное поведение: первая запись в топик после старта клиента триггерит metadata refresh, lookup лидеров, открытие соединений к нужным брокерам. Поэтому MAX — это, скорее всего, первая запись, а не репрезентативная latency. P99/P99.9 показывают реальный хвост.
Что происходит при make kill-broker
Останавливаем kafka-2 (docker stop kafka-2), ждём пару секунд, пока контроллер заметит, и запускаем make run снова. ISR теперь 2 на каждой партиции (Isr: 1,3). У нас min.insync.replicas=3 — то есть для acks=all лидер не может удовлетворить условие.
Что увидим:
результаты:
MODE SENT FAILED ELAPSED THROUGHPUT P50 P99 P99.9 MAX
acks=0 300 0 55.21ms 5434 msg/s 54.0µs 495.0µs 4.62ms 24.31ms
acks=1 300 0 165.93ms 1808 msg/s 307.0µs 2.67ms 5.28ms 30.98ms
acks=all 0 12 60.08s 0 msg/s 0 0 0 0
[acks=all] классы ошибок:
ERROR COUNT
DEADLINE_EXCEEDED 1
records have timed out before they were able to be produced 11acks=0 и acks=1 записали все 300 как ни в чём не бывало — лидеры всех партиций живы (kafka-1 и kafka-3 остались), запросу не нужны ISR. acks=all отдал 0 записей и накопил 12 ошибок таймаута за 60 секунд (после чего runtime cancel'ит goroutine'у по общему таймауту). За окно 60 секунд при RecordDeliveryTimeout(5s) физически успевает попробовать ровно ~12 записей — каждая ждёт 5 секунд внутренних retry'ев и сдаётся.
Сама ошибка от franz-go выглядит как records have timed out before they were able to be produced. Внутри клиент крутил retry на NOT_ENOUGH_REPLICAS (это retriable error — кластер может починиться), не дождался — отдал record-level timeout. Чтобы увидеть NOT_ENOUGH_REPLICAS явно, можно поставить kgo.RecordRetries(0) — тогда первая ошибка от лидера сразу пробьётся наружу. В проде такое не делают (любой transient гик завалит produce), но для диагностики — нормальный приём.
После эксперимента — make restore-broker. Команда поднимает kafka-2 и ждёт пару секунд, чтобы followers догнали лог. ISR на топиках возвращается к 1,2,3, и acks=all снова работает.
Что взять с собой
В голове после лекции должно сложиться примерно такое:
acks— это выбор durability vs latency. По умолчанию в franz-go (и в большинстве production-настроек) —acks=all. Это правильный дефолт, и ровно он стоит наbrew.payments.v1.acks=0— для метрик, телеметрии, кликстримаbrew.clickstream.v1«чем больше, тем лучше, но потерять не страшно». Платежи и любой serious payload — не сюда; в Brew это уже один раз стоило 200 потерянных транзакций.acks=1— компромисс, который соблазнителен по latency, но даёт тихую потерю при падении лидера. На моей практике — почти всегда плохой выбор. Если экономия latency реально нужна, обычно правильнее тюнить батчинг и compression приacks=all, а не понижать durability платежей.acks=allничего не даёт сам по себе. Работает в связке с RF≥2 иmin.insync.replicas≥2(как наbrew.payments.v1). Без этогоacks=allможет тихо превратиться вacks=1при схлопывании ISR.- При
min.insync.replicasстроже, чем текущий ISR, продьюсер увидит ошибку. Это фича, а не баг — лучше не записать, чем записать в одну реплику и потерять. - Идемпотентность в franz-go по дефолту включена и требует
acks=all. Чтобы понизить —kgo.DisableIdempotentWrite(). Сама идемпотентность — тема следующей лекции (Идемпотентный продьюсер).
В Идемпотентный продьюсер разберём, что делает идемпотентный продьюсер на самом деле — почему он спасает от дублей при ретраях и почему не спасает от zombie-сценария между сессиями.
Запуск
Стенд должен быть поднят (docker compose up -d из корня).
Базовый прогон под здоровый кластер:
make runС другим количеством сообщений и payload-ом:
make run MESSAGES=2000 PAYLOAD=2048Демонстрация падения брокера. Запускается двумя терминалами или последовательно:
make kill-broker # остановили kafka-2, ISR падает до 2
make run # acks=all валится с timeout, acks=0/1 работают
make restore-broker # kafka-2 обратно, ISR восстанавливаетсяОписать топики через kafka-topics.sh --describe:
make topic-describeПрибрать после лекции:
make topic-delete