0 / 42 (0%)

Acks и durability

В прошлой лекции (Ключи и партиционирование) ключ определял, куда ляжет запись. Сейчас разберём другую крошечную опцию — acks. Она определяет совсем другой вопрос: когда продьюсер считает, что запись «уже всё, записана». И от ответа на этот вопрос зависит, потеряешь ты платёж при падении брокера или нет.

В Brew это не абстракция. payment-service пишет PaymentReceived в brew.payments.v1 — каждое событие здесь это деньги, списанные с клиента. Потерять такое событие значит: банк списал, а в Brew нет следа платежа, заказ не оплачен, клиент платит дважды или жалуется в поддержку. Уже был постмортем acks=0 — кто-то поставил acks=0 на платежах, и за неделю «потерялось» 200 транзакций. С тех пор на brew.payments.v1 стоит acks=all без обсуждения, и эта лекция объясняет, почему именно так.

Опция простая на вид. Три значения, число. Но за этим числом стоит разная модель консистентности и разный потолок durability. Если перепутать — обнаружишь это либо на первом серьёзном инциденте с платежами, либо на ревью продакшен-инцидента у соседней команды.

Что вообще такое «запись принята»

Записать PaymentReceived в Kafka — это не один атомарный шаг. На уровне продьюсера это последовательность.

  1. Сериализовать payload, сложить в локальный буфер.
  2. Отправить батч на лидера партиции.
  3. Лидер пишет батч в свой лог (на диск или хотя бы в page cache).
  4. Followers тянут батч с лидера, тоже пишут себе.
  5. Лидер видит, что N реплик подтвердили запись, и отвечает продьюсеру.

acks отвечает на вопрос: на каком из этих шагов мы считаем запись успешной и возвращаем control приложению.

Уровней три, и каждый — компромисс между latency и durability:

  1. acks=0 — продьюсер пишет в сокет и сразу считает, что готово. Лидер вообще не отвечает на запрос. Самый быстрый режим. Самый ненадёжный: если лидер упал между приёмом и записью на диск, никто никогда не узнает, что платёж потерялся. Метрики будут показывать, что payment-service «всё отправил».
  2. acks=1 — лидер ответит, как только записал батч себе. Followers могут ещё не получить копию. Если лидер сразу после ack'а упал, и реплики отстали, новый лидер из ISR может не знать про эти записи — потеря тихая, но реальная. Для brew.payments.v1 это и есть тот самый тихо пропавший платёж.
  3. acks=all (он же acks=-1) — лидер ждёт, пока все ISR-реплики подтвердят запись, и только тогда отвечает. Самый дорогой режим по latency. Самый устойчивый — теряем только если падает кластер целиком, или если ISR схлопывается ниже min.insync.replicas и продьюсер сам видит ошибку.

«Запись на диск» в шагах выше — обычно в page cache, не в синхронный fsync. Kafka по умолчанию полагается на репликацию, а не на fsync (см. flush.messages / flush.ms). Это другое измерение durability — про него отдельно в модуле 08.

Как сюда вписывается min.insync.replicas

Это парный параметр на стороне топика (или брокера), и он работает только при acks=all. При других уровнях он попросту игнорируется — продьюсер не запросил у лидера ждать подтверждения от ISR, и лидер не ждёт.

Логика такая. Лидер при acks=all смотрит на текущий ISR (синхронные реплики, не отстающие) и проверяет: их количество ≥ min.insync.replicas? Если да — пишет, ждёт подтверждений от ISR, отвечает. Если нет — отвечает ошибкой NOT_ENOUGH_REPLICAS (или NOT_ENOUGH_REPLICAS_AFTER_APPEND, если уже принял), не дожидаясь подтверждений.

Зачем нужен min.insync.replicas отдельно. Без него acks=all означает «жду все ISR» — а ISR может схлопнуться до 1 (только лидер). И тогда acks=all превращается в acks=1 фактически. min.insync.replicas ставит нижнюю границу: «либо у меня есть N синхронных реплик, либо я отказываюсь принимать запись». Это предохранитель от тихой деградации durability.

