0 / 63 (0%)

MERGE и COPY

Каждую ночь поставщик присылает в Brew файл с остатками: строка на напиток — SKU;количество. С ним надо сделать две вещи. Сначала — быстро влить его в базу: тысячи строк, и заливать их по одной INSERT'ом — это тысячи round-trip, ночное окно не резиновое. Потом — свести файл с нашим складом: чего пришло больше — обновить, чего у нас ещё нет — завести, а что поставщик прислал с нулём (снял с производства) — убрать. Раньше бэкенд тащил это циклом: на каждую строку файла — SELECT, есть ли такая позиция, потом INSERT или UPDATE, а удаление вообще отдельным проходом. Длинно, и между «прочитал» и «записал» зияет окно, где данные могли уехать.

Postgres закрывает обе задачи двумя инструментами: COPY для заливки и MERGE для сверки.

COPY FROM STDIN: заливка минуя «запрос на строку»

COPY — это не INSERT, а отдельный потоковый протокол загрузки. Вместо «разобрать запрос → выполнить → ответить» на каждую строку драйвер открывает один поток и гонит в него весь батч; сервер пишет строки пачкой. На больших объёмах это в разы быстрее цепочки INSERT'ов: нет разбора запроса на строку, нет round-trip на строку.

COPY FROM STDIN — это именно протокол, поэтому в нашем правиле «SQL руками → sqlc» для него нет места: sqlc генерирует функции-запросы, а потокового COPY среди них нет. В pgx он живёт отдельным методом — pool.CopyFrom, который принимает имя таблицы, список колонок и источник строк. Поэтому весь юнит — на raw-pgx (escape-hatch до sqlc), как 00-04 и 05-05.

MERGE: INSERT, UPDATE и DELETE в одной команде

MERGE берёт источник (нашу залитую поставку s) и цель (склад t), сопоставляет их по условию ON и для каждой строки выбирает ветку:

  • WHEN MATCHED AND s.on_hand = 0 THEN DELETE — позиция есть у нас, но пришла с нулём → снять с продажи;
  • WHEN MATCHED THEN UPDATE — позиция есть, пришло новое количество → обновить;
  • WHEN NOT MATCHED THEN INSERT — позиции у нас нет → завести.

Ветки проверяются сверху вниз, срабатывает первая подошедшая — поэтому условие AND s.on_hand = 0 стоит раньше общего MATCHED. Весь тот цикл «прочитал — решил — записал» из приложения сворачивается в одну декларативную команду: ты описываешь, каким должно стать состояние при каждом исходе, а перебор строк и выбор ветки делает база.

merge_action() и RETURNING: отчёт о том, что случилось

После MERGE хочется знать, что именно он сделал с каждой строкой. Для этого в RETURNING есть функция merge_action() — она возвращает 'INSERT', 'UPDATE' или 'DELETE' для строки, которую только что обработали. В ветке DELETE колонки цели t.* — это значения удалённой строки (какими они были до удаления). Так одна команда и меняет данные, и сразу отчитывается, что с ними стало, без второго запроса.

Порядок строк в RETURNING у MERGE не определён, поэтому в демо мы собираем их в срез и сортируем по SKU в Go — иначе вывод «плавал» бы между прогонами.

MERGE или ON CONFLICT — что когда

MERGE и INSERT ... ON CONFLICT (03-04) оба умеют «вставить или обновить», но решают разные задачи — короткая развилка, чтобы не путать:

ВопросMERGEINSERT ... ON CONFLICT (03-04)
Что делаетсводит источник с целью, ветки INSERT/UPDATE/DELETEвставляет, при конфликте по ключу — обновляет или пропускает
Сильная сторонапакетная сверка двух наборов одним проходоматомарный upsert строки по уникальному ключу
Под конкуренциейне race-safe: два MERGE → оба NOT MATCHED → оба INSERT → дубль или ошибкаатомарен по уникальному индексу
Ветка DELETEесть (WHEN MATCHED ... THEN DELETE)нет
Когда братьночная сверка стейджинга со складом (наш случай)конкурентный upsert по ключу

Память простая: MERGE — пакетная сверка двух наборов, ON CONFLICT — конкурентный upsert по ключу.

Что показывает наш код

Это raw-pgx юнит, центр — две операции в main.go. Сначала COPY FROM STDIN одним вызовом заливает поставку в стейджинг-таблицу:

go
copied, err := pool.CopyFrom(ctx,
    pgx.Identifier{"supplier_feed_lab"},
    []string{"drink_sku", "on_hand"},
    pgx.CopyFromRows(feed),
)

Затем один MERGE сверяет стейджинг со складом и через merge_action() докладывает по каждой строке:

sql
MERGE INTO stock_lab t
USING supplier_feed_lab s ON t.drink_sku = s.drink_sku
WHEN MATCHED AND s.on_hand = 0 THEN DELETE
WHEN MATCHED THEN UPDATE SET on_hand = s.on_hand
WHEN NOT MATCHED THEN INSERT (drink_sku, on_hand) VALUES (s.drink_sku, s.on_hand)
RETURNING merge_action() AS action, t.drink_sku, t.on_hand;

Поставка нарочно подобрана так, чтобы сработали все три ветки: ESP-01/LAT-01 есть у нас (UPDATE), CAP-01 пришёл с нулём (DELETE), CLD-01/TEA-01 на складе ещё не заведены (INSERT).

Запуск

sh
docker compose up -d
make lecture L=09-writes-eventing-and-server-logic/09-01-merge-and-copy T=db-reset
make lecture L=09-writes-eventing-and-server-logic/09-01-merge-and-copy

T=run — режим по умолчанию, его можно не писать. Изнутри каталога юнита короче: make db-reset, затем make run.

plaintext
1) COPY FROM STDIN: загружено строк поставки = 5
   Наш склад ДО сверки (stock_lab):
   SKU     остаток
   CAP-01  40
   ESP-01  50
   LAT-01  30
 
2) MERGE поставки в склад — один проход, три исхода:
SKU     merge_action()  остаток
CAP-01  DELETE          40
CLD-01  INSERT          25
ESP-01  UPDATE          60
LAT-01  UPDATE          35
TEA-01  INSERT          15
 
3) Наш склад ПОСЛЕ сверки (stock_lab):
   SKU     остаток
   CLD-01  25
   ESP-01  60
   LAT-01  35
   TEA-01  15

CAP-01 пришёл с нулём — merge_action() показал DELETE, и в итоговом складе его уже нет (остаток 40 в строке DELETE — это его значение до удаления). CLD-01 и TEA-01 завелись (INSERT), ESP-01/LAT-01 обновились до новых количеств — и всё это одной командой.

Заборчик

  • MERGE не race-safe для upsert'а. Под конкуренцией две параллельные команды MERGE могут обе не найти строку (NOT MATCHED), обе пойти в ветку INSERT — и одна упадёт на нарушении уникальности, а если ключа нет — появятся дубли. MERGE не делает за тебя то, что делает INSERT ... ON CONFLICT (03-04): тот атомарно ловит конфликт по уникальному индексу и сам решает «вставить или обновить». Поэтому для конкурентного upsert'а по ключу бери ON CONFLICT, а MERGE — для пакетной сверки двух наборов (наш типичный случай: ночная синхронизация стейджинга со складом), когда ты сам контролируешь, что параллельных мержей в ту же таблицу нет.
  • COPY грузит «как есть». Он быстр, но почти не проверяет данные на лету. Поэтому промышленный паттерн — лить COPY в стейджинг-таблицу (как здесь supplier_feed_lab), а уже оттуда переносить в боевую с проверками и сверкой через MERGE/INSERT ... SELECT. Лить COPY напрямую в таблицу с кучей ограничений и триггеров — терять и в скорости, и в предсказуемости.
  • Тонкая настройка COPY (форматы, FREEZE, отключение индексов на время массивной загрузки) — это уже работа на стыке с твоим DBA, мы её тут не трогаем.

Что забрать с собой

COPY FROM STDIN (pool.CopyFrom в pgx) заливает батч одним потоком, минуя «разбор запроса и round-trip на строку», — это инструмент массовой загрузки, и его нет в sqlc, потому что это протокол, а не запрос. MERGE сводит источник с целью и в одной команде делает INSERT/UPDATE/DELETE, а merge_action() в RETURNING докладывает, что случилось с каждой строкой. Но MERGE — это про пакетную сверку, не про конкурентный upsert: для «вставить или обновить» под нагрузкой остаётся INSERT ... ON CONFLICT из 03-04.

Дальше — когда писать надо не пачкой раз в ночь, а потоком от множества воркеров сразу, и нельзя обработать одну задачу дважды. В 09-02 очередь задач на FOR UPDATE SKIP LOCKED раздаёт работу N воркерам без дублей и без блокировок друг о друга.

·Модуль 10

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Запись / MERGE и COPY