Outbox-паттерн
В прошлой лекции была exactly-once связка между Kafka и Kafka — читаем из топика, пишем в другой, коммитим offset, всё внутри одной транзакции. Сейчас другая ситуация. У order-service есть своя Postgres и Kafka, и нам надо, чтобы при создании заказа Brew в таблице orders обязательно появилось событие order.created в топике brew.orders.v1. Эти два хранилища не знают друг про друга. Своих транзакций между ними нет.
И вот тут начинается то самое место, где люди годами наступают на грабли.
Два честных пути и оба ломаются
Подход первый, наивный — сначала пишем order.created в Kafka, потом заказ в Postgres. Если order-service упал между двумя шагами, событие в Kafka уже есть, а в БД заказа нет. На рестарте сервис не помнит, что что-то писал. Заказ не создаётся вообще, а downstream-консьюмеры уже уведомлены о его существовании. Дальше начинается фантомный заказ, по которому никогда не будет данных. inventory-service зарезервировал ингредиенты, payment-service ждёт платёж, поддержка Brew получает звонки.
Подход второй — сначала Postgres, потом Kafka. Заказ в orders лёг, COMMIT прошёл, между этим и Produce процесс упал. На рестарте никто не помнит, что нужно было опубликовать order.created. Заказ есть, события нет. inventory-service ничего не зарезервировал, payment-service не получил платёж, клиент Brew видит «принят», но дальше не двигается ничего.
Третий путь — какой-нибудь XA-протокол поверх двух фазовых коммитов. Существует, но почти нигде не применяется. Слишком хрупкий, медленный, операционно дорогой. Большинство БД и брокеров поддерживают его кое-как, и любая конкретная связка превращается в проект на полгода. Закроем эту тему и пойдём дальше.
Идея outbox
Главный трюк выглядит почти банально. Мы помещаем «надо опубликовать событие X» в саму БД, той же транзакцией, что и сам заказ. Появляется ещё одна таблица — outbox. В ней лежит то, что нужно отправить, и флаг «отправлено / не отправлено». При создании заказа у нас идёт одна Postgres-транзакция, в которой два INSERT — в orders и в outbox. Атомарность за нас держит сам Postgres.
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_id TEXT NOT NULL,
topic TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ NULL
);Дальше отдельный процесс (publisher) читает из этой таблицы, шлёт в Kafka, помечает запись как отправленную. Если он упал — на рестарте просто продолжит с того же места: запросом WHERE published_at IS NULL выберет всё, что не успело уехать. Состояние двух систем синхронизируется глазами на outbox: «всё, что есть в outbox, — рано или поздно появится в Kafka».
Платим мы за это одну вещь: дубли в Kafka возможны. Конкретный механизм — окно между «отправили в Kafka» и «пометили published_at». Если процесс упал прямо тут, на рестарте он отправит то же самое заново. С этим живём, защищаемся на стороне consumer'а.
Запись со стороны сервиса
Кода тут немного — главное чтобы оба INSERT'а были внутри одной BeginFunc. На каждый заказ — отдельная Postgres-транзакция:
err = pgx.BeginFunc(ctx, pool, func(tx pgx.Tx) error {
if err := tx.QueryRow(ctx, insertOrderSQL, customerID, amount).Scan(&orderID); err != nil {
return fmt.Errorf("INSERT orders: %w", err)
}
payload, _ := json.Marshal(orderEvent{...})
aggregateID := fmt.Sprintf("order-%d", orderID)
if err := tx.QueryRow(ctx, insertOutboxSQL, aggregateID, topic, string(payload)).Scan(&outboxID); err != nil {
return fmt.Errorf("INSERT outbox: %w", err)
}
return nil
})Если в первом INSERT падение — нет ни заказа, ни outbox-записи. Если падение во втором — Postgres откатывает оба. Если COMMIT прошёл — оба INSERT'а одновременно стали видимыми. Никаких половинчатых состояний.
aggregate_id я кладу в отдельную колонку и заодно в payload. Он же поедет в Key Kafka-сообщения — так все события про один заказ ложатся в одну партицию, порядок сохраняется.
Publisher
Дальше — отдельный процесс, который читает из outbox и шлёт в Kafka. Логика на каждой итерации:
- Открыть транзакцию.
SELECT id, aggregate_id, topic, payload FROM outbox WHERE published_at IS NULL ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED.- Для каждой строки сделать
Produce. UPDATE outbox SET published_at = NOW() WHERE id = ANY($1).COMMIT.
FOR UPDATE SKIP LOCKED — must-have для горизонтального масштабирования publisher'а. Без него два процесса серьёзно друг друга блокируют: первый берёт строки под лок до COMMIT, второй ждёт, всё идёт в один поток. Со SKIP LOCKED второй процесс просто пропускает занятые строки и берёт следующие. Параллелизм без кооперации.
const fetchBatchSQL = `
SELECT id, aggregate_id, topic, payload::text
FROM outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT $1
FOR UPDATE SKIP LOCKED
`Сам цикл публикации:
results := cl.ProduceSync(ctx, records...)
if err := results.FirstErr(); err != nil {
return 0, fmt.Errorf("ProduceSync: %w", err)
}
// ... вот здесь окно для crash'а — записи в Kafka, в БД ещё нет UPDATE ...
tag, err := tx.Exec(ctx, markPublishedSQL, ids)
if err != nil {
return 0, fmt.Errorf("UPDATE outbox: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return 0, fmt.Errorf("Commit: %w", err)
}Идемпотентный producer (это дефолт franz-go) защищает от дублей внутри одной сессии — ретраи Produce не дублируют запись на брокере. От дубля «между сессиями» он не помогает: новый процесс получает новый producer-id, для брокера это другой клиент.
Что важно про порядок: я в коде помечаю строки published_at = NOW() в той же транзакции, что и SELECT. Если кто-то любитель, чтобы UPDATE прошёл успешно ДО Produce (мол, оптимистично) — пожалуйста, но дальше при ошибке Produce придётся возвращать UPDATE назад, а это ещё один шанс упасть в полупозиции. Пусть Produce будет первым, а UPDATE — последним перед COMMIT. Полупозиция в этом порядке всегда одна и та же: «уже опубликовано в Kafka, ещё не помечено в БД». Один сценарий восстановления, понятный.
Дубли и защита на consumer'е
Раз publisher даёт at-least-once, потребитель должен это пережить. Стандартный приём — dedup-таблица по сквозному id события. У нас этот id уже есть: outbox.id — стабильный, монотонно растущий, гарантированно неповторяющийся. Кладём его в header сообщения:
records[i] = &kgo.Record{
Topic: r.topic,
Key: []byte(r.aggregateID),
Value: []byte(r.payload),
Headers: []kgo.RecordHeader{
{Key: "outbox-id", Value: []byte(strconv.FormatInt(r.id, 10))},
{Key: "aggregate-id", Value: []byte(r.aggregateID)},
},
}Consumer читает header, делает простой INSERT в свою dedup-таблицу:
INSERT INTO processed_outbox_ids (outbox_id) VALUES ($1)
ON CONFLICT (outbox_id) DO NOTHINGRowsAffected = 1 — событие новое, обрабатываем. 0 — уже видели, тихо пропускаем. Просто, надёжно, требует только PRIMARY KEY на колонке outbox_id.
tag, err := pool.Exec(ctx, dedupSQL, outboxID)
// ...
if tag.RowsAffected() == 1 {
inserted.Add(1)
// здесь основная обработка: inventory-service резервирует ингредиенты, payment-service выставляет оплату, ...
} else {
skipped.Add(1)
}Тонкое место — где именно делать INSERT. Я делаю его перед основной обработкой: «я заявил, что обработаю это событие». Если падаю между INSERT и реальной работой, на рестарте это событие уже dedup'нется как «обработанное» и работа потеряется. На практике это лечится тем, что и реальная работа должна быть идемпотентной (например, тоже через ON CONFLICT в своей таблице по тому же outbox_id). Тогда «защита от дублей» и «реальная обработка» сливаются в одну операцию, и проблемы нет. В этой лекции я для простоты делю их.
Polling vs CDC
То, что мы написали — polling-вариант. Каждые N миллисекунд publisher идёт в БД и спрашивает «есть что новое?». Простой, понятный, отлично работает на средних нагрузках (десятки тысяч событий в секунду тянет легко). Минус — задержка между «событие в outbox» и «событие в Kafka» зависит от частоты опроса. Можно поджимать до 100ms, можно ниже, дальше начинаешь ловить пустой запрос на каждом тике.
Альтернатива — CDC-publisher. Он вместо опроса таблицы слушает Postgres logical replication и видит INSERT в outbox в реальном времени. Это уже Debezium-территория: CDC из таблицы orders Brew мы разберём отдельно в Debezium CDC (та же нить — order-service и его Postgres). Главное отличие в гарантиях: CDC даёт меньшую задержку и переносит «координацию» из БД в реплику, но взамен требует настроенного wal_level=logical, отдельного слота, вечного процесса возле БД. На уровне модели приложения с outbox-таблицей всё то же: пишем в orders плюс outbox одной TX, кто-то читает из outbox и шлёт в Kafka. Меняется только этот «кто-то».
Третий вариант, который встречается в маленьких сервисах — publisher живёт в том же процессе, что и order-service, и работает на том же пуле. После каждого INSERT в outbox делает event-trigger в коде «иди и опубликуй сразу». Снижает задержку, но имеет проблемы с масштабированием на N инстансов и не пережёвывает рестарт сервиса (если только в нём не будет того же поллера на старте). Я не буду демонстрировать этот вариант в коде — для понимания паттерна важнее именно «отдельный publisher».
Демо
Подними БД, накати схему, создай топик:
make up
make db-init
make topic-createНалей 20 заказов:
make run-service COUNT=20В Postgres видим 20 строк в orders и 20 в outbox, у всех published_at IS NULL:
make db-count # 20
make outbox-count # 20
make outbox-pending # 20Дальше запускаем publisher с эмулированным крахом — он отправит первый батч в Kafka и упадёт, не успев пометить published_at:
make chaos-kill-publisher BATCH_SIZE=5Видим в выводе пять PUB-строк и сообщение про CRASH. В Kafka ушли пять записей, в outbox они всё ещё «не опубликованы». Запускаем нормальный publisher — он подберёт остаток (включая те самые пять, которые publisher с крахом пометить не успел) и опубликует всё:
make run-publisherСмотрим, что в outbox теперь все 20 строк помечены опубликованными:
make outbox-pending # 0Теперь consumer:
make run-consumerВ выводе видим 25 событий: 20 уникальных (INSERT) и 5 дублей от первого батча (DUP). Финальная статистика — processed=25 inserted=20 skipped=5. На consumer-стороне это 20 «реально обработанных» и 5 schluck'нутых дублей.
Посчитать:
make dup-count # 20 — столько уникальных событий обработаноЧто произошло целиком: было 20 заказов, в Kafka улетело 25 сообщений (5 дублей из-за crash'а), на consumer'е дедуп схлопнул дубли, на бизнес-логику попало 20 раз. Гарантия системы — at-least-once на producer-стороне плюс idempotency на consumer-стороне даёт effectively-once в наблюдаемом эффекте.
Что не закрывает outbox
Outbox — это про связку «БД одного сервиса ↔ Kafka». Внешние стороны (HTTP к чужому API, отправка email, вызов SMS-провайдера) в эту схему не входят. Если consumer обрабатывает событие, дёргает внешний API и потом коммитит offset — там же те же грабли «упали между двумя шагами», и outbox их не лечит. Лечит идемпотентность на стороне внешнего получателя (idempotency-key в HTTP-запросе), либо ещё один уровень outbox у consumer'а («Я зафиксировал намерение позвать API, и потом отдельным процессом зову»). Это уже про лекцию Доставка во внешние системы.
Ещё одно: outbox привязывает Kafka к жизненному циклу нашей БД. Если БД упала — publisher не сможет читать. На практике это нормально: если основная БД лежит, у нас и без Kafka проблемы. Но архитектурно стоит понимать, что мы сделали Postgres единственной точкой синхронизации.
И последнее. Размер таблицы. В outbox накапливаются опубликованные записи, и если ничего не делать, через год их там миллиарды. Лечится партиционированием по дате плюс отдельным процессом «очистить опубликованные старше 7 дней». Это операционка, не часть паттерна, но в эксплуатации забывать про неё не стоит.
Запуск целиком
make up
make db-init
make topic-create
make run-service COUNT=100 # 100 заказов в БД
make chaos-kill-publisher BATCH_SIZE=20 # упадёт после первого батча
make run-publisher # дочитывает остаток
make run-consumer # дедуплицирует дубли
make outbox-pending # 0
make dup-count # 100 (уникальных событий)
make end-offsets # сумма по партициям > 100 (с дублями)В качестве sanity-check полезно посмотреть make outbox-count (всегда ровно столько, сколько было заказов) и make dup-count (тоже ровно столько). Разница между «end-offsets» и «количество уникальных» — это и есть стоимость at-least-once гарантии outbox-publisher'а.