Идемпотентный продьюсер
В Acks и durability мы зафиксировали acks=all на brew.payments.v1 — деньги клиентов Brew не должны теряться при падении брокера. Но durability решает только половину задачи. Вторая половина — дубли. Платёж, записанный в лог дважды, для payment-service ничем не лучше потерянного: клиента спишут на две чашки кофе вместо одной, и постмортем «двойное списание в промо-пятницу» будет читать вся компания. Там мы вскользь упомянули, что franz-go по дефолту идемпотентный продьюсер с acks=all. Тут разберёмся, что это за зверь, от чего он спасает и от чего — не спасает. И главное — посмотрим глазами на разницу: один и тот же код, дёрнули флажок — поведение поменялось.
Идемпотентный продьюсер появился в Kafka 0.11 ровно потому, что без него любой надёжный продьюсер в проде получал дубли. Дубли вылезали из-за самой природы сети — баги в приложении тут уже вторичны. Сценарий до боли простой, и для денег Brew — дорогой.
Откуда вообще берутся дубли
payment-service подтвердил списание и шлёт PaymentReceived в brew.payments.v1. Продьюсер отправляет батч лидеру партиции. Брокер записал батч в лог, докинул на followers, ответил «ок» — и где-то по пути этот ответ исчез. Источников много: ECONNRESET на TCP-уровне, GC-пауза в клиенте, переключение лидера в моменте, network partition между двумя сетевыми стойками. Клиент таймаутит RPC и ничего не знает про судьбу батча на стороне брокера. Лог уже пополнился — платёж записан. Клиент видит «не получил ответа» и ретраит. Брокер, ничего не подозревая, пишет тот же батч второй раз. С точки зрения order-service и analytics-service, читающих brew.payments.v1, — два одинаковых PaymentReceived с разными offset'ами. Один заказ, два списания.
Это не теоретическая проблема. На потоке промо-пятницы (8000 заказов/мин) дубли вылезают регулярно даже на здоровом кластере — ровно потому, что таймауты и ретраи нужны, чтобы переживать сетевые блипы. Без ретраев продьюсер начинает ронять платежи. С ретраями — дублить их. Замкнутый круг.
Идемпотентный продьюсер этот круг разрывает.
Как устроена идемпотентность на уровне протокола
Когда клиент с включённой идемпотентностью первый раз идёт в кластер, он делает запрос InitProducerId. Брокер выдаёт идентификаторы продьюсера и сам клиент начинает считать sequence на каждую партицию. Три величины, на которых всё держится:
- producer-id (PID) — целое 64-bit число, монотонно растущее на брокере. Уникальное на сессию.
- producer-epoch — короткое число, increments при перезапуске того же transactional.id (это уже из мира транзакций, для голой идемпотентности — обычно 0).
- sequence number — счётчик, начинающийся с 0 и инкрементящийся на каждое сообщение в эту партицию. Свой на каждую партицию.
Каждый record уезжает с тройкой (PID, epoch, sequence) в заголовке батча.
Брокер на стороне партиции ведёт последний принятый sequence для каждой пары (PID, epoch). Алгоритм проверки:
- Пришёл батч с
sequence == last + 1— норма. Пишем, обновляемlast. - Пришёл батч с
sequence == lastили меньше — дубль. Возвращаем «как будто записали», но в лог не пишем. - Пришёл батч с
sequence > last + 1— out-of-order (где-то в середине дыра). Брокер отклоняет с ошибкойOUT_OF_ORDER_SEQUENCE_NUMBER. Это сигнал клиенту, что что-то пошло сильно не так.
Вот и весь магический трюк. Дедупликация — на стороне партиции, по тройке (PID, epoch, sequence). Памяти на это уходит мало (per-PID последний sequence), и хранится в producer-snapshot файлах сегментов лога — переживает ротацию сегментов и рестарт брокера.
Какие условия нужны:
acks=all— обязательно. Без него протокол идемпотентности не запустится. Связь не очевидная, но прямая: дедупликация работает только если брокер уверен, что батч сохранён надёжно (репликация подтверждена), иначе можно было бы потерять «уже принятый sequence», и клиент при ретрае получил бы false-positive.max.in.flight.requests.per.connection ≤ 5— иначе на ретраях возможна перестановка батчей в логе. franz-go держит это ограничение сам.enable.idempotence=true— в franz-go дефолт. Чтобы выключить —kgo.DisableIdempotentWrite().
Что идемпотентность не покрывает
Граница ровно там, где заканчивается сессия продьюсера. PID живёт, пока живёт клиент. Перезапустил процесс — получил новый PID, sequence сбросился в ноль. Брокер видит «новый продьюсер» и принимает любые sequence как валидные.
Что это значит на практике. payment-service стартовал продьюсер, он успел отправить PaymentReceived (PID=42, seq=17), не получил ack, упал по OOM. Поднялся заново — InitProducerId, теперь PID=43, seq=0. Если приложение помнит, что тот платёж «не был подтверждён», и решит перепослать его с новой сессии — это будет дубль на стороне партиции. С точки зрения брокера — это две разные записи от двух разных продьюсеров.
Зомби-сценарий — отдельная история, ровно про это. Старый инстанс payment-service завис — длинная GC-пауза, потеря сети, выгрузка swap, что угодно. Сам процесс при этом не упал, ОС его не убила. Новый инстанс с тем же логическим назначением запустился — получил свой PID. Старый «оживает», у него батч с платежом в полёте — он его дошлёт. Брокер примет: PID разные, для него это два независимых продьюсера. Обе копии платежа лягут в лог.
С идемпотентностью самой по себе тут ничего не поделать. Защита от зомби — это transactional.id + producer-epoch fencing, тема следующего модуля (Транзакции и EOS). Идемпотентность спасает от дублей внутри одной сессии продьюсера, не между сессиями.
Запомни эту границу. Её часто путают.
Что показывает наш код
Бинарник один: cmd/forced-retry. Имитируем payment-service под потерей сети: каждая запись — это платёж по заказу (order-NNNN в ключе, payment-NNNN в значении), летящий в учебный топик. Принцип — взять обычный продьюсер, обернуть TCP-диалер штукой, которая с заданной вероятностью обрывает Read после того, как запрос уже улетел брокеру. Брокер платёж принимает и кладёт в лог; клиенту приходит EOF на чтении ответа; клиент думает «соединение умерло, надо ретраить». Получается ровно та ситуация, ради которой идемпотентность и нужна — потеря ответа при успешной записи.
Сама обёртка — два десятка строк:
type lossyConn struct {
net.Conn
parent *lossyDialer
reads atomic.Int64
}
func (c *lossyConn) Read(p []byte) (int, error) {
n := c.reads.Add(1)
if !c.parent.disabled.Load() && n > c.parent.warmupReads && rand.Float64() < c.parent.dropRate {
c.parent.dropped.Add(1)
_ = c.Conn.Close()
return 0, io.EOF
}
return c.Conn.Read(p)
}Warmup пропускает первые несколько Read'ов — чтобы хендшейк и InitProducerId успели пройти без помех. После — на каждый Read с вероятностью drop-rate мы закрываем соединение и отдаём EOF. Соединение к этому моменту уже передало запрос брокеру; ответ просто не доедет.
Сам клиент собирается в зависимости от флага:
opts := []kgo.Opt{
kgo.Dialer(dropper.DialContext),
kgo.RecordDeliveryTimeout(o.deliveryTimeout),
kgo.RecordRetries(o.retries),
}
if !o.idempotent {
opts = append(opts,
kgo.DisableIdempotentWrite(),
kgo.RequiredAcks(kgo.AllISRAcks()),
)
}В дефолтной ветке (-idempotent=true) дополнительно ничего не добавляется — franz-go сам включит идемпотентность с acks=all. В альтернативной ветке явно выключаем идемпотентность и оставляем acks=all (по умолчанию в franz-go это уже AllISRAcks, ставлю опцию явно — для симметрии и читаемости). acks=all держу одинаковым в обоих режимах — чтобы исключить, что разница в логе вылезла из-за разной durability, а не из-за идемпотентности.
Цикл записи — голый ProduceSync per-record:
for i := 0; i < o.messages; i++ {
rec := &kgo.Record{
Topic: o.topic,
Key: []byte(fmt.Sprintf("order-%04d", i)),
Value: []byte(fmt.Sprintf("payment-%04d", i)),
}
out := cl.ProduceSync(rpcCtx, rec)
if err := out.FirstErr(); err != nil {
res.failed++
res.errs[classifyErr(err)]++
continue
}
res.sent++
}Sequential, по одному платежу в полёте — нам важно, чтобы каждый record прошёл свою историю ретраев независимо. С батчингом в один запрос упадёт пачка record'ов сразу, и общая судьба смажется.
Метрика — дельта end-offsets до и после прогона:
ends, err := admin.ListEndOffsets(rpcCtx, topic)
if err != nil { return 0, err }
var total int64
ends.Each(func(o kadm.ListedOffset) {
if o.Err != nil { return }
total += o.Offset
})Сумма по партициям — это и есть «сколько физических записей лежит в логе». Дельта (after − before) сравнивается с числом успешных ProduceSync. Если идемпотентность работает, дельта совпадает. Если нет — дельта больше на количество дублей.
Что видно на прогоне
Здоровый кластер, 200 сообщений, drop-rate 0.35.
С идемпотентностью (дефолт):
режим: idempotent=true drop-rate=0.35 delivery-timeout=1m0s retries=30
topic=lecture-02-03-idempotent end offsets до прогона: 0
результаты:
METRIC VALUE
intended 200
client SENT (FirstErr==nil) 200
client FAILED 0
log delta (after-before) 200
duplicates (delta - SENT) 0
TCP reads dropped 51
elapsed 14.103sБез идемпотентности (тот же drop-rate, тот же топик после make topic-delete):
режим: idempotent=false drop-rate=0.35 delivery-timeout=1m0s retries=30
topic=lecture-02-03-idempotent end offsets до прогона: 0
результаты:
METRIC VALUE
intended 200
client SENT (FirstErr==nil) 200
client FAILED 0
log delta (after-before) 252
duplicates (delta - SENT) 52
TCP reads dropped 52
elapsed 14.887sСмотри на две вещи. Первое — client SENT в обоих случаях 200, клиент думает, что успешно отправил все платежи. Второе — log delta 200 и 252 соответственно. С идемпотентностью брокер дедуплицирует ровно столько раз, сколько мы оборвали ответ; в логе ничего лишнего. Без идемпотентности 52 оборванных ответа = 52 дубля в логе — это 52 платежа, которые order-service увидит дважды. Один-в-один.
TCP reads dropped совпадает с duplicates в non-idempotent режиме почти точно — это и есть прямая иллюстрация того, как «потерянный ответ → ретрай → ещё одно списание в логе». В idempotent режиме та же цифра в dropped, но дублей нет — брокер сделал свою работу, клиента списали один раз.
Если поднять drop-rate сильно (например, 0.6), часть record'ов начнёт упираться в RecordDeliveryTimeout и появится колонка ошибок. Это другой режим — деградация, не дубли. Для демонстрации дублей хватит 0.3–0.4.
Что взять с собой
- Идемпотентный продьюсер — это дедупликация на стороне партиции по тройке
(producer-id, producer-epoch, sequence). Чисто протокольная штука, без хранилища дубликатов на клиенте. - Включён по дефолту в franz-go при
acks=all. Чтобы выключить —kgo.DisableIdempotentWrite(). Имеет смысл выключать только если ты сознательно хочешьacks=0/1(и понимаешь, что теряешь). - Спасает от дублей при ретраях внутри одной сессии продьюсера. Для
brew.payments.v1это значит: один платёж — одна запись в логе, даже когда сеть глотает ответы в промо-пик. Самый частый источник дублей в проде и самая дешёвая защита от двойного списания. - Не спасает от дублей, которые приложение генерит само — повторные вызовы при перезапуске
payment-serviceс не-подтверждёнными платежами, дубли на стороне источника данных. И от зомби-продьюсеров между сессиями: для них —transactional.id+ epoch fencing (модуль 04). max.in.flight.requests.per.connectionидемпотентный режим держит ≤ 5 сам. Перестановки батчей в логе из-за асинхронного ретрая невозможны.- Цена включения близка к нулю на здоровом кластере. Чуть-чуть метаданных в батче, один лишний RPC на старте (InitProducerId). Дальше — тот же поток данных.
В Батчинг и пропускная способность разберёмся с батчингом и компрессией — оттуда же поймём, почему max.in.flight ≤ 5 практически не влияет на throughput.
Запуск
Стенд должен быть поднят (docker compose up -d из корня).
Прогон с идемпотентностью:
make run-with-idempotenceПрогон без идемпотентности:
make run-without-idempotenceС другим количеством сообщений и долей дропов:
make run-without-idempotence MESSAGES=500 DROP_RATE=0.4Между прогонами топик можно прибрать, чтобы дельта шла с нуля (иначе дельта всё равно считается корректно, но end-offsets копятся):
make topic-deleteСколько физических записей в логе — отдельно от Go-программы:
make topic-countОписать партиции:
make topic-describe