Стандартная формула в проде: RF=3, min.insync.replicas=2 — ровно так настроен brew.payments.v1. RF=3 даёт три копии. min.ISR=2 говорит: «я готов потерять один брокер из ISR — платежи продолжают идти. Если потеряю двух — лучше отвалюсь с ошибкой, чем тихо запишу PaymentReceived в одну реплику и потеряю при её падении». Для платежей «отвалиться с ошибкой» честнее, чем «тихо записать и потерять»: ошибку payment-service увидит и обработает, потерю — нет. На стенде мы для демонстрационной части ставим min.insync.replicas=3 на учебных топиках — этот паттерн нужен только в нашей лекции, чтобы поймать NOT_ENOUGH_REPLICAS при остановке одного брокера; в реальном проде на платежах ставь 2.

Что теряется в каждом случае

Сценарий — вылетает leader сразу после того, как продьюсер получил ack.

При acks=0 сценарий ещё проще: продьюсер уже считает платёж записанным. Лидер мог даже не получить пакет (ECONNRESET после write — это валидно: ядро приняло данные в буфер сокета, обещало отправить, но не успело). PaymentReceived потеряно, payment-service этого не знает. Ровно так в Brew пропали те 200 транзакций.

При acks=1 пакет точно дошёл и лидер записал себе. Followers могли ещё не успеть. Если лидер падает до того, как кто-то из ISR подтянул эту запись, новый лидер из ISR её не видит. Платёж потерян, payment-service думает, что всё ок.

При acks=all лидер ответил только после того, как все ISR подтвердили. Если в ISR было >=2 реплик, и лидер падает — другая реплика из ISR уже имеет это PaymentReceived, она становится новым лидером, ничего не теряется. Это и есть «выдержит падение брокера». Граница — min.insync.replicas: если ISR упадёт ниже, продьюсер увидит ошибку и не получит ack — payment-service сам решит, что делать (ретрай, DLQ, 5xx клиенту), но не зачтёт списание молча.

Короткое правило: acks=all плюс min.insync.replicas >= 2 плюс RF=3 — это точка, где Kafka про durability не врёт. Для денег Brew это нижняя планка. Всё остальное — компромиссы с осознанной потерей.

Идемпотентность — побочное правило

Тут проскакивает важная деталь franz-go (и Java-клиента вместе с ним). По дефолту продьюсер идемпотентный — он умеет дедуплицировать ретраи. И идемпотентность требует acks=all. Если ставишь acks=0 или acks=1 — нужно явно отключить идемпотентность опцией kgo.DisableIdempotentWrite(). Иначе клиент при инициализации ругнётся idempotency requires acks=all.

Подробнее про идемпотентный продьюсер — в следующей лекции (Идемпотентный продьюсер). Тут просто запомни: дефолт franz-go = идемпотентный продьюсер с acks=all. Чтобы ослабить acks, нужно дополнительно отключить идемпотентность.

Что делает наш код

Бинарник один: cmd/bench-acks. Он гоняет три продьюсера с тремя разными acks и сравнивает latency и throughput на одинаковом профиле нагрузки.

Это учебный бенч, не сам payment-service: payload здесь — синтетические байты фиксированного размера, чтобы мерить latency честно, без сериализации PaymentReceived. Цифры из него — это потолок durability для каждого режима; на платежах Brew выбран только один из трёх.

Топиков три, потому что мы хотим видеть изолированный effect каждого режима — один продьюсер не должен мешать другому. Каждый топик создаётся идемпотентно с partitions=3, replication.factor=3, min.insync.replicas=3.

Сама конфигурация режимов — три записи в массиве:

go
deliveryTimeout := kgo.RecordDeliveryTimeout(5 * time.Second)
modes := []ackMode{
    {"acks=0", "0", []kgo.Opt{kgo.RequiredAcks(kgo.NoAck()), kgo.DisableIdempotentWrite(), deliveryTimeout}},
    {"acks=1", "1", []kgo.Opt{kgo.RequiredAcks(kgo.LeaderAck()), kgo.DisableIdempotentWrite(), deliveryTimeout}},
    {"acks=all", "all", []kgo.Opt{deliveryTimeout}}, // дефолт franz-go: idempotent + AllISRAcks
}

