MERGE и COPY
Каждую ночь поставщик присылает в Brew файл с остатками: строка на напиток —
SKU;количество. С ним надо сделать две вещи. Сначала — быстро влить его в
базу: тысячи строк, и заливать их по одной INSERT'ом — это тысячи round-trip,
ночное окно не резиновое. Потом — свести файл с нашим складом: чего пришло
больше — обновить, чего у нас ещё нет — завести, а что поставщик прислал с нулём
(снял с производства) — убрать. Раньше бэкенд тащил это циклом: на каждую строку
файла — SELECT, есть ли такая позиция, потом INSERT или UPDATE, а удаление
вообще отдельным проходом. Длинно, и между «прочитал» и «записал» зияет окно,
где данные могли уехать.
Postgres закрывает обе задачи двумя инструментами: COPY для заливки и MERGE
для сверки.
COPY FROM STDIN: заливка минуя «запрос на строку»
COPY — это не INSERT, а отдельный потоковый протокол загрузки. Вместо
«разобрать запрос → выполнить → ответить» на каждую строку драйвер открывает
один поток и гонит в него весь батч; сервер пишет строки пачкой. На больших
объёмах это в разы быстрее цепочки INSERT'ов: нет разбора запроса на строку,
нет round-trip на строку.
COPY FROM STDIN — это именно протокол, поэтому в нашем правиле «SQL руками →
sqlc» для него нет места: sqlc генерирует функции-запросы, а потокового COPY
среди них нет. В pgx он живёт отдельным методом — pool.CopyFrom, который
принимает имя таблицы, список колонок и источник строк. Поэтому весь юнит — на
raw-pgx (escape-hatch до sqlc), как 00-04 и 05-05.
MERGE: INSERT, UPDATE и DELETE в одной команде
MERGE берёт источник (нашу залитую поставку s) и цель (склад t),
сопоставляет их по условию ON и для каждой строки выбирает ветку:
WHEN MATCHED AND s.on_hand = 0 THEN DELETE— позиция есть у нас, но пришла с нулём → снять с продажи;WHEN MATCHED THEN UPDATE— позиция есть, пришло новое количество → обновить;WHEN NOT MATCHED THEN INSERT— позиции у нас нет → завести.
Ветки проверяются сверху вниз, срабатывает первая подошедшая — поэтому условие
AND s.on_hand = 0 стоит раньше общего MATCHED. Весь тот цикл «прочитал —
решил — записал» из приложения сворачивается в одну декларативную команду:
ты описываешь, каким должно стать состояние при каждом исходе, а перебор строк
и выбор ветки делает база.
merge_action() и RETURNING: отчёт о том, что случилось
После MERGE хочется знать, что именно он сделал с каждой строкой. Для этого в
RETURNING есть функция merge_action() — она возвращает 'INSERT',
'UPDATE' или 'DELETE' для строки, которую только что обработали. В ветке
DELETE колонки цели t.* — это значения удалённой строки (какими они были
до удаления). Так одна команда и меняет данные, и сразу отчитывается, что с ними
стало, без второго запроса.
Порядок строк в RETURNING у MERGE не определён, поэтому в демо мы собираем
их в срез и сортируем по SKU в Go — иначе вывод «плавал» бы между прогонами.
MERGE или ON CONFLICT — что когда
MERGE и INSERT ... ON CONFLICT (03-04) оба умеют «вставить или обновить», но
решают разные задачи — короткая развилка, чтобы не путать:
| Вопрос | MERGE | INSERT ... ON CONFLICT (03-04) |
|---|---|---|
| Что делает | сводит источник с целью, ветки INSERT/UPDATE/DELETE | вставляет, при конфликте по ключу — обновляет или пропускает |
| Сильная сторона | пакетная сверка двух наборов одним проходом | атомарный upsert строки по уникальному ключу |
| Под конкуренцией | не race-safe: два MERGE → оба NOT MATCHED → оба INSERT → дубль или ошибка | атомарен по уникальному индексу |
Ветка DELETE | есть (WHEN MATCHED ... THEN DELETE) | нет |
| Когда брать | ночная сверка стейджинга со складом (наш случай) | конкурентный upsert по ключу |
Память простая: MERGE — пакетная сверка двух наборов, ON CONFLICT —
конкурентный upsert по ключу.
Что показывает наш код
Это raw-pgx юнит, центр — две операции в main.go. Сначала COPY FROM STDIN
одним вызовом заливает поставку в стейджинг-таблицу:
copied, err := pool.CopyFrom(ctx,
pgx.Identifier{"supplier_feed_lab"},
[]string{"drink_sku", "on_hand"},
pgx.CopyFromRows(feed),
)Затем один MERGE сверяет стейджинг со складом и через merge_action()
докладывает по каждой строке:
MERGE INTO stock_lab t
USING supplier_feed_lab s ON t.drink_sku = s.drink_sku
WHEN MATCHED AND s.on_hand = 0 THEN DELETE
WHEN MATCHED THEN UPDATE SET on_hand = s.on_hand
WHEN NOT MATCHED THEN INSERT (drink_sku, on_hand) VALUES (s.drink_sku, s.on_hand)
RETURNING merge_action() AS action, t.drink_sku, t.on_hand;Поставка нарочно подобрана так, чтобы сработали все три ветки: ESP-01/LAT-01
есть у нас (UPDATE), CAP-01 пришёл с нулём (DELETE), CLD-01/TEA-01 на
складе ещё не заведены (INSERT).
Запуск
docker compose up -d
make lecture L=09-writes-eventing-and-server-logic/09-01-merge-and-copy T=db-reset
make lecture L=09-writes-eventing-and-server-logic/09-01-merge-and-copyT=run — режим по умолчанию, его можно не писать. Изнутри каталога юнита короче:
make db-reset, затем make run.
1) COPY FROM STDIN: загружено строк поставки = 5
Наш склад ДО сверки (stock_lab):
SKU остаток
CAP-01 40
ESP-01 50
LAT-01 30
2) MERGE поставки в склад — один проход, три исхода:
SKU merge_action() остаток
CAP-01 DELETE 40
CLD-01 INSERT 25
ESP-01 UPDATE 60
LAT-01 UPDATE 35
TEA-01 INSERT 15
3) Наш склад ПОСЛЕ сверки (stock_lab):
SKU остаток
CLD-01 25
ESP-01 60
LAT-01 35
TEA-01 15CAP-01 пришёл с нулём — merge_action() показал DELETE, и в итоговом складе
его уже нет (остаток 40 в строке DELETE — это его значение до удаления).
CLD-01 и TEA-01 завелись (INSERT), ESP-01/LAT-01 обновились до новых
количеств — и всё это одной командой.
Заборчик
MERGEне race-safe для upsert'а. Под конкуренцией две параллельные командыMERGEмогут обе не найти строку (NOT MATCHED), обе пойти в веткуINSERT— и одна упадёт на нарушении уникальности, а если ключа нет — появятся дубли.MERGEне делает за тебя то, что делаетINSERT ... ON CONFLICT(03-04): тот атомарно ловит конфликт по уникальному индексу и сам решает «вставить или обновить». Поэтому для конкурентного upsert'а по ключу бериON CONFLICT, аMERGE— для пакетной сверки двух наборов (наш типичный случай: ночная синхронизация стейджинга со складом), когда ты сам контролируешь, что параллельных мержей в ту же таблицу нет.COPYгрузит «как есть». Он быстр, но почти не проверяет данные на лету. Поэтому промышленный паттерн — литьCOPYв стейджинг-таблицу (как здесьsupplier_feed_lab), а уже оттуда переносить в боевую с проверками и сверкой черезMERGE/INSERT ... SELECT. ЛитьCOPYнапрямую в таблицу с кучей ограничений и триггеров — терять и в скорости, и в предсказуемости.- Тонкая настройка
COPY(форматы,FREEZE, отключение индексов на время массивной загрузки) — это уже работа на стыке с твоим DBA, мы её тут не трогаем.
Что забрать с собой
COPY FROM STDIN (pool.CopyFrom в pgx) заливает батч одним потоком, минуя
«разбор запроса и round-trip на строку», — это инструмент массовой загрузки, и
его нет в sqlc, потому что это протокол, а не запрос. MERGE сводит источник с
целью и в одной команде делает INSERT/UPDATE/DELETE, а merge_action() в
RETURNING докладывает, что случилось с каждой строкой. Но MERGE — это про
пакетную сверку, не про конкурентный upsert: для «вставить или обновить» под
нагрузкой остаётся INSERT ... ON CONFLICT из 03-04.
Дальше — когда писать надо не пачкой раз в ночь, а потоком от множества воркеров
сразу, и нельзя обработать одну задачу дважды. В 09-02 очередь задач на
FOR UPDATE SKIP LOCKED раздаёт работу N воркерам без дублей и без блокировок
друг о друга.