Kafka CookbookНадёжностьOutbox-паттерн
0 / 42 (0%)

Outbox-паттерн

В прошлой лекции была exactly-once связка между Kafka и Kafka — читаем из топика, пишем в другой, коммитим offset, всё внутри одной транзакции. Сейчас другая ситуация. У order-service есть своя Postgres и Kafka, и нам надо, чтобы при создании заказа Brew в таблице orders обязательно появилось событие order.created в топике brew.orders.v1. Эти два хранилища не знают друг про друга. Своих транзакций между ними нет.

И вот тут начинается то самое место, где люди годами наступают на грабли.

Два честных пути и оба ломаются

Подход первый, наивный — сначала пишем order.created в Kafka, потом заказ в Postgres. Если order-service упал между двумя шагами, событие в Kafka уже есть, а в БД заказа нет. На рестарте сервис не помнит, что что-то писал. Заказ не создаётся вообще, а downstream-консьюмеры уже уведомлены о его существовании. Дальше начинается фантомный заказ, по которому никогда не будет данных. inventory-service зарезервировал ингредиенты, payment-service ждёт платёж, поддержка Brew получает звонки.

Подход второй — сначала Postgres, потом Kafka. Заказ в orders лёг, COMMIT прошёл, между этим и Produce процесс упал. На рестарте никто не помнит, что нужно было опубликовать order.created. Заказ есть, события нет. inventory-service ничего не зарезервировал, payment-service не получил платёж, клиент Brew видит «принят», но дальше не двигается ничего.

Третий путь — какой-нибудь XA-протокол поверх двух фазовых коммитов. Существует, но почти нигде не применяется. Слишком хрупкий, медленный, операционно дорогой. Большинство БД и брокеров поддерживают его кое-как, и любая конкретная связка превращается в проект на полгода. Закроем эту тему и пойдём дальше.

Идея outbox

Главный трюк выглядит почти банально. Мы помещаем «надо опубликовать событие X» в саму БД, той же транзакцией, что и сам заказ. Появляется ещё одна таблица — outbox. В ней лежит то, что нужно отправить, и флаг «отправлено / не отправлено». При создании заказа у нас идёт одна Postgres-транзакция, в которой два INSERT — в orders и в outbox. Атомарность за нас держит сам Postgres.

sql
CREATE TABLE outbox (
    id            BIGSERIAL    PRIMARY KEY,
    aggregate_id  TEXT         NOT NULL,
    topic         TEXT         NOT NULL,
    payload       JSONB        NOT NULL,
    created_at    TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    published_at  TIMESTAMPTZ  NULL
);

Дальше отдельный процесс (publisher) читает из этой таблицы, шлёт в Kafka, помечает запись как отправленную. Если он упал — на рестарте просто продолжит с того же места: запросом WHERE published_at IS NULL выберет всё, что не успело уехать. Состояние двух систем синхронизируется глазами на outbox: «всё, что есть в outbox, — рано или поздно появится в Kafka».

Платим мы за это одну вещь: дубли в Kafka возможны. Конкретный механизм — окно между «отправили в Kafka» и «пометили published_at». Если процесс упал прямо тут, на рестарте он отправит то же самое заново. С этим живём, защищаемся на стороне consumer'а.

Запись со стороны сервиса

Кода тут немного — главное чтобы оба INSERT'а были внутри одной BeginFunc. На каждый заказ — отдельная Postgres-транзакция:

go
err = pgx.BeginFunc(ctx, pool, func(tx pgx.Tx) error {
    if err := tx.QueryRow(ctx, insertOrderSQL, customerID, amount).Scan(&orderID); err != nil {
        return fmt.Errorf("INSERT orders: %w", err)
    }
 
    payload, _ := json.Marshal(orderEvent{...})
    aggregateID := fmt.Sprintf("order-%d", orderID)
 
    if err := tx.QueryRow(ctx, insertOutboxSQL, aggregateID, topic, string(payload)).Scan(&outboxID); err != nil {
        return fmt.Errorf("INSERT outbox: %w", err)
    }
    return nil
})

Если в первом INSERT падение — нет ни заказа, ни outbox-записи. Если падение во втором — Postgres откатывает оба. Если COMMIT прошёл — оба INSERT'а одновременно стали видимыми. Никаких половинчатых состояний.

aggregate_id я кладу в отдельную колонку и заодно в payload. Он же поедет в Key Kafka-сообщения — так все события про один заказ ложатся в одну партицию, порядок сохраняется.

Publisher

Дальше — отдельный процесс, который читает из outbox и шлёт в Kafka. Логика на каждой итерации:

  1. Открыть транзакцию.
  2. SELECT id, aggregate_id, topic, payload FROM outbox WHERE published_at IS NULL ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED.
  3. Для каждой строки сделать Produce.
  4. UPDATE outbox SET published_at = NOW() WHERE id = ANY($1).
  5. COMMIT.

FOR UPDATE SKIP LOCKED — must-have для горизонтального масштабирования publisher'а. Без него два процесса серьёзно друг друга блокируют: первый берёт строки под лок до COMMIT, второй ждёт, всё идёт в один поток. Со SKIP LOCKED второй процесс просто пропускает занятые строки и берёт следующие. Параллелизм без кооперации.

go
const fetchBatchSQL = `
SELECT id, aggregate_id, topic, payload::text
  FROM outbox
 WHERE published_at IS NULL
 ORDER BY id
 LIMIT $1
 FOR UPDATE SKIP LOCKED
`

Сам цикл публикации:

go
results := cl.ProduceSync(ctx, records...)
if err := results.FirstErr(); err != nil {
    return 0, fmt.Errorf("ProduceSync: %w", err)
}
// ... вот здесь окно для crash'а — записи в Kafka, в БД ещё нет UPDATE ...
tag, err := tx.Exec(ctx, markPublishedSQL, ids)
if err != nil {
    return 0, fmt.Errorf("UPDATE outbox: %w", err)
}
if err := tx.Commit(ctx); err != nil {
    return 0, fmt.Errorf("Commit: %w", err)
}

Идемпотентный producer (это дефолт franz-go) защищает от дублей внутри одной сессии — ретраи Produce не дублируют запись на брокере. От дубля «между сессиями» он не помогает: новый процесс получает новый producer-id, для брокера это другой клиент.

Что важно про порядок: я в коде помечаю строки published_at = NOW() в той же транзакции, что и SELECT. Если кто-то любитель, чтобы UPDATE прошёл успешно ДО Produce (мол, оптимистично) — пожалуйста, но дальше при ошибке Produce придётся возвращать UPDATE назад, а это ещё один шанс упасть в полупозиции. Пусть Produce будет первым, а UPDATE — последним перед COMMIT. Полупозиция в этом порядке всегда одна и та же: «уже опубликовано в Kafka, ещё не помечено в БД». Один сценарий восстановления, понятный.

Дубли и защита на consumer'е

Раз publisher даёт at-least-once, потребитель должен это пережить. Стандартный приём — dedup-таблица по сквозному id события. У нас этот id уже есть: outbox.id — стабильный, монотонно растущий, гарантированно неповторяющийся. Кладём его в header сообщения:

go
records[i] = &kgo.Record{
    Topic: r.topic,
    Key:   []byte(r.aggregateID),
    Value: []byte(r.payload),
    Headers: []kgo.RecordHeader{
        {Key: "outbox-id", Value: []byte(strconv.FormatInt(r.id, 10))},
        {Key: "aggregate-id", Value: []byte(r.aggregateID)},
    },
}

Consumer читает header, делает простой INSERT в свою dedup-таблицу:

sql
INSERT INTO processed_outbox_ids (outbox_id) VALUES ($1)
ON CONFLICT (outbox_id) DO NOTHING

RowsAffected = 1 — событие новое, обрабатываем. 0 — уже видели, тихо пропускаем. Просто, надёжно, требует только PRIMARY KEY на колонке outbox_id.

go
tag, err := pool.Exec(ctx, dedupSQL, outboxID)
// ...
if tag.RowsAffected() == 1 {
    inserted.Add(1)
    // здесь основная обработка: inventory-service резервирует ингредиенты, payment-service выставляет оплату, ...
} else {
    skipped.Add(1)
}

Тонкое место — где именно делать INSERT. Я делаю его перед основной обработкой: «я заявил, что обработаю это событие». Если падаю между INSERT и реальной работой, на рестарте это событие уже dedup'нется как «обработанное» и работа потеряется. На практике это лечится тем, что и реальная работа должна быть идемпотентной (например, тоже через ON CONFLICT в своей таблице по тому же outbox_id). Тогда «защита от дублей» и «реальная обработка» сливаются в одну операцию, и проблемы нет. В этой лекции я для простоты делю их.

Polling vs CDC

То, что мы написали — polling-вариант. Каждые N миллисекунд publisher идёт в БД и спрашивает «есть что новое?». Простой, понятный, отлично работает на средних нагрузках (десятки тысяч событий в секунду тянет легко). Минус — задержка между «событие в outbox» и «событие в Kafka» зависит от частоты опроса. Можно поджимать до 100ms, можно ниже, дальше начинаешь ловить пустой запрос на каждом тике.

Альтернатива — CDC-publisher. Он вместо опроса таблицы слушает Postgres logical replication и видит INSERT в outbox в реальном времени. Это уже Debezium-территория: CDC из таблицы orders Brew мы разберём отдельно в Debezium CDC (та же нить — order-service и его Postgres). Главное отличие в гарантиях: CDC даёт меньшую задержку и переносит «координацию» из БД в реплику, но взамен требует настроенного wal_level=logical, отдельного слота, вечного процесса возле БД. На уровне модели приложения с outbox-таблицей всё то же: пишем в orders плюс outbox одной TX, кто-то читает из outbox и шлёт в Kafka. Меняется только этот «кто-то».

Третий вариант, который встречается в маленьких сервисах — publisher живёт в том же процессе, что и order-service, и работает на том же пуле. После каждого INSERT в outbox делает event-trigger в коде «иди и опубликуй сразу». Снижает задержку, но имеет проблемы с масштабированием на N инстансов и не пережёвывает рестарт сервиса (если только в нём не будет того же поллера на старте). Я не буду демонстрировать этот вариант в коде — для понимания паттерна важнее именно «отдельный publisher».

Демо

Подними БД, накати схему, создай топик:

sh
make up
make db-init
make topic-create

Налей 20 заказов:

sh
make run-service COUNT=20

В Postgres видим 20 строк в orders и 20 в outbox, у всех published_at IS NULL:

sh
make db-count          # 20
make outbox-count      # 20
make outbox-pending    # 20

Дальше запускаем publisher с эмулированным крахом — он отправит первый батч в Kafka и упадёт, не успев пометить published_at:

sh
make chaos-kill-publisher BATCH_SIZE=5

Видим в выводе пять PUB-строк и сообщение про CRASH. В Kafka ушли пять записей, в outbox они всё ещё «не опубликованы». Запускаем нормальный publisher — он подберёт остаток (включая те самые пять, которые publisher с крахом пометить не успел) и опубликует всё:

sh
make run-publisher

Смотрим, что в outbox теперь все 20 строк помечены опубликованными:

sh
make outbox-pending    # 0

Теперь consumer:

sh
make run-consumer

В выводе видим 25 событий: 20 уникальных (INSERT) и 5 дублей от первого батча (DUP). Финальная статистика — processed=25 inserted=20 skipped=5. На consumer-стороне это 20 «реально обработанных» и 5 schluck'нутых дублей.

Посчитать:

sh
make dup-count         # 20 — столько уникальных событий обработано

Что произошло целиком: было 20 заказов, в Kafka улетело 25 сообщений (5 дублей из-за crash'а), на consumer'е дедуп схлопнул дубли, на бизнес-логику попало 20 раз. Гарантия системы — at-least-once на producer-стороне плюс idempotency на consumer-стороне даёт effectively-once в наблюдаемом эффекте.

Что не закрывает outbox

Outbox — это про связку «БД одного сервиса ↔ Kafka». Внешние стороны (HTTP к чужому API, отправка email, вызов SMS-провайдера) в эту схему не входят. Если consumer обрабатывает событие, дёргает внешний API и потом коммитит offset — там же те же грабли «упали между двумя шагами», и outbox их не лечит. Лечит идемпотентность на стороне внешнего получателя (idempotency-key в HTTP-запросе), либо ещё один уровень outbox у consumer'а («Я зафиксировал намерение позвать API, и потом отдельным процессом зову»). Это уже про лекцию Доставка во внешние системы.

Ещё одно: outbox привязывает Kafka к жизненному циклу нашей БД. Если БД упала — publisher не сможет читать. На практике это нормально: если основная БД лежит, у нас и без Kafka проблемы. Но архитектурно стоит понимать, что мы сделали Postgres единственной точкой синхронизации.

И последнее. Размер таблицы. В outbox накапливаются опубликованные записи, и если ничего не делать, через год их там миллиарды. Лечится партиционированием по дате плюс отдельным процессом «очистить опубликованные старше 7 дней». Это операционка, не часть паттерна, но в эксплуатации забывать про неё не стоит.

Запуск целиком

sh
make up
make db-init
make topic-create
 
make run-service COUNT=100             # 100 заказов в БД
make chaos-kill-publisher BATCH_SIZE=20 # упадёт после первого батча
make run-publisher                      # дочитывает остаток
make run-consumer                       # дедуплицирует дубли
 
make outbox-pending                     # 0
make dup-count                          # 100 (уникальных событий)
make end-offsets                        # сумма по партициям > 100 (с дублями)

В качестве sanity-check полезно посмотреть make outbox-count (всегда ровно столько, сколько было заказов) и make dup-count (тоже ровно столько). Разница между «end-offsets» и «количество уникальных» — это и есть стоимость at-least-once гарантии outbox-publisher'а.

·Модуль 04

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Надёжность / Outbox-паттерн