Transactional outbox
Покупатель оформил заказ. Brew должен сделать две вещи: записать заказ в свою
базу и сообщить о нём наружу — чтобы ушло письмо, обновился аналитический
дашборд, а в нашей вселенной — чтобы событие подхватил kafka-cookbook. Наивный
бэкенд пишет в два места подряд: INSERT заказа в Postgres, потом publish
события в брокер. И ровно между этими двумя строками живёт баг, который не видно
на счастливом пути. Сервис упал после INSERT, но до publish — заказ есть,
события нет, письмо не ушло, Kafka про заказ не знает. Или наоборот: событие
опубликовали, а транзакция с заказом откатилась — теперь во внешнем мире живёт
событие о заказе, которого в базе нет. Двух источников правды нельзя обновить
атомарно: распределённая транзакция между БД и брокером дорога и хрупка.
Outbox убирает зазор простым ходом: не пишем в два места сразу.
Идея: событие — это ещё одна строка в той же базе
Вместо «заказ в БД + событие в брокер» мы пишем и заказ, и событие в одну и ту
же базу, в одной транзакции. Событие ложится строкой в таблицу outbox.
Атомарность даёт сам Postgres: либо коммитятся обе вставки, либо ни одной —
никакого окна между ними не существует. Заказ без события или событие без заказа
теперь невозможны в принципе.
А доставку наружу берёт на себя отдельный процесс — relay. Он читает
неопубликованные строки outbox, отправляет их в брокер (или отдаёт CDC) и
помечает доставленными. Доставка стала отдельной, повторяемой задачей: упал
relay на середине — перезапустился и дочитал; «ровно один раз» он не обещает, но
«хотя бы один раз» — да (consumer на другой стороне глушит дубли по outbox_id
(таблица processed_outbox_ids)).
Таблицы orders и outbox здесь — базовые, из schema/brew.sql,
байт-совместимые с kafka-cookbook. Это не случайность: ровно эта пара едет в
capstone 10-05 и дальше — в CDC-эстафету к Kafka-курсу. Поэтому юнит не заводит
своих лабораторных столов, а работает на настоящих таблицах Brew.
Запись: заказ и событие в одной транзакции
В коде это буквально одна транзакция на два INSERT:
tx, _ := pool.Begin(ctx)
defer tx.Rollback(ctx)
q := db.New(tx) // те же sqlc-запросы, но в рамках транзакции
orderID, _ := q.InsertOrder(ctx, db.InsertOrderParams{...})
payload, _ := json.Marshal(map[string]any{"order_id": orderID, "amount": o.amount})
q.InsertOutbox(ctx, db.InsertOutboxParams{AggregateID: ..., Topic: "orders.created", Payload: payload})
tx.Commit(ctx)db.New(tx) — это та же типизированная обёртка sqlc, но привязанная к
транзакции (как в 05-01/03-03). Если упадёт второй INSERT — defer tx.Rollback снимет и первый. Событие физически не может пережить заказ и
наоборот.
InsertOutbox оставляет published_at равным NULL — это и есть метка «ещё не
доставлено».
Доставка: relay на FOR UPDATE SKIP LOCKED
relay вычитывает неопубликованные события тем же приёмом, что и очередь из 09-02:
SELECT id, aggregate_id, topic, payload
FROM outbox
WHERE published_at IS NULL
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT $1;WHERE published_at IS NULL идёт по частичному индексу outbox_unpublished_idx
(он есть в схеме Brew) — relay не сканирует всю историю outbox, только хвост
недоставленных. FOR UPDATE SKIP LOCKED позволяет поднять несколько
relay-воркеров: они разберут события без дублей и без блокировок друг о друга.
Доставив событие, relay в той же транзакции делает UPDATE outbox SET published_at = now() — «забрал и отметил» атомарно.
Зазор dual-write — и как outbox его закрывает
Картинка проблемы и решения рядом:
Наивно: два места, между ними — зазор
① INSERT заказ → Postgres ──COMMIT──► заказ записан
╳ креш ровно ЗДЕСЬ ──────────────► событие потеряно
② publish событие → брокер ───────────► (не доехало)
итог: заказ без события; или при откате ① — событие без заказа
Outbox: одно место, одна транзакция
BEGIN
INSERT заказ → orders ┐ обе вставки атомарны:
INSERT событие → outbox (published_at=∅) ┘ либо обе, либо ни одной
COMMIT
relay: SELECT … WHERE published_at IS NULL FOR UPDATE SKIP LOCKED
→ доставил наружу → UPDATE published_at = now()Доставка теперь — отдельная повторяемая задача relay, а не вторая строка рядом с бизнес-записью. По гарантиям это даёт at-least-once:
| Гарантия | Что значит | Где в нашей схеме |
|---|---|---|
| at-most-once | максимум один раз, возможна потеря | наивный publish без ретрая; NOTIFY (09-04) |
| at-least-once | хотя бы один раз, возможны дубли | outbox + relay с ретраем — наш случай |
| exactly-once | ровно один раз | в доставке недостижимо; эмулируется at-least-once + идемпотентный consumer: дедуп по outbox_id (таблица processed_outbox_ids) |
Это и есть водораздел между двумя способами отдать изменения наружу: 09-03 outbox — на уровне приложения (ты пишешь строку-событие и сам гоняешь relay), а 10-05 CDC — на уровне БД (Postgres отдаёт сам WAL в логическую репликацию, без таблицы-события и без своего relay; Debezium читает базовые таблицы напрямую). Это не два шага одного процесса, а две разные точки входа — выбираешь одну.
Что показывает наш код
query.sql — протагонист: InsertOrder/InsertOutbox (запись пары),
ClaimUnpublished (relay-чтение под SKIP LOCKED), MarkPublished (отметка
доставки). cmd/demo/main.go тонкий: кладёт три заказа с событиями, показывает,
что откатившаяся транзакция не оставляет ни заказа, ни события, и прогоняет
relay один раз.
Запуск
docker compose up -d
make lecture L=09-writes-eventing-and-server-logic/09-03-transactional-outbox T=db-reset
make lecture L=09-writes-eventing-and-server-logic/09-03-transactional-outboxT=run — режим по умолчанию, его можно не писать. Изнутри каталога юнита короче:
make db-reset, затем make run.
1) Кладём заказы — каждый ВМЕСТЕ с событием в одной транзакции:
заказ событие outbox тема
#1 #1 orders.created
#2 #2 orders.created
#3 #3 orders.created
→ событий ждёт доставки: 3
2) Транзакция «заказ записан, но проверка провалилась» → ROLLBACK:
→ заказов в таблице: 3, событий ждёт доставки: 3 (откат не оставил ничего)
3) relay вычитывает события (FOR UPDATE SKIP LOCKED) и доставляет их:
событие тема aggregate payload
#1 orders.created 1 {"amount": "5.00", "order_id": 1}
#2 orders.created 2 {"amount": "3.00", "order_id": 2}
#3 orders.created 3 {"amount": "9.60", "order_id": 3}
→ событий ждёт доставки: 0Три заказа легли вместе с тремя событиями. Четвёртая транзакция записала заказ и
«провалила проверку» — после отката заказов по-прежнему 3 и событий 3: откат
снял обе вставки разом. relay вычитал все три события, «доставил» их (payload
вернулся уже нормализованным jsonb — ключи отсортированы) и пометил
published — в очереди не осталось ничего.
Заборчик
- at-least-once, не exactly-once. За гарантию доставки ты по-прежнему
отвечаешь сам: если relay упал между
publishиUPDATE published_at, после перезапуска событие уедет повторно. Поэтому на стороне consumer'а нужна идемпотентность — гасить дубли поoutbox_id(таблицаprocessed_outbox_ids). - Держи транзакцию записи короткой. Саму отправку в брокер делай в relay вне транзакции чтения — иначе медленная сеть до брокера будет держать блокировки и горизонт видимости (см. 05-02 и заборчик 09-02).
outbox— high-churn таблица, источник bloat. Она постоянно растёт и сразу же чистится. В проде её надо периодически подчищать (удалять давно опубликованные строки) и не забывать про autovacuum — но это уже эксплуатация, территория твоего DBA, мы её здесь не трогаем.- Развилка relay против CDC. relay можно написать руками (как тут — читаем
outboxи публикуем), а можно вообще не писать своего relay: отдать базовые таблицы в логическую репликацию и снять изменения через CDC (Debezium). Второй путь — это capstone 10-05 (REPLICA IDENTITY FULLна CDC-источниках +CREATE PUBLICATION); там CDC работает на уровне БД и подаётся как альтернатива ручному outbox-relay, а не его продолжение. Debezium изkafka-cookbookчитает наши таблицы без переписывания схемы.
Что забрать с собой
Transactional outbox решает проблему двух источников правды одним ходом: не пиши
в БД и брокер по отдельности — пиши бизнес-факт и событие о нём в одну базу в
одной транзакции, атомарность даёт Postgres. Доставку наружу выносим в
отдельный relay, который читает неопубликованные строки outbox через
FOR UPDATE SKIP LOCKED (несколько воркеров, без дублей) и помечает их
доставленными. Это at-least-once: consumer обязан быть идемпотентным. Базовые
orders/outbox тут не случайны — это та самая пара, что в 10-05 уезжает в CDC.
Дальше — другой способ узнать об изменении сразу, не опрашивая таблицу в цикле:
БД сама толкает уведомление. В 09-04 триггер на INSERT шлёт pg_notify, а
слушатель на стороне Go получает событие в реальном времени — с важными
оговорками про транзакционность, размер и «доставку максимум один раз».