PostgreSQL CookbookЗаписьTransactional outbox
0 / 63 (0%)

Transactional outbox

Покупатель оформил заказ. Brew должен сделать две вещи: записать заказ в свою базу и сообщить о нём наружу — чтобы ушло письмо, обновился аналитический дашборд, а в нашей вселенной — чтобы событие подхватил kafka-cookbook. Наивный бэкенд пишет в два места подряд: INSERT заказа в Postgres, потом publish события в брокер. И ровно между этими двумя строками живёт баг, который не видно на счастливом пути. Сервис упал после INSERT, но до publish — заказ есть, события нет, письмо не ушло, Kafka про заказ не знает. Или наоборот: событие опубликовали, а транзакция с заказом откатилась — теперь во внешнем мире живёт событие о заказе, которого в базе нет. Двух источников правды нельзя обновить атомарно: распределённая транзакция между БД и брокером дорога и хрупка.

Outbox убирает зазор простым ходом: не пишем в два места сразу.

Идея: событие — это ещё одна строка в той же базе

Вместо «заказ в БД + событие в брокер» мы пишем и заказ, и событие в одну и ту же базу, в одной транзакции. Событие ложится строкой в таблицу outbox. Атомарность даёт сам Postgres: либо коммитятся обе вставки, либо ни одной — никакого окна между ними не существует. Заказ без события или событие без заказа теперь невозможны в принципе.

А доставку наружу берёт на себя отдельный процесс — relay. Он читает неопубликованные строки outbox, отправляет их в брокер (или отдаёт CDC) и помечает доставленными. Доставка стала отдельной, повторяемой задачей: упал relay на середине — перезапустился и дочитал; «ровно один раз» он не обещает, но «хотя бы один раз» — да (consumer на другой стороне глушит дубли по outbox_id (таблица processed_outbox_ids)).

Таблицы orders и outbox здесь — базовые, из schema/brew.sql, байт-совместимые с kafka-cookbook. Это не случайность: ровно эта пара едет в capstone 10-05 и дальше — в CDC-эстафету к Kafka-курсу. Поэтому юнит не заводит своих лабораторных столов, а работает на настоящих таблицах Brew.

Запись: заказ и событие в одной транзакции

В коде это буквально одна транзакция на два INSERT:

go
tx, _ := pool.Begin(ctx)
defer tx.Rollback(ctx)
q := db.New(tx)               // те же sqlc-запросы, но в рамках транзакции
 
orderID, _ := q.InsertOrder(ctx, db.InsertOrderParams{...})
payload, _ := json.Marshal(map[string]any{"order_id": orderID, "amount": o.amount})
q.InsertOutbox(ctx, db.InsertOutboxParams{AggregateID: ..., Topic: "orders.created", Payload: payload})
 
tx.Commit(ctx)

db.New(tx) — это та же типизированная обёртка sqlc, но привязанная к транзакции (как в 05-01/03-03). Если упадёт второй INSERTdefer tx.Rollback снимет и первый. Событие физически не может пережить заказ и наоборот.

InsertOutbox оставляет published_at равным NULL — это и есть метка «ещё не доставлено».

Доставка: relay на FOR UPDATE SKIP LOCKED

relay вычитывает неопубликованные события тем же приёмом, что и очередь из 09-02:

sql
SELECT id, aggregate_id, topic, payload
FROM outbox
WHERE published_at IS NULL
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT $1;

WHERE published_at IS NULL идёт по частичному индексу outbox_unpublished_idx (он есть в схеме Brew) — relay не сканирует всю историю outbox, только хвост недоставленных. FOR UPDATE SKIP LOCKED позволяет поднять несколько relay-воркеров: они разберут события без дублей и без блокировок друг о друга. Доставив событие, relay в той же транзакции делает UPDATE outbox SET published_at = now() — «забрал и отметил» атомарно.

Зазор dual-write — и как outbox его закрывает

Картинка проблемы и решения рядом:

plaintext
Наивно: два места, между ними — зазор
  ① INSERT заказ → Postgres ──COMMIT──►  заказ записан
        ╳ креш ровно ЗДЕСЬ ──────────────►  событие потеряно
  ② publish событие → брокер ───────────►  (не доехало)
     итог: заказ без события; или при откате ① — событие без заказа
 
Outbox: одно место, одна транзакция
  BEGIN
    INSERT заказ   → orders                  ┐ обе вставки атомарны:
    INSERT событие → outbox (published_at=∅) ┘ либо обе, либо ни одной
  COMMIT
    relay: SELECT … WHERE published_at IS NULL  FOR UPDATE SKIP LOCKED
           → доставил наружу → UPDATE published_at = now()

Доставка теперь — отдельная повторяемая задача relay, а не вторая строка рядом с бизнес-записью. По гарантиям это даёт at-least-once:

ГарантияЧто значитГде в нашей схеме
at-most-onceмаксимум один раз, возможна потерянаивный publish без ретрая; NOTIFY (09-04)
at-least-onceхотя бы один раз, возможны дублиoutbox + relay с ретраем — наш случай
exactly-onceровно один разв доставке недостижимо; эмулируется at-least-once + идемпотентный consumer: дедуп по outbox_id (таблица processed_outbox_ids)

Это и есть водораздел между двумя способами отдать изменения наружу: 09-03 outbox — на уровне приложения (ты пишешь строку-событие и сам гоняешь relay), а 10-05 CDC — на уровне БД (Postgres отдаёт сам WAL в логическую репликацию, без таблицы-события и без своего relay; Debezium читает базовые таблицы напрямую). Это не два шага одного процесса, а две разные точки входа — выбираешь одну.

Что показывает наш код

query.sql — протагонист: InsertOrder/InsertOutbox (запись пары), ClaimUnpublished (relay-чтение под SKIP LOCKED), MarkPublished (отметка доставки). cmd/demo/main.go тонкий: кладёт три заказа с событиями, показывает, что откатившаяся транзакция не оставляет ни заказа, ни события, и прогоняет relay один раз.

Запуск

sh
docker compose up -d
make lecture L=09-writes-eventing-and-server-logic/09-03-transactional-outbox T=db-reset
make lecture L=09-writes-eventing-and-server-logic/09-03-transactional-outbox

T=run — режим по умолчанию, его можно не писать. Изнутри каталога юнита короче: make db-reset, затем make run.

plaintext
1) Кладём заказы — каждый ВМЕСТЕ с событием в одной транзакции:
   заказ  событие outbox  тема
   #1     #1              orders.created
   #2     #2              orders.created
   #3     #3              orders.created
   → событий ждёт доставки: 3
 
2) Транзакция «заказ записан, но проверка провалилась» → ROLLBACK:
   → заказов в таблице: 3, событий ждёт доставки: 3 (откат не оставил ничего)
 
3) relay вычитывает события (FOR UPDATE SKIP LOCKED) и доставляет их:
   событие  тема            aggregate  payload
   #1       orders.created  1          {"amount": "5.00", "order_id": 1}
   #2       orders.created  2          {"amount": "3.00", "order_id": 2}
   #3       orders.created  3          {"amount": "9.60", "order_id": 3}
   → событий ждёт доставки: 0

Три заказа легли вместе с тремя событиями. Четвёртая транзакция записала заказ и «провалила проверку» — после отката заказов по-прежнему 3 и событий 3: откат снял обе вставки разом. relay вычитал все три события, «доставил» их (payload вернулся уже нормализованным jsonb — ключи отсортированы) и пометил published — в очереди не осталось ничего.

Заборчик

  • at-least-once, не exactly-once. За гарантию доставки ты по-прежнему отвечаешь сам: если relay упал между publish и UPDATE published_at, после перезапуска событие уедет повторно. Поэтому на стороне consumer'а нужна идемпотентность — гасить дубли по outbox_id (таблица processed_outbox_ids).
  • Держи транзакцию записи короткой. Саму отправку в брокер делай в relay вне транзакции чтения — иначе медленная сеть до брокера будет держать блокировки и горизонт видимости (см. 05-02 и заборчик 09-02).
  • outbox — high-churn таблица, источник bloat. Она постоянно растёт и сразу же чистится. В проде её надо периодически подчищать (удалять давно опубликованные строки) и не забывать про autovacuum — но это уже эксплуатация, территория твоего DBA, мы её здесь не трогаем.
  • Развилка relay против CDC. relay можно написать руками (как тут — читаем outbox и публикуем), а можно вообще не писать своего relay: отдать базовые таблицы в логическую репликацию и снять изменения через CDC (Debezium). Второй путь — это capstone 10-05 (REPLICA IDENTITY FULL на CDC-источниках + CREATE PUBLICATION); там CDC работает на уровне БД и подаётся как альтернатива ручному outbox-relay, а не его продолжение. Debezium из kafka-cookbook читает наши таблицы без переписывания схемы.

Что забрать с собой

Transactional outbox решает проблему двух источников правды одним ходом: не пиши в БД и брокер по отдельности — пиши бизнес-факт и событие о нём в одну базу в одной транзакции, атомарность даёт Postgres. Доставку наружу выносим в отдельный relay, который читает неопубликованные строки outbox через FOR UPDATE SKIP LOCKED (несколько воркеров, без дублей) и помечает их доставленными. Это at-least-once: consumer обязан быть идемпотентным. Базовые orders/outbox тут не случайны — это та самая пара, что в 10-05 уезжает в CDC.

Дальше — другой способ узнать об изменении сразу, не опрашивая таблицу в цикле: БД сама толкает уведомление. В 09-04 триггер на INSERT шлёт pg_notify, а слушатель на стороне Go получает событие в реальном времени — с важными оговорками про транзакционность, размер и «доставку максимум один раз».

·Модуль 10

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Запись / Transactional outbox