DisableIdempotentWrite() обязателен для acks=0 и acks=1 — без этого клиент не запустится. Для acks=all ничего не нужно, дефолт уже правильный. RecordDeliveryTimeout(5s) ставит верхнюю границу на полную доставку одного record'а — без неё franz-go под degraded ISR будет ретраить NOT_ENOUGH_REPLICAS пока не упрётся в общий context, и это плохо видно в выводе.

Для измерения latency я взял синхронный ProduceSync per-record — у клиента всегда не больше одного сообщения в полёте. Это занижает throughput по сравнению с реальным async-pipeline'ом (где батчинг даёт x10–x50), зато даёт честную per-record latency. P50/P99 показывают, сколько занимает один полный round-trip с конкретными acks, а не «время отдачи целого батча, размазанное по всем сообщениям внутри».

Сам цикл записи — голый ProduceSync плюс таймстемп вокруг:

go
for i := 0; i < msgs; i++ {
    if err := ctx.Err(); err != nil {
        break
    }
    rec := &kgo.Record{Topic: topic, Value: payload}
    rpcCtx, rpcCancel := context.WithTimeout(ctx, 15*time.Second)
    sendAt := time.Now()
    out := cl.ProduceSync(rpcCtx, rec)
    took := time.Since(sendAt)
    rpcCancel()
 
    if err := out.FirstErr(); err != nil {
        res.failed++
        res.errs[classifyErr(err)]++
        continue
    }
    res.sent++
    res.latencies = append(res.latencies, took)
}

После прогона все три goroutine ждут друг друга, потом печатается общая таблица — отсортированные latency сводятся в percentile через простое:

go
func percentile(sorted []time.Duration, p float64) time.Duration {
    if len(sorted) == 0 {
        return 0
    }
    idx := int(float64(len(sorted)-1) * p)
    return sorted[idx]
}

В конце — отдельным шагом — kadm.ListEndOffsets по каждому топику, чтобы видеть, что в логе действительно осело столько же, сколько мы насчитали как SENT.

Как читать вывод

Под здоровым стендом (make run без kill-broker) типичный результат:

plaintext
параллельно пишем 1000 сообщений по 1024 B на каждый режим acks (partitions=3, rf=3, min.insync.replicas=3)
 
результаты:
MODE      SENT  FAILED  ELAPSED   THROUGHPUT   P50      P99     P99.9    MAX
acks=0    1000  0       42.37ms   23603 msg/s  16.0µs   78.0µs  1.91ms   18.32ms
acks=1    1000  0       680.24ms  1470 msg/s   582.0µs  1.80ms  14.47ms  19.78ms
acks=all  1000  0       1.22s     819 msg/s    753.0µs  2.70ms  14.36ms  268.45ms

Что важно. P50 у acks=0 — десятки микросекунд. Это просто время отдать пакет в сокет ядра, никакого broker round-trip нет. У acks=1 уже миллисекунды — лидер записал и ответил. У acks=all чуть больше — лидер дополнительно ждал ISR-followers. Разница acks=1 vs acks=all обычно небольшая на здоровом кластере с быстрой сетью — 30–50% overhead. На медленной сети или нагруженных follower'ах разрыв резко растёт.

Throughput идёт обратно: 23k/1.4k/0.8k msg/s. Цифры выглядят пугающе низкими, но мы синхронные — без батчинга и параллелизма. С нормальным async-продьюсером и линджером — другая история, в Батчинг и пропускная способность будем мерить именно throughput.

MAX колонка — отдельная история. Иногда там вылетают сотни миллисекунд (в моём прогоне 268ms на acks=all). Это типичное поведение: первая запись в топик после старта клиента триггерит metadata refresh, lookup лидеров, открытие соединений к нужным брокерам. Поэтому MAX — это, скорее всего, первая запись, а не репрезентативная latency. P99/P99.9 показывают реальный хвост.

Что происходит при make kill-broker

Останавливаем kafka-2 (docker stop kafka-2), ждём пару секунд, пока контроллер заметит, и запускаем make run снова. ISR теперь 2 на каждой партиции (Isr: 1,3). У нас min.insync.replicas=3 — то есть для acks=all лидер не может удовлетворить условие.

