Гарантии обработки
В прошлой лекции мы крутили ручки commit'а offset'ов и ловили дубли разной величины. Никакая комбинация не дала нуля. Дело не в багах конкретного кода — голые commit'ы устроены так, что дубли возможны всегда, меняется лишь окно. Тут разберём другой подход. Дубли пускай приходят. Главное — чтобы kitchen-service не приготовил один заказ из brew.orders.v1 дважды.
Три уровня гарантий — и куда их вешать
В литературе про processing guarantees принято раскладывать мир на категории. Названия знакомые: at-most-once и at-least-once на одном полюсе, exactly-once на другом. Эти ярлыки удобно показывать на слайдах, но они путают, потому что ничего не говорят о том, где именно гарантия. Гарантия чего? Доставки байтов? Записи в БД? Списания денег у клиента? Отправки уведомления? Это разные слои, и каждый слой про своё.
Раскладка по слоям:
- Доставка байтов от брокера до твоего процесса. Тут at-least-once — единственный реалистичный режим в production. Сеть моргнула, запрос ушёл, ack потерялся — будет повтор. Голый Kafka и любой клиент над ним.
- Сдвиг committed offset'а в
__consumer_offsets. Тут можно играть в at-most/at-least в зависимости от того, когда ты коммитишь — до обработки или после (см. лекцию Коммиты offset'ов). Третьего варианта на этом слое нет. - Эффект обработки в твоём приложении или внешней системе. А вот тут уже можно сделать exactly-once — через свойство самого обработчика, не через commit.
Третий слой и есть тема этой лекции. Если обработчик идемпотентный, повторный вызов с тем же входом ничего не меняет — и уже не важно, что Kafka отдала запись дважды. На системном уровне результат один и тот же.
Идемпотентность — это математика, а не магия
Функция f(x) идемпотентна, если f(x) == f(f(x)). Применил один раз — получил результат. Применил два раза — получил тот же результат. Применил десять раз — то же.
Не любой обработчик такой по природе. account.balance += 100 — нет: два вызова дадут +200. А account.balance = 100 — да, сколько ни вызывай. INSERT INTO orders ... — нет. INSERT ... ON CONFLICT DO NOTHING по уникальному ключу — да.
Идемпотентность не появляется сама. Её надо проектировать в обработчике: либо подобрать операцию, которая по природе идемпотентна (типа SET вместо INCR), либо поставить дедуп — таблицу/индекс, по которому повтор отсекается.
В нашей лекции — второй путь. Каждое сообщение из Kafka хочет вставиться в таблицу messages. Но первичный ключ таблицы — (topic, partition, offset). Эти три числа однозначно идентифицируют конкретное сообщение в Kafka. Если оно уже есть в таблице — INSERT ... ON CONFLICT DO NOTHING молча скипнет. Эффект на БД — одинаковый, сколько раз ни попробуй.
Почему именно (topic, partition, offset) как ключ дедупа
Кажется естественно взять бизнес-ключ — id заказа или customer_id. Это иногда работает, но имеет неприятный edge case: один и тот же бизнес-объект может прилететь в Kafka дважды от разных источников или с разной семантикой обновления. Тогда дедуп по бизнес-ключу заглушит легитимное «то же id, новое состояние».
(topic, partition, offset) — это адрес записи в логе Kafka. Он гарантированно уникален в рамках топика. Один и тот же offset в одной партиции — это ровно та же запись. Идентичная байт-в-байт. Дубль такой записи на consumer-стороне может появиться только из-за рестарта без commit'а. Это именно то, что мы хотим отсечь.
Минус подхода — если ты переливаешь данные между топиками или меняешь схему партиционирования, ключ перестаёт быть стабильным (offset поменяется). Но это уже другой класс проблем — миграция данных, и решается отдельно.
Если сценарий допускает — комбинируй: бизнес-ключ для семантики плюс idempotency-key из payload или headers, который продьюсер генерирует один раз на запись. Тогда даже после re-partitioning дедуп переживёт.
Что делает наш код
Вся программа — один цикл poll-обработка-commit. Главная штука внутри — порядок: insert в БД перед commit'ом offset'а в Kafka. Если упасть между этими двумя точками — на рестарте сообщение прилетит снова, но ON CONFLICT его уберёт. Если упасть до insert'а — на рестарте сообщение прилетит снова и нормально вставится.
Подключение к Postgres через pgxpool, к Kafka — через franz-go с выключенным auto-commit:
opts := []kgo.Opt{
kgo.ConsumerGroup(o.group),
kgo.ConsumeTopics(o.topic),
kgo.DisableAutoCommit(),
// ...
}Сам insert — голый pool.Exec с SQL, в котором PRIMARY KEY делает всю работу:
const insertSQL = `
INSERT INTO messages (topic, partition, "offset", payload, processed_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (topic, partition, "offset") DO NOTHING
`
tag, err := pool.Exec(ctx, insertSQL,
r.Topic, int32(r.Partition), r.Offset, string(r.Value))tag.RowsAffected() отвечает на вопрос «была ли это новая запись или повтор». Единица — новая, ноль — дубль, его уже когда-то обрабатывали. В демке мы это печатаем (INSERT vs DUP), чтобы было видно эффект защиты.
После того как все записи батча прошли через insert (часть как INSERT, часть как DUP — нам всё равно), коммитим offset:
err := cl.CommitRecords(commitCtx, batch...)И только теперь Kafka «забывает» эти offset'ы — они переходят в discarded прошлое для нашей группы.
Где находится crash, который мы имитируем
В коде есть -crash-after N. Когда счётчик обработанных доходит до N — os.Exit(1) без Close, без commit'а. Важная деталь: это происходит между insert'ом и commit'ом. Insert уже выполнен, transaction в Postgres закоммичена (мы используем autocommit pgx), а Kafka ещё не знает, что мы дошли до этого offset'а. Сценарий:
PollFetches → 11 записей в батче
insert #1 ✓
insert #2 ✓
...
insert #10 ✓
os.Exit(1) ← crash, НЕТ CommitRecordsНа рестарте Kafka отдаёт нам тот же батч с самого начала (committed offset не сдвинулся). Мы заходим во второй insert той же записи — ON CONFLICT DO NOTHING — RowsAffected()=0 — DUP в логе. Бизнес-эффект остался ровно один раз.
Как прогнать сценарий руками
Сначала собираем стенд:
make up # Postgres из docker-compose.override.yml
make db-init # таблица messages с PK (topic, partition, offset)
make topic-create # топик с 3 партициями
make topic-load # 30 сообщений (k-1..k-30 → event-1..event-30)Дальше «крашевый» прогон:
make run CRASH=10 # обработает 10 записей, упадёт ДО commit'а
make db-count # 10 строк в messagesГруппа сейчас знает только про commit'ы тех батчей, которые целиком успели — у нас это первый батч (или часть, где успели). Остальные offset'ы для группы как будто не читались. На втором прогоне они придут заново:
make run # Ctrl+C когда лог перестанет расти
make db-count # ровно 30 — без дублейВ выводе второго прогона видны строки DUP ... (уже было — ON CONFLICT) для тех записей, что приехали повторно. Это и есть видимое доказательство, что дедуп сработал. В таблице после двух прогонов — ровно 30 уникальных записей, по числу сообщений в топике.
Полная очистка:
make clean # truncate + удалить группу + удалить топик
make down # остановить Postgres + удалить volumeTradeoffs идемпотентного подхода
За exactly-once-effect платим четырьмя вещами.
Первое — каждый insert ходит в БД, даже если запись окажется дублем. На high-throughput'е (десятки тысяч сообщений в секунду) это заметная нагрузка на Postgres. Лечится батчингом insert'ов: накапливай N записей и шли одним INSERT ... ON CONFLICT или COPY ... ON CONFLICT (через staging таблицу). Дедуп остаётся, оверхед размазывается.
Второе — таблица растёт. Если retention топика — 7 дней, а dedup-таблица — навсегда, она быстро станет огромной. Лечится либо TTL по processed_at (cron-задача чистит старые записи), либо range_partitioning по дате с DROP старых партиций. В обоих случаях окно дедупа должно быть больше, чем ожидаемый промежуток между retry — иначе после очистки старая запись снова станет «новой» и пройдёт повторно.
Третье — это работает только пока БД и Kafka живут параллельно. Если БД упала после insert'а, но до commit'а — Kafka не сдвинула offset, мы рестартанули, БД поднялась, повторный insert получил ON CONFLICT — всё ок. А если БД потеряла данные (восстановление из бэкапа в прошлое) — наш дедуп сломан, потому что дубль больше не считается дублем. Это уже катастрофический сценарий, для него нужен отдельный план recovery.
Четвёртое — atomicity на стороне БД. Здесь у нас один insert, и Postgres его коммитит атомарно. Если бизнес-логика сложнее (несколько UPDATE'ов плюс вызов внешнего API) — нужно либо обернуть всё в одну транзакцию, либо вынести наружу через transactional outbox (Outbox-паттерн). Если ни то ни другое не подходит — остаётся принять, что часть операций может быть not-exactly-once, и компенсировать постфактум.
Где этот подход не подойдёт
Если в обработке есть внешний side effect, который не идемпотентен — например, пуш «заказ готов» через notification-service, списание денег у платёжного шлюза без идемпотентного API — голый ON CONFLICT не спасёт. Insert в локальную БД пройдёт идемпотентно, а пуш уйдёт второй раз.
Решения известные. Либо downstream сам поддерживает idempotency-key (платёжные шлюзы обычно умеют). Либо ты строишь outbox: insert в БД и outbox-таблицу в одной транзакции, отдельный publisher шлёт outbox в Kafka, идемпотентный consumer тянет наружу. Тогда «отправил пуш» = «обновил outbox.sent_at = NOW()», а сам факт отправки прошёл через идемпотентный downstream.
Это уже Outbox-паттерн и Доставка во внешние системы. В этой лекции у нас всё проще: один insert в одну таблицу. Всё остальное живёт в той же БД, дедуп закрывает вопрос.
Что ещё попробовать
- запусти
make run CRASH=15— упадёшь после 15 записей; потомmake runбез crash — увидишь дубли в логе иdb-countпокажет ровно 30; - увеличь
WORK_DELAY=400ms— обработка медленнее, у тебя есть время сравниватьmake db-countв другой консоли по ходу прогона; - удали committed offset группы (
make group-delete) и снова запустиmake run— увидишь, что все 30 записей в логе будут DUP, ноdb-countостанется 30; это и есть «exactly-once-effect»; - сделай
make db-truncateбез удаления группы — на рестарте группа дочитает с того места, где остановилась (commit'нутый offset цел), но в таблице будет меньше 30, потому что часть записей мы не «увидим» заново; - замени PRIMARY KEY на
(topic, partition)(не offset) — увидишь, что дедуп начинает рубить легитимные повторные сообщения от одного producer'а в ту же партицию.
Дальше
Этот подход — фундамент для всего, что будет в модуле 04. Транзакции (Транзакции и EOS) дадут exactly-once на уровне Kafka-to-Kafka pipeline. Outbox (Outbox-паттерн) — exactly-once на стыке БД и Kafka. External delivery (Доставка во внешние системы) — что делать, когда downstream не идемпотентен. Везде идея одна и та же: придумай способ распознать повтор и сделать его no-op. Поменяется только инструмент.
Следующая лекция (Обработка ошибок) — про error handling: что делать, когда обработчик завершается ошибкой. Skip, retry, retry-topic, DLQ. Идемпотентность останется фоном — она нужна везде, где есть повтор, а повторов в error handling по определению много.