Kafka CookbookКонсьюмерГарантии обработки
0 / 42 (0%)

Гарантии обработки

В прошлой лекции мы крутили ручки commit'а offset'ов и ловили дубли разной величины. Никакая комбинация не дала нуля. Дело не в багах конкретного кода — голые commit'ы устроены так, что дубли возможны всегда, меняется лишь окно. Тут разберём другой подход. Дубли пускай приходят. Главное — чтобы kitchen-service не приготовил один заказ из brew.orders.v1 дважды.

Три уровня гарантий — и куда их вешать

В литературе про processing guarantees принято раскладывать мир на категории. Названия знакомые: at-most-once и at-least-once на одном полюсе, exactly-once на другом. Эти ярлыки удобно показывать на слайдах, но они путают, потому что ничего не говорят о том, где именно гарантия. Гарантия чего? Доставки байтов? Записи в БД? Списания денег у клиента? Отправки уведомления? Это разные слои, и каждый слой про своё.

Раскладка по слоям:

  1. Доставка байтов от брокера до твоего процесса. Тут at-least-once — единственный реалистичный режим в production. Сеть моргнула, запрос ушёл, ack потерялся — будет повтор. Голый Kafka и любой клиент над ним.
  2. Сдвиг committed offset'а в __consumer_offsets. Тут можно играть в at-most/at-least в зависимости от того, когда ты коммитишь — до обработки или после (см. лекцию Коммиты offset'ов). Третьего варианта на этом слое нет.
  3. Эффект обработки в твоём приложении или внешней системе. А вот тут уже можно сделать exactly-once — через свойство самого обработчика, не через commit.

Третий слой и есть тема этой лекции. Если обработчик идемпотентный, повторный вызов с тем же входом ничего не меняет — и уже не важно, что Kafka отдала запись дважды. На системном уровне результат один и тот же.

Идемпотентность — это математика, а не магия

Функция f(x) идемпотентна, если f(x) == f(f(x)). Применил один раз — получил результат. Применил два раза — получил тот же результат. Применил десять раз — то же.

Не любой обработчик такой по природе. account.balance += 100 — нет: два вызова дадут +200. А account.balance = 100 — да, сколько ни вызывай. INSERT INTO orders ... — нет. INSERT ... ON CONFLICT DO NOTHING по уникальному ключу — да.

Идемпотентность не появляется сама. Её надо проектировать в обработчике: либо подобрать операцию, которая по природе идемпотентна (типа SET вместо INCR), либо поставить дедуп — таблицу/индекс, по которому повтор отсекается.

В нашей лекции — второй путь. Каждое сообщение из Kafka хочет вставиться в таблицу messages. Но первичный ключ таблицы — (topic, partition, offset). Эти три числа однозначно идентифицируют конкретное сообщение в Kafka. Если оно уже есть в таблице — INSERT ... ON CONFLICT DO NOTHING молча скипнет. Эффект на БД — одинаковый, сколько раз ни попробуй.

Почему именно (topic, partition, offset) как ключ дедупа

Кажется естественно взять бизнес-ключ — id заказа или customer_id. Это иногда работает, но имеет неприятный edge case: один и тот же бизнес-объект может прилететь в Kafka дважды от разных источников или с разной семантикой обновления. Тогда дедуп по бизнес-ключу заглушит легитимное «то же id, новое состояние».

(topic, partition, offset) — это адрес записи в логе Kafka. Он гарантированно уникален в рамках топика. Один и тот же offset в одной партиции — это ровно та же запись. Идентичная байт-в-байт. Дубль такой записи на consumer-стороне может появиться только из-за рестарта без commit'а. Это именно то, что мы хотим отсечь.

Минус подхода — если ты переливаешь данные между топиками или меняешь схему партиционирования, ключ перестаёт быть стабильным (offset поменяется). Но это уже другой класс проблем — миграция данных, и решается отдельно.

Если сценарий допускает — комбинируй: бизнес-ключ для семантики плюс idempotency-key из payload или headers, который продьюсер генерирует один раз на запись. Тогда даже после re-partitioning дедуп переживёт.

Что делает наш код

Вся программа — один цикл poll-обработка-commit. Главная штука внутри — порядок: insert в БД перед commit'ом offset'а в Kafka. Если упасть между этими двумя точками — на рестарте сообщение прилетит снова, но ON CONFLICT его уберёт. Если упасть до insert'а — на рестарте сообщение прилетит снова и нормально вставится.

Подключение к Postgres через pgxpool, к Kafka — через franz-go с выключенным auto-commit:

go
opts := []kgo.Opt{
    kgo.ConsumerGroup(o.group),
    kgo.ConsumeTopics(o.topic),
    kgo.DisableAutoCommit(),
    // ...
}

Сам insert — голый pool.Exec с SQL, в котором PRIMARY KEY делает всю работу:

go
const insertSQL = `
INSERT INTO messages (topic, partition, "offset", payload, processed_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (topic, partition, "offset") DO NOTHING
`
 
tag, err := pool.Exec(ctx, insertSQL,
    r.Topic, int32(r.Partition), r.Offset, string(r.Value))

tag.RowsAffected() отвечает на вопрос «была ли это новая запись или повтор». Единица — новая, ноль — дубль, его уже когда-то обрабатывали. В демке мы это печатаем (INSERT vs DUP), чтобы было видно эффект защиты.

После того как все записи батча прошли через insert (часть как INSERT, часть как DUP — нам всё равно), коммитим offset:

go
err := cl.CommitRecords(commitCtx, batch...)

И только теперь Kafka «забывает» эти offset'ы — они переходят в discarded прошлое для нашей группы.

Где находится crash, который мы имитируем

В коде есть -crash-after N. Когда счётчик обработанных доходит до N — os.Exit(1) без Close, без commit'а. Важная деталь: это происходит между insert'ом и commit'ом. Insert уже выполнен, transaction в Postgres закоммичена (мы используем autocommit pgx), а Kafka ещё не знает, что мы дошли до этого offset'а. Сценарий:

plaintext
PollFetches → 11 записей в батче
  insert #1   ✓
  insert #2   ✓
  ...
  insert #10  ✓
  os.Exit(1)             ← crash, НЕТ CommitRecords

На рестарте Kafka отдаёт нам тот же батч с самого начала (committed offset не сдвинулся). Мы заходим во второй insert той же записи — ON CONFLICT DO NOTHINGRowsAffected()=0 — DUP в логе. Бизнес-эффект остался ровно один раз.

Как прогнать сценарий руками

Сначала собираем стенд:

sh
make up                    # Postgres из docker-compose.override.yml
make db-init               # таблица messages с PK (topic, partition, offset)
make topic-create          # топик с 3 партициями
make topic-load            # 30 сообщений (k-1..k-30 → event-1..event-30)

Дальше «крашевый» прогон:

sh
make run CRASH=10          # обработает 10 записей, упадёт ДО commit'а
make db-count              # 10 строк в messages

Группа сейчас знает только про commit'ы тех батчей, которые целиком успели — у нас это первый батч (или часть, где успели). Остальные offset'ы для группы как будто не читались. На втором прогоне они придут заново:

sh
make run                   # Ctrl+C когда лог перестанет расти
make db-count              # ровно 30 — без дублей

В выводе второго прогона видны строки DUP ... (уже было — ON CONFLICT) для тех записей, что приехали повторно. Это и есть видимое доказательство, что дедуп сработал. В таблице после двух прогонов — ровно 30 уникальных записей, по числу сообщений в топике.

Полная очистка:

sh
make clean                 # truncate + удалить группу + удалить топик
make down                  # остановить Postgres + удалить volume

Tradeoffs идемпотентного подхода

За exactly-once-effect платим четырьмя вещами.

Первое — каждый insert ходит в БД, даже если запись окажется дублем. На high-throughput'е (десятки тысяч сообщений в секунду) это заметная нагрузка на Postgres. Лечится батчингом insert'ов: накапливай N записей и шли одним INSERT ... ON CONFLICT или COPY ... ON CONFLICT (через staging таблицу). Дедуп остаётся, оверхед размазывается.

Второе — таблица растёт. Если retention топика — 7 дней, а dedup-таблица — навсегда, она быстро станет огромной. Лечится либо TTL по processed_at (cron-задача чистит старые записи), либо range_partitioning по дате с DROP старых партиций. В обоих случаях окно дедупа должно быть больше, чем ожидаемый промежуток между retry — иначе после очистки старая запись снова станет «новой» и пройдёт повторно.

Третье — это работает только пока БД и Kafka живут параллельно. Если БД упала после insert'а, но до commit'а — Kafka не сдвинула offset, мы рестартанули, БД поднялась, повторный insert получил ON CONFLICT — всё ок. А если БД потеряла данные (восстановление из бэкапа в прошлое) — наш дедуп сломан, потому что дубль больше не считается дублем. Это уже катастрофический сценарий, для него нужен отдельный план recovery.

Четвёртое — atomicity на стороне БД. Здесь у нас один insert, и Postgres его коммитит атомарно. Если бизнес-логика сложнее (несколько UPDATE'ов плюс вызов внешнего API) — нужно либо обернуть всё в одну транзакцию, либо вынести наружу через transactional outbox (Outbox-паттерн). Если ни то ни другое не подходит — остаётся принять, что часть операций может быть not-exactly-once, и компенсировать постфактум.

Где этот подход не подойдёт

Если в обработке есть внешний side effect, который не идемпотентен — например, пуш «заказ готов» через notification-service, списание денег у платёжного шлюза без идемпотентного API — голый ON CONFLICT не спасёт. Insert в локальную БД пройдёт идемпотентно, а пуш уйдёт второй раз.

Решения известные. Либо downstream сам поддерживает idempotency-key (платёжные шлюзы обычно умеют). Либо ты строишь outbox: insert в БД и outbox-таблицу в одной транзакции, отдельный publisher шлёт outbox в Kafka, идемпотентный consumer тянет наружу. Тогда «отправил пуш» = «обновил outbox.sent_at = NOW()», а сам факт отправки прошёл через идемпотентный downstream.

Это уже Outbox-паттерн и Доставка во внешние системы. В этой лекции у нас всё проще: один insert в одну таблицу. Всё остальное живёт в той же БД, дедуп закрывает вопрос.

Что ещё попробовать

  • запусти make run CRASH=15 — упадёшь после 15 записей; потом make run без crash — увидишь дубли в логе и db-count покажет ровно 30;
  • увеличь WORK_DELAY=400ms — обработка медленнее, у тебя есть время сравнивать make db-count в другой консоли по ходу прогона;
  • удали committed offset группы (make group-delete) и снова запусти make run — увидишь, что все 30 записей в логе будут DUP, но db-count останется 30; это и есть «exactly-once-effect»;
  • сделай make db-truncate без удаления группы — на рестарте группа дочитает с того места, где остановилась (commit'нутый offset цел), но в таблице будет меньше 30, потому что часть записей мы не «увидим» заново;
  • замени PRIMARY KEY на (topic, partition) (не offset) — увидишь, что дедуп начинает рубить легитимные повторные сообщения от одного producer'а в ту же партицию.

Дальше

Этот подход — фундамент для всего, что будет в модуле 04. Транзакции (Транзакции и EOS) дадут exactly-once на уровне Kafka-to-Kafka pipeline. Outbox (Outbox-паттерн) — exactly-once на стыке БД и Kafka. External delivery (Доставка во внешние системы) — что делать, когда downstream не идемпотентен. Везде идея одна и та же: придумай способ распознать повтор и сделать его no-op. Поменяется только инструмент.

Следующая лекция (Обработка ошибок) — про error handling: что делать, когда обработчик завершается ошибкой. Skip, retry, retry-topic, DLQ. Идемпотентность останется фоном — она нужна везде, где есть повтор, а повторов в error handling по определению много.

·Модуль 03

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Консьюмер / Гарантии обработки