Kafka CookbookПродьюсерИдемпотентный продьюсер
0 / 42 (0%)

Идемпотентный продьюсер

В Acks и durability мы зафиксировали acks=all на brew.payments.v1 — деньги клиентов Brew не должны теряться при падении брокера. Но durability решает только половину задачи. Вторая половина — дубли. Платёж, записанный в лог дважды, для payment-service ничем не лучше потерянного: клиента спишут на две чашки кофе вместо одной, и постмортем «двойное списание в промо-пятницу» будет читать вся компания. Там мы вскользь упомянули, что franz-go по дефолту идемпотентный продьюсер с acks=all. Тут разберёмся, что это за зверь, от чего он спасает и от чего — не спасает. И главное — посмотрим глазами на разницу: один и тот же код, дёрнули флажок — поведение поменялось.

Идемпотентный продьюсер появился в Kafka 0.11 ровно потому, что без него любой надёжный продьюсер в проде получал дубли. Дубли вылезали из-за самой природы сети — баги в приложении тут уже вторичны. Сценарий до боли простой, и для денег Brew — дорогой.

Откуда вообще берутся дубли

payment-service подтвердил списание и шлёт PaymentReceived в brew.payments.v1. Продьюсер отправляет батч лидеру партиции. Брокер записал батч в лог, докинул на followers, ответил «ок» — и где-то по пути этот ответ исчез. Источников много: ECONNRESET на TCP-уровне, GC-пауза в клиенте, переключение лидера в моменте, network partition между двумя сетевыми стойками. Клиент таймаутит RPC и ничего не знает про судьбу батча на стороне брокера. Лог уже пополнился — платёж записан. Клиент видит «не получил ответа» и ретраит. Брокер, ничего не подозревая, пишет тот же батч второй раз. С точки зрения order-service и analytics-service, читающих brew.payments.v1, — два одинаковых PaymentReceived с разными offset'ами. Один заказ, два списания.

Это не теоретическая проблема. На потоке промо-пятницы (8000 заказов/мин) дубли вылезают регулярно даже на здоровом кластере — ровно потому, что таймауты и ретраи нужны, чтобы переживать сетевые блипы. Без ретраев продьюсер начинает ронять платежи. С ретраями — дублить их. Замкнутый круг.

Идемпотентный продьюсер этот круг разрывает.

Как устроена идемпотентность на уровне протокола

Когда клиент с включённой идемпотентностью первый раз идёт в кластер, он делает запрос InitProducerId. Брокер выдаёт идентификаторы продьюсера и сам клиент начинает считать sequence на каждую партицию. Три величины, на которых всё держится:

  1. producer-id (PID) — целое 64-bit число, монотонно растущее на брокере. Уникальное на сессию.
  2. producer-epoch — короткое число, increments при перезапуске того же transactional.id (это уже из мира транзакций, для голой идемпотентности — обычно 0).
  3. sequence number — счётчик, начинающийся с 0 и инкрементящийся на каждое сообщение в эту партицию. Свой на каждую партицию.

Каждый record уезжает с тройкой (PID, epoch, sequence) в заголовке батча.

Брокер на стороне партиции ведёт последний принятый sequence для каждой пары (PID, epoch). Алгоритм проверки:

  • Пришёл батч с sequence == last + 1 — норма. Пишем, обновляем last.
  • Пришёл батч с sequence == last или меньше — дубль. Возвращаем «как будто записали», но в лог не пишем.
  • Пришёл батч с sequence > last + 1out-of-order (где-то в середине дыра). Брокер отклоняет с ошибкой OUT_OF_ORDER_SEQUENCE_NUMBER. Это сигнал клиенту, что что-то пошло сильно не так.

Вот и весь магический трюк. Дедупликация — на стороне партиции, по тройке (PID, epoch, sequence). Памяти на это уходит мало (per-PID последний sequence), и хранится в producer-snapshot файлах сегментов лога — переживает ротацию сегментов и рестарт брокера.

Какие условия нужны:

  • acks=all — обязательно. Без него протокол идемпотентности не запустится. Связь не очевидная, но прямая: дедупликация работает только если брокер уверен, что батч сохранён надёжно (репликация подтверждена), иначе можно было бы потерять «уже принятый sequence», и клиент при ретрае получил бы false-positive.
  • max.in.flight.requests.per.connection ≤ 5 — иначе на ретраях возможна перестановка батчей в логе. franz-go держит это ограничение сам.
  • enable.idempotence=true — в franz-go дефолт. Чтобы выключить — kgo.DisableIdempotentWrite().

Что идемпотентность не покрывает

Граница ровно там, где заканчивается сессия продьюсера. PID живёт, пока живёт клиент. Перезапустил процесс — получил новый PID, sequence сбросился в ноль. Брокер видит «новый продьюсер» и принимает любые sequence как валидные.

Что это значит на практике. payment-service стартовал продьюсер, он успел отправить PaymentReceived (PID=42, seq=17), не получил ack, упал по OOM. Поднялся заново — InitProducerId, теперь PID=43, seq=0. Если приложение помнит, что тот платёж «не был подтверждён», и решит перепослать его с новой сессии — это будет дубль на стороне партиции. С точки зрения брокера — это две разные записи от двух разных продьюсеров.

Зомби-сценарий — отдельная история, ровно про это. Старый инстанс payment-service завис — длинная GC-пауза, потеря сети, выгрузка swap, что угодно. Сам процесс при этом не упал, ОС его не убила. Новый инстанс с тем же логическим назначением запустился — получил свой PID. Старый «оживает», у него батч с платежом в полёте — он его дошлёт. Брокер примет: PID разные, для него это два независимых продьюсера. Обе копии платежа лягут в лог.

С идемпотентностью самой по себе тут ничего не поделать. Защита от зомби — это transactional.id + producer-epoch fencing, тема следующего модуля (Транзакции и EOS). Идемпотентность спасает от дублей внутри одной сессии продьюсера, не между сессиями.

Запомни эту границу. Её часто путают.

Что показывает наш код

Бинарник один: cmd/forced-retry. Имитируем payment-service под потерей сети: каждая запись — это платёж по заказу (order-NNNN в ключе, payment-NNNN в значении), летящий в учебный топик. Принцип — взять обычный продьюсер, обернуть TCP-диалер штукой, которая с заданной вероятностью обрывает Read после того, как запрос уже улетел брокеру. Брокер платёж принимает и кладёт в лог; клиенту приходит EOF на чтении ответа; клиент думает «соединение умерло, надо ретраить». Получается ровно та ситуация, ради которой идемпотентность и нужна — потеря ответа при успешной записи.

Сама обёртка — два десятка строк:

go
type lossyConn struct {
    net.Conn
    parent *lossyDialer
    reads  atomic.Int64
}
 
func (c *lossyConn) Read(p []byte) (int, error) {
    n := c.reads.Add(1)
    if !c.parent.disabled.Load() && n > c.parent.warmupReads && rand.Float64() < c.parent.dropRate {
        c.parent.dropped.Add(1)
        _ = c.Conn.Close()
        return 0, io.EOF
    }
    return c.Conn.Read(p)
}

Warmup пропускает первые несколько Read'ов — чтобы хендшейк и InitProducerId успели пройти без помех. После — на каждый Read с вероятностью drop-rate мы закрываем соединение и отдаём EOF. Соединение к этому моменту уже передало запрос брокеру; ответ просто не доедет.

Сам клиент собирается в зависимости от флага:

go
opts := []kgo.Opt{
    kgo.Dialer(dropper.DialContext),
    kgo.RecordDeliveryTimeout(o.deliveryTimeout),
    kgo.RecordRetries(o.retries),
}
if !o.idempotent {
    opts = append(opts,
        kgo.DisableIdempotentWrite(),
        kgo.RequiredAcks(kgo.AllISRAcks()),
    )
}

В дефолтной ветке (-idempotent=true) дополнительно ничего не добавляется — franz-go сам включит идемпотентность с acks=all. В альтернативной ветке явно выключаем идемпотентность и оставляем acks=all (по умолчанию в franz-go это уже AllISRAcks, ставлю опцию явно — для симметрии и читаемости). acks=all держу одинаковым в обоих режимах — чтобы исключить, что разница в логе вылезла из-за разной durability, а не из-за идемпотентности.

Цикл записи — голый ProduceSync per-record:

go
for i := 0; i < o.messages; i++ {
    rec := &kgo.Record{
        Topic: o.topic,
        Key:   []byte(fmt.Sprintf("order-%04d", i)),
        Value: []byte(fmt.Sprintf("payment-%04d", i)),
    }
    out := cl.ProduceSync(rpcCtx, rec)
    if err := out.FirstErr(); err != nil {
        res.failed++
        res.errs[classifyErr(err)]++
        continue
    }
    res.sent++
}

Sequential, по одному платежу в полёте — нам важно, чтобы каждый record прошёл свою историю ретраев независимо. С батчингом в один запрос упадёт пачка record'ов сразу, и общая судьба смажется.

Метрика — дельта end-offsets до и после прогона:

go
ends, err := admin.ListEndOffsets(rpcCtx, topic)
if err != nil { return 0, err }
var total int64
ends.Each(func(o kadm.ListedOffset) {
    if o.Err != nil { return }
    total += o.Offset
})

Сумма по партициям — это и есть «сколько физических записей лежит в логе». Дельта (after − before) сравнивается с числом успешных ProduceSync. Если идемпотентность работает, дельта совпадает. Если нет — дельта больше на количество дублей.

Что видно на прогоне

Здоровый кластер, 200 сообщений, drop-rate 0.35.

С идемпотентностью (дефолт):

plaintext
режим: idempotent=true drop-rate=0.35 delivery-timeout=1m0s retries=30
topic=lecture-02-03-idempotent end offsets до прогона: 0
 
результаты:
METRIC                       VALUE
intended                     200
client SENT (FirstErr==nil)  200
client FAILED                0
log delta (after-before)     200
duplicates (delta - SENT)    0
TCP reads dropped            51
elapsed                      14.103s

Без идемпотентности (тот же drop-rate, тот же топик после make topic-delete):

plaintext
режим: idempotent=false drop-rate=0.35 delivery-timeout=1m0s retries=30
topic=lecture-02-03-idempotent end offsets до прогона: 0
 
результаты:
METRIC                       VALUE
intended                     200
client SENT (FirstErr==nil)  200
client FAILED                0
log delta (after-before)     252
duplicates (delta - SENT)    52
TCP reads dropped            52
elapsed                      14.887s

Смотри на две вещи. Первое — client SENT в обоих случаях 200, клиент думает, что успешно отправил все платежи. Второе — log delta 200 и 252 соответственно. С идемпотентностью брокер дедуплицирует ровно столько раз, сколько мы оборвали ответ; в логе ничего лишнего. Без идемпотентности 52 оборванных ответа = 52 дубля в логе — это 52 платежа, которые order-service увидит дважды. Один-в-один.

TCP reads dropped совпадает с duplicates в non-idempotent режиме почти точно — это и есть прямая иллюстрация того, как «потерянный ответ → ретрай → ещё одно списание в логе». В idempotent режиме та же цифра в dropped, но дублей нет — брокер сделал свою работу, клиента списали один раз.

Если поднять drop-rate сильно (например, 0.6), часть record'ов начнёт упираться в RecordDeliveryTimeout и появится колонка ошибок. Это другой режим — деградация, не дубли. Для демонстрации дублей хватит 0.3–0.4.

Что взять с собой

  • Идемпотентный продьюсер — это дедупликация на стороне партиции по тройке (producer-id, producer-epoch, sequence). Чисто протокольная штука, без хранилища дубликатов на клиенте.
  • Включён по дефолту в franz-go при acks=all. Чтобы выключить — kgo.DisableIdempotentWrite(). Имеет смысл выключать только если ты сознательно хочешь acks=0/1 (и понимаешь, что теряешь).
  • Спасает от дублей при ретраях внутри одной сессии продьюсера. Для brew.payments.v1 это значит: один платёж — одна запись в логе, даже когда сеть глотает ответы в промо-пик. Самый частый источник дублей в проде и самая дешёвая защита от двойного списания.
  • Не спасает от дублей, которые приложение генерит само — повторные вызовы при перезапуске payment-service с не-подтверждёнными платежами, дубли на стороне источника данных. И от зомби-продьюсеров между сессиями: для них — transactional.id + epoch fencing (модуль 04).
  • max.in.flight.requests.per.connection идемпотентный режим держит ≤ 5 сам. Перестановки батчей в логе из-за асинхронного ретрая невозможны.
  • Цена включения близка к нулю на здоровом кластере. Чуть-чуть метаданных в батче, один лишний RPC на старте (InitProducerId). Дальше — тот же поток данных.

В Батчинг и пропускная способность разберёмся с батчингом и компрессией — оттуда же поймём, почему max.in.flight ≤ 5 практически не влияет на throughput.

Запуск

Стенд должен быть поднят (docker compose up -d из корня).

Прогон с идемпотентностью:

sh
make run-with-idempotence

Прогон без идемпотентности:

sh
make run-without-idempotence

С другим количеством сообщений и долей дропов:

sh
make run-without-idempotence MESSAGES=500 DROP_RATE=0.4

Между прогонами топик можно прибрать, чтобы дельта шла с нуля (иначе дельта всё равно считается корректно, но end-offsets копятся):

sh
make topic-delete

Сколько физических записей в логе — отдельно от Go-программы:

sh
make topic-count

Описать партиции:

sh
make topic-describe
·Модуль 02

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Продьюсер / Идемпотентный продьюсер