Что увидим:

plaintext
результаты:
MODE      SENT  FAILED  ELAPSED   THROUGHPUT  P50      P99      P99.9   MAX
acks=0    300   0       55.21ms   5434 msg/s  54.0µs   495.0µs  4.62ms  24.31ms
acks=1    300   0       165.93ms  1808 msg/s  307.0µs  2.67ms   5.28ms  30.98ms
acks=all  0     12      60.08s    0 msg/s     0        0        0       0
 
[acks=all] классы ошибок:
ERROR                                                        COUNT
DEADLINE_EXCEEDED                                            1
records have timed out before they were able to be produced  11

acks=0 и acks=1 записали все 300 как ни в чём не бывало — лидеры всех партиций живы (kafka-1 и kafka-3 остались), запросу не нужны ISR. acks=all отдал 0 записей и накопил 12 ошибок таймаута за 60 секунд (после чего runtime cancel'ит goroutine'у по общему таймауту). За окно 60 секунд при RecordDeliveryTimeout(5s) физически успевает попробовать ровно ~12 записей — каждая ждёт 5 секунд внутренних retry'ев и сдаётся.

Сама ошибка от franz-go выглядит как records have timed out before they were able to be produced. Внутри клиент крутил retry на NOT_ENOUGH_REPLICAS (это retriable error — кластер может починиться), не дождался — отдал record-level timeout. Чтобы увидеть NOT_ENOUGH_REPLICAS явно, можно поставить kgo.RecordRetries(0) — тогда первая ошибка от лидера сразу пробьётся наружу. В проде такое не делают (любой transient гик завалит produce), но для диагностики — нормальный приём.

После эксперимента — make restore-broker. Команда поднимает kafka-2 и ждёт пару секунд, чтобы followers догнали лог. ISR на топиках возвращается к 1,2,3, и acks=all снова работает.

Что взять с собой

В голове после лекции должно сложиться примерно такое:

  • acks — это выбор durability vs latency. По умолчанию в franz-go (и в большинстве production-настроек) — acks=all. Это правильный дефолт, и ровно он стоит на brew.payments.v1.
  • acks=0 — для метрик, телеметрии, кликстрима brew.clickstream.v1 «чем больше, тем лучше, но потерять не страшно». Платежи и любой serious payload — не сюда; в Brew это уже один раз стоило 200 потерянных транзакций.
  • acks=1 — компромисс, который соблазнителен по latency, но даёт тихую потерю при падении лидера. На моей практике — почти всегда плохой выбор. Если экономия latency реально нужна, обычно правильнее тюнить батчинг и compression при acks=all, а не понижать durability платежей.
  • acks=all ничего не даёт сам по себе. Работает в связке с RF≥2 и min.insync.replicas≥2 (как на brew.payments.v1). Без этого acks=all может тихо превратиться в acks=1 при схлопывании ISR.
  • При min.insync.replicas строже, чем текущий ISR, продьюсер увидит ошибку. Это фича, а не баг — лучше не записать, чем записать в одну реплику и потерять.
  • Идемпотентность в franz-go по дефолту включена и требует acks=all. Чтобы понизить — kgo.DisableIdempotentWrite(). Сама идемпотентность — тема следующей лекции (Идемпотентный продьюсер).

В Идемпотентный продьюсер разберём, что делает идемпотентный продьюсер на самом деле — почему он спасает от дублей при ретраях и почему не спасает от zombie-сценария между сессиями.

Запуск

Стенд должен быть поднят (docker compose up -d из корня).

Базовый прогон под здоровый кластер:

sh
make run

С другим количеством сообщений и payload-ом:

sh
make run MESSAGES=2000 PAYLOAD=2048

Демонстрация падения брокера. Запускается двумя терминалами или последовательно:

sh
make kill-broker      # остановили kafka-2, ISR падает до 2
make run              # acks=all валится с timeout, acks=0/1 работают
make restore-broker   # kafka-2 обратно, ISR восстанавливается

Описать топики через kafka-topics.sh --describe:

sh
make topic-describe

Прибрать после лекции:

sh
make topic-delete
·Модуль 02

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Продьюсер / Acks и durability