Очередь задач на SKIP LOCKED
В Brew есть фоновая работа: разослать чеки, пересчитать остатки, отправить
пуш «ваш заказ готов». Всё это складывается в таблицу-очередь, а несколько
воркеров разбирают её параллельно — чем больше нагрузка, тем больше воркеров
поднимаем. И тут вылезает классический баг конкурентной очереди. Наивный воркер
делает «SELECT самой старой задачи в статусе queued, потом UPDATE её в
processing». Два воркера успевают прочитать одну и ту же строку до того, как
любой из них её застолбит — и оба берут задачу в работу. Клиент получает два
одинаковых пуша, чек уходит дважды.
Очевидная починка — заблокировать строку при чтении (FOR UPDATE, см. 05-03) —
ломает другое: воркеры выстраиваются в очередь друг за другом. Первый взял
строку под блокировку, второй на том же SELECT ... FOR UPDATE встаёт и ждёт,
пока первый закоммитит. Параллелизма нет — N воркеров работают как один. Нужен
способ сказать «возьми первую СВОБОДНУЮ строку, а занятые не трогай».
SKIP LOCKED: «пропусти заблокированное, не жди»
Эту фразу и произносит SKIP LOCKED. Запрос воркера выглядит так:
SELECT id FROM jobs_lab
WHERE status = 'queued'
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;FOR UPDATE блокирует выбранную строку до конца транзакции — пока воркер с ней
работает, никто другой её не изменит. SKIP LOCKED меняет поведение при встрече
с уже заблокированной строкой: вместо того чтобы ждать её освобождения,
планировщик просто пропускает её и берёт следующую свободную. LIMIT 1
выдаёт по одной задаче за раз.
В сумме это даёт ровно то, что нужно очереди: два воркера никогда не возьмут одну
задачу (первый её залочил — второй пропустил), и при этом они не блокируют друг
друга (никто никого не ждёт — каждый мгновенно получает следующую свободную
строку). Транзакция воркера короткая: взял → «обработал» (пометил done) →
закоммитил, блокировка снялась.
Распределение — недетерминированное, и это правильно
Кому какая задача достанется — зависит от того, какой воркер в какой момент
дёрнул SELECT. Запусти демо дважды — раскладка «воркер → задачи» будет разной.
Это не баг, это и есть смысл SKIP LOCKED: воркеры сами балансируют
нагрузку. Кто освободился — берёт следующую; медленный возьмёт меньше, быстрый
больше, никто не простаивает в ожидании соседа. Поэтому в выводе мы печатаем не
раскладку (она «плавает»), а инварианты, которые от планировщика не зависят:
сколько задач забрано всего, сколько уникальных и сколько дублей. Их и проверяем.
Гонка против SKIP LOCKED, нарисованная
Вся разница — на одной картинке. Наивный воркер читает и пишет в два шага, и в зазор между ними другой воркер успевает прочитать ту же строку:
Наивно (SELECT, потом UPDATE) — окно гонки:
worker-1 ──SELECT job#1──┐
worker-2 ──SELECT job#1──┤ оба прочитали #1 ДО того,
UPDATE #1 ◄──────┘ как кто-то его застолбил
UPDATE #1 ◄─────── → job#1 обработан ДВАЖДЫ (два пуша клиенту)
FOR UPDATE SKIP LOCKED — «пропусти занятое, не жди»:
worker-1 ─SELECT … SKIP LOCKED─► #1 (залочил) ─обработал─ COMMIT
worker-2 ─SELECT … SKIP LOCKED─► #1 занят → пропустил → берёт #2
никто не ждёт соседа · никто не берёт чужую строкуFOR UPDATE столбит строку за воркером, SKIP LOCKED велит обходить чужие
залоченные строки стороной — и гонка, и взаимная блокировка исчезают разом.
Что показывает наш код
Это raw-pgx юнит: урок про конкурентность (несколько горутин-воркеров, у
каждого своя транзакция), а не про форму запроса. Каждый воркер крутит цикл,
пока очередь не опустеет, и в каждой итерации берёт ровно одну задачу под
SKIP LOCKED:
err = tx.QueryRow(ctx, `
SELECT id FROM jobs_lab
WHERE status = 'queued'
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1`).Scan(&id)
if errors.Is(err, pgx.ErrNoRows) {
return processed, nil // очередь пуста — воркер завершается
}Поймал pgx.ErrNoRows — свободных задач не осталось, воркер выходит. Иначе
помечает задачу done (с именем воркера) и коммитит — блокировка снимается, и
строка больше никому не покажется (WHERE status = 'queued' её уже не ловит).
Главный модуль поднимает четырёх таких воркеров горутинами на общем пуле,
дожидается всех и сверяет инварианты: забранные id должны составить ровно
{1..12} без повторов, а в базе — все done и ни одной queued.
Пул поднят с pg.WithMaxConns(numWorkers): воркерам нужны соединения
одновременно, иначе они встанут в очередь уже за коннектом, а не за задачей.
Запуск
docker compose up -d
make lecture L=09-writes-eventing-and-server-logic/09-02-skip-locked-job-queue T=db-reset
make lecture L=09-writes-eventing-and-server-logic/09-02-skip-locked-job-queueT=run — режим по умолчанию, его можно не писать. Изнутри каталога юнита короче:
make db-reset, затем make run.
1) В очередь jobs_lab поставлено задач: 12. Воркеров: 4.
Каждый воркер в цикле: BEGIN → SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1 → обработать → COMMIT.
2) Свод по забранным задачам (инварианты, не зависят от планировщика):
забрано всего : 12
уникальных задач : 12
дублей (один job двум воркерам): 0
3) Состояние очереди в базе после прогона:
status='done' : 12
status='queued' : 012 задач, 4 воркера — забрано ровно 12, уникальных 12, дублей 0. Ни одна
задача не потеряна и ни одна не обработана дважды, при том что воркеры шли
параллельно. Раскладка «кто сколько взял» в выводе не показана намеренно: она
меняется от запуска к запуску.
Заборчик
- Держи транзакцию воркера короткой. Пока она открыта, строка залочена, а длинная транзакция ещё и держит горизонт видимости (см. 05-02) и копит bloat. Тяжёлую работу (поход во внешний API, отправку письма) не делай внутри транзакции — возьми задачу, быстро закоммить смену статуса и работай уже вне неё; иначе один зависший воркер тормозит уборку версий во всей базе.
SKIP LOCKEDжертвует строгим порядком. Пропуская занятые строки, воркеры разбирают задачи не строго поid, а «кто что успел». Если порядок обязателен (строго FIFO на каждый ключ) — это уже не проSKIP LOCKED, а про партиционирование очереди по ключу или один воркер на раздел.- Таблица-очередь — решение «до брокера». В Postgres она отлично живёт до
определённых масштабов (десятки–сотни тысяч задач в день — спокойно), но это не
Kafka и не RabbitMQ. Когда нагрузка переваливает за то, что тянет одна таблица с
постоянным
UPDATE/DELETE(а это уже вопрос к autovacuum и bloat — территория твоего DBA), пора смотреть на специализированный брокер. Где именно проходит эта граница и как очередь в БД отдаёт эстафету брокеру — в нашей вселенной этим занимается соседний курсkafka-cookbook.
Что забрать с собой
FOR UPDATE SKIP LOCKED превращает обычную таблицу в конкурентную очередь:
FOR UPDATE столбит строку за воркером, SKIP LOCKED велит пропускать чужие
залоченные строки, а не ждать их. Два воркера не возьмут одну задачу и не
заблокируют друг друга — каждый сразу получает следующую свободную. Распределение
по воркерам недетерминированное by design (они сами балансируют нагрузку),
поэтому проверять надо инварианты: ноль дублей, ноль потерь. Держи транзакцию
воркера короткой и помни, что таблица-очередь — отличное решение «до брокера», а
не вместо него.
Мы умеем раздавать готовую работу. Дальше — как эту работу вообще туда класть
надёжно: записать бизнес-факт (заказ) и событие о нём (для рассылки/CDC) так,
чтобы они либо появились вместе, либо никак. В 09-03 это сделает transactional
outbox — заказ и событие в одной транзакции, а relay вычитывает события тем
самым FOR UPDATE SKIP LOCKED.