PostgreSQL CookbookЗаписьОчередь задач на SKIP LOCKED
0 / 63 (0%)

Очередь задач на SKIP LOCKED

В Brew есть фоновая работа: разослать чеки, пересчитать остатки, отправить пуш «ваш заказ готов». Всё это складывается в таблицу-очередь, а несколько воркеров разбирают её параллельно — чем больше нагрузка, тем больше воркеров поднимаем. И тут вылезает классический баг конкурентной очереди. Наивный воркер делает «SELECT самой старой задачи в статусе queued, потом UPDATE её в processing». Два воркера успевают прочитать одну и ту же строку до того, как любой из них её застолбит — и оба берут задачу в работу. Клиент получает два одинаковых пуша, чек уходит дважды.

Очевидная починка — заблокировать строку при чтении (FOR UPDATE, см. 05-03) — ломает другое: воркеры выстраиваются в очередь друг за другом. Первый взял строку под блокировку, второй на том же SELECT ... FOR UPDATE встаёт и ждёт, пока первый закоммитит. Параллелизма нет — N воркеров работают как один. Нужен способ сказать «возьми первую СВОБОДНУЮ строку, а занятые не трогай».

SKIP LOCKED: «пропусти заблокированное, не жди»

Эту фразу и произносит SKIP LOCKED. Запрос воркера выглядит так:

sql
SELECT id FROM jobs_lab
WHERE status = 'queued'
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;

FOR UPDATE блокирует выбранную строку до конца транзакции — пока воркер с ней работает, никто другой её не изменит. SKIP LOCKED меняет поведение при встрече с уже заблокированной строкой: вместо того чтобы ждать её освобождения, планировщик просто пропускает её и берёт следующую свободную. LIMIT 1 выдаёт по одной задаче за раз.

В сумме это даёт ровно то, что нужно очереди: два воркера никогда не возьмут одну задачу (первый её залочил — второй пропустил), и при этом они не блокируют друг друга (никто никого не ждёт — каждый мгновенно получает следующую свободную строку). Транзакция воркера короткая: взял → «обработал» (пометил done) → закоммитил, блокировка снялась.

Распределение — недетерминированное, и это правильно

Кому какая задача достанется — зависит от того, какой воркер в какой момент дёрнул SELECT. Запусти демо дважды — раскладка «воркер → задачи» будет разной. Это не баг, это и есть смысл SKIP LOCKED: воркеры сами балансируют нагрузку. Кто освободился — берёт следующую; медленный возьмёт меньше, быстрый больше, никто не простаивает в ожидании соседа. Поэтому в выводе мы печатаем не раскладку (она «плавает»), а инварианты, которые от планировщика не зависят: сколько задач забрано всего, сколько уникальных и сколько дублей. Их и проверяем.

Гонка против SKIP LOCKED, нарисованная

Вся разница — на одной картинке. Наивный воркер читает и пишет в два шага, и в зазор между ними другой воркер успевает прочитать ту же строку:

plaintext
Наивно (SELECT, потом UPDATE) — окно гонки:
  worker-1  ──SELECT job#1──┐
  worker-2  ──SELECT job#1──┤  оба прочитали #1 ДО того,
            UPDATE #1 ◄──────┘  как кто-то его застолбил
            UPDATE #1 ◄───────  → job#1 обработан ДВАЖДЫ (два пуша клиенту)
 
FOR UPDATE SKIP LOCKED — «пропусти занятое, не жди»:
  worker-1  ─SELECT … SKIP LOCKED─►  #1 (залочил) ─обработал─ COMMIT
  worker-2  ─SELECT … SKIP LOCKED─►  #1 занят → пропустил → берёт #2
            никто не ждёт соседа · никто не берёт чужую строку

FOR UPDATE столбит строку за воркером, SKIP LOCKED велит обходить чужие залоченные строки стороной — и гонка, и взаимная блокировка исчезают разом.

Что показывает наш код

Это raw-pgx юнит: урок про конкурентность (несколько горутин-воркеров, у каждого своя транзакция), а не про форму запроса. Каждый воркер крутит цикл, пока очередь не опустеет, и в каждой итерации берёт ровно одну задачу под SKIP LOCKED:

go
err = tx.QueryRow(ctx, `
    SELECT id FROM jobs_lab
    WHERE status = 'queued'
    ORDER BY id
    FOR UPDATE SKIP LOCKED
    LIMIT 1`).Scan(&id)
if errors.Is(err, pgx.ErrNoRows) {
    return processed, nil // очередь пуста — воркер завершается
}

Поймал pgx.ErrNoRows — свободных задач не осталось, воркер выходит. Иначе помечает задачу done (с именем воркера) и коммитит — блокировка снимается, и строка больше никому не покажется (WHERE status = 'queued' её уже не ловит). Главный модуль поднимает четырёх таких воркеров горутинами на общем пуле, дожидается всех и сверяет инварианты: забранные id должны составить ровно {1..12} без повторов, а в базе — все done и ни одной queued.

Пул поднят с pg.WithMaxConns(numWorkers): воркерам нужны соединения одновременно, иначе они встанут в очередь уже за коннектом, а не за задачей.

Запуск

sh
docker compose up -d
make lecture L=09-writes-eventing-and-server-logic/09-02-skip-locked-job-queue T=db-reset
make lecture L=09-writes-eventing-and-server-logic/09-02-skip-locked-job-queue

T=run — режим по умолчанию, его можно не писать. Изнутри каталога юнита короче: make db-reset, затем make run.

plaintext
1) В очередь jobs_lab поставлено задач: 12. Воркеров: 4.
   Каждый воркер в цикле: BEGIN → SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1 → обработать → COMMIT.
 
2) Свод по забранным задачам (инварианты, не зависят от планировщика):
   забрано всего      : 12
   уникальных задач   : 12
   дублей (один job двум воркерам): 0
 
3) Состояние очереди в базе после прогона:
   status='done'   : 12
   status='queued' : 0

12 задач, 4 воркера — забрано ровно 12, уникальных 12, дублей 0. Ни одна задача не потеряна и ни одна не обработана дважды, при том что воркеры шли параллельно. Раскладка «кто сколько взял» в выводе не показана намеренно: она меняется от запуска к запуску.

Заборчик

  • Держи транзакцию воркера короткой. Пока она открыта, строка залочена, а длинная транзакция ещё и держит горизонт видимости (см. 05-02) и копит bloat. Тяжёлую работу (поход во внешний API, отправку письма) не делай внутри транзакции — возьми задачу, быстро закоммить смену статуса и работай уже вне неё; иначе один зависший воркер тормозит уборку версий во всей базе.
  • SKIP LOCKED жертвует строгим порядком. Пропуская занятые строки, воркеры разбирают задачи не строго по id, а «кто что успел». Если порядок обязателен (строго FIFO на каждый ключ) — это уже не про SKIP LOCKED, а про партиционирование очереди по ключу или один воркер на раздел.
  • Таблица-очередь — решение «до брокера». В Postgres она отлично живёт до определённых масштабов (десятки–сотни тысяч задач в день — спокойно), но это не Kafka и не RabbitMQ. Когда нагрузка переваливает за то, что тянет одна таблица с постоянным UPDATE/DELETE (а это уже вопрос к autovacuum и bloat — территория твоего DBA), пора смотреть на специализированный брокер. Где именно проходит эта граница и как очередь в БД отдаёт эстафету брокеру — в нашей вселенной этим занимается соседний курс kafka-cookbook.

Что забрать с собой

FOR UPDATE SKIP LOCKED превращает обычную таблицу в конкурентную очередь: FOR UPDATE столбит строку за воркером, SKIP LOCKED велит пропускать чужие залоченные строки, а не ждать их. Два воркера не возьмут одну задачу и не заблокируют друг друга — каждый сразу получает следующую свободную. Распределение по воркерам недетерминированное by design (они сами балансируют нагрузку), поэтому проверять надо инварианты: ноль дублей, ноль потерь. Держи транзакцию воркера короткой и помни, что таблица-очередь — отличное решение «до брокера», а не вместо него.

Мы умеем раздавать готовую работу. Дальше — как эту работу вообще туда класть надёжно: записать бизнес-факт (заказ) и событие о нём (для рассылки/CDC) так, чтобы они либо появились вместе, либо никак. В 09-03 это сделает transactional outbox — заказ и событие в одной транзакции, а relay вычитывает события тем самым FOR UPDATE SKIP LOCKED.

·Модуль 10

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Запись / Очередь задач на SKIP LOCKED