Retry и DLQ deep dive
Это про деньги Brew. payment-service списывает оплату за каждый заказ и публикует PaymentReceived в brew.payments.v1. Само списание идёт через внешний платёжный провайдер, и провайдер периодически флапает: то таймаут, то 503 на пару минут, то rate-limit. Пока он жив — платёж проходит с первой попытки. Пока он лежит — попытка списать падает с transient-ошибкой, и эту оплату нельзя ни потерять, ни заретраить «в лоб» так, чтобы списать дважды.
Мы это уже один раз проходили дорого. В постмортеме acks=0 Brew теряла платежи, потому что продьюсер не ждал подтверждения от брокера. Тогда лечилось acks=all. Здесь другой класс отказа: брокер-то подтверждает запись, проблема снаружи — у платёжного провайдера. Запись в Kafka цела, а вот само списание не состоялось, и его надо повторить позже, не блокируя остальной поток платежей.
В лекции Обработка ошибок уже разбирали error handling на стороне consumer'а: in-place retry для transient-ошибок и DLQ для всего, что не получилось. Это работает на масштабе одного consumer-loop'а. Тут мы пойдём дальше. Появится несколько retry-топиков с задержкой, у DLQ — отдельная судьба, и отдельный CLI для повторной отправки. Транзиентный флап провайдера уходит по лесенке retry-топиков, а битый платёж (poison-pill) — сразу в DLQ.
Зачем вообще усложнять? Сейчас разберём.
Почему in-place retry перестаёт хватать
Главная проблема in-place retry — он сидит в poll-loop'е. Пока payment-service пять раз пытается достучаться до упавшего платёжного провайдера, consumer не дёргает poll. В franz-go v1.21.0 heartbeat-loop работает независимо от обработки, поэтому длинный backoff сам по себе из группы не выкинет — координатор считает клиента живым, пока его сетевой heartbeat в порядке. Стреляет всё в момент ребаланса (новый член зашёл, лидер сменился, broker упал): если в этот момент handler сидит в backoff'е, у него есть только RebalanceTimeout (rebalance.timeout.ms, дефолт 60 секунд в franz-go v1.21.0) чтобы свернуться и переджойниться. Не успел — координатор кикает клиента, партиция уезжает к другому, и тот возьмётся за тот же offset с той же ошибкой. В Java-клиенте механика жёстче: между poll() вызовами действует max.poll.interval.ms (дефолт 5 минут), и его превышение сразу кикает консьюмера, без привязки к ребалансу.
Это первый аргумент. Второй — головы́ блокируются. У тебя в одной партиции 1000 платежей, среди них один залипший на упавшем провайдере. На него тратится тридцать секунд. Все 999 за ним ждут. Так получается hot-line из-за единственного зависшего record'а.
Третий — про длину паузы. Если провайдер лежит, нет смысла стучать чаще, чем он проснётся. Минута, пять минут, час. Спать на это время прямо в poll-loop'е нельзя по причине номер один. Параллельно работать тоже не получится — порядок сломается, offset нельзя коммитить, пока «висит» record (см. Конкурентность и lag).
Вывод. Если ретраи нужны не «тут же ещё разок», а «через 30 секунд / 5 минут / час» — нужен другой механизм. Тот, что не блокирует основной consumer.
Идея retry-топиков
Решение простое и наглядное. Делаем отдельный топик на каждый интервал ожидания:
brew.payments.v1— основной поток платежей;brew.payments.retry.30s— списание упало в основном, перекинули сюда;brew.payments.retry.5m— упало вretry-30s, перекинули сюда;brew.payments.retry.1h— последний шанс;brew.payments.dlq— финальная остановка.
Один consumer слушает все четыре топика (main и три retry). Когда из main приходит платёж и handle падает (провайдер не ответил) — мы его пакуем с дополнительными headers и шлём в retry-30s. Дальше он лежит там как обычное сообщение Kafka. Тот же consumer его рано или поздно прочитает. И вот тут трюк: перед обработкой смотрим на record.Timestamp и ждём, пока пройдёт нужный интервал. Если запись пришла секунду назад, а ждать надо тридцать — спим 29 секунд. Потом снова handle. Получилось — commit и едем дальше. Не получилось — retry-5m. Сценарий повторяется на каждой ступени.
Получаем то, что хотели:
- ретраи не блокируют основной поток. main-партиции всегда обрабатываются с тем же темпом, что без ошибок;
- между попытками — реальные интервалы ожидания, а не «как успеет poll-loop»;
- история движения по pipeline'у видна в headers (
error.message,previous.topic,retry.count) — оператор разберёт инцидент по headers DLQ-сообщения, не лазая в логи.
Минус: я всё равно блокирую poll-loop ровно на retry-топиках, пока «отлёживаю» record. На лекционной нагрузке это нормально. На production-нагрузке делают по-другому — отдельный consumer на каждый retry-топик, либо PauseFetchPartitions плюс отложенный ResumeFetchPartitions (это тема Доставка во внешние системы). Для понимания паттерна важна сама эскалация, остальное — детали реализации.
Headers как протокол
Каждая ступень pipeline'а оставляет следы. Соглашение в нашей лекции:
| Header | Кто ставит | Что значит |
|---|---|---|
error.class | каждая ступень | permanent или transient (последняя классификация) |
error.message | каждая ступень | строка ошибки |
error.timestamp | каждая ступень | когда упало (RFC3339Nano UTC) |
retry.count | каждая ступень | счётчик эскалаций (0 → 1 → 2 → 3 → DLQ) |
previous.topic | каждая ступень | откуда переехали (для DLQ это последняя retry-ступень) |
original.topic | первая эскалация | где record родился (никогда не меняется) |
original.partition / original.offset | первая эскалация | координаты первого появления |
Соглашение специально консервативное. Headers — это пары байтов, ничего самовалидирующегося там нет. Мы решаем сами, что и как туда класть. Если выбор полей понятный — DLQ можно разбирать без access'а к коду processor'а: открыл headers, прочитал error.class и retry.count, и уже видишь картину.
previous.topic отдельно — он удобный для replay'а. Когда оператор ловит DLQ-инцидент и хочет понять, на какой именно ступени окончательно сдалось — previous.topic отвечает. original.topic нужен другому: чтобы понять, где «дом» этого payload'а. После replay из DLQ обратно в main — original.topic остаётся прежним, его мы при replay'е не перетираем. Получается стабильный идентификатор «места рождения» record'а, удобный для трейсинга.
Что показывает наш processor
Главное — таблица ступеней. Я их завёл явно, потому что это контракт лекции:
stages := []stage{
{topic: *mainTopic, delay: 0, nextTopic: *retry30},
{topic: *retry30, delay: *delay30s, nextTopic: *retry5m},
{topic: *retry5m, delay: *delay5m, nextTopic: *retry1h},
{topic: *retry1h, delay: *delay1h, nextTopic: ""},
}Пустой nextTopic на последней retry-ступени — флажок «дальше эскалировать некуда». forwardOrDLQ увидит пустую строку и пошлёт record в DLQ с reason=exhausted. Если бы поставили *dlq напрямую, в логе печатался бы reason=next-retry, и три случая (next-retry / permanent / exhausted) не различались бы между собой.
Один consumer группы lecture-04-04-processor подписывается на все четыре топика. Перед handle() смотрим на delay ступени и, если он положительный, ждём до record.Timestamp + delay. Это сердце retry-механики:
func waitUntilDue(ctx context.Context, recordTs time.Time, delay time.Duration) error {
due := recordTs.Add(delay)
wait := time.Until(due)
if wait <= 0 {
return nil
}
fmt.Printf("WAIT due=%s (через %s)\n", due.UTC().Format(time.RFC3339), wait.Truncate(time.Second))
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(wait):
return nil
}
}Дальше — решение, куда отправить упавший record. Три случая, каждый со своим target'ом:
target := st.nextTopic
reason := "next-retry"
if isPermanent(cause) {
target = dlqTopic
reason = "permanent"
} else if target == "" {
target = dlqTopic
reason = "exhausted"
}permanent — сразу в DLQ, минуя retry-ступени. Битый JSON (poison-pill) или отказ доменной валидации — повторять бесполезно, даже через час. exhausted — это transient, но мы уже на retry-1h и nextTopic пуст. Всё, что не вылечилось за час, считаем безнадёжным.
Headers собираются в forwardWithHeaders. Тонкий момент — original.* ставится только при первой эскалации:
if _, ok := idx["original.topic"]; !ok {
headers = appendOrReplace(headers, "original.topic", r.Topic)
headers = appendOrReplace(headers, "original.partition", strconv.Itoa(int(r.Partition)))
headers = appendOrReplace(headers, "original.offset", strconv.FormatInt(r.Offset, 10))
}
headers = appendOrReplace(headers, "previous.topic", r.Topic)
headers = appendOrReplace(headers, "retry.count", strconv.Itoa(nextRetries))appendOrReplace важен: error-headers перетираются на каждой ступени (нам нужна последняя ошибка, не первая), а original.* пишутся один раз и держатся.
DLQ как терминал
Когда record добрался до DLQ — это конец автоматического pipeline'а. Дальше его читает отдельный обработчик и в общем случае не возвращает в основной поток. Цели у DLQ-обработчика три:
- Зафиксировать инцидент в долговременном хранилище (БД, append-only лог, S3) — чтобы можно было поднять глазами через неделю.
- Дёрнуть алёрт — кто-то живой должен узнать, что сообщение умерло.
- Не блокировать DLQ-партиции бесконечной обработкой — DLQ должен дочитываться быстро, иначе lag растёт и ты теряешь видимость.
В нашей лекции cmd/dlq-processor делает первое и второе. Алёрт mock'нут до stdout (в проде это webhook в Slack или PagerDuty). Хранилище — append-only JSON-файл /tmp/lecture-04-04-incidents.jsonl. По плану лекции там должна быть Postgres-таблица — паттерн идентичный, файл выбран, чтобы не тащить ещё один docker-compose. На прод — подменяешь os.OpenFile на pgxpool.Exec(INSERT ...), и всё.
Структура incident-записи:
type incident struct {
DLQTopic string `json:"dlq_topic"`
DLQPartition int32 `json:"dlq_partition"`
DLQOffset int64 `json:"dlq_offset"`
Key string `json:"key,omitempty"`
OriginalTopic string `json:"original_topic,omitempty"`
OriginalPart string `json:"original_partition,omitempty"`
OriginalOffset string `json:"original_offset,omitempty"`
PreviousTopic string `json:"previous_topic,omitempty"`
RetryCount string `json:"retry_count,omitempty"`
ErrorClass string `json:"error_class,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
ErrorTimestamp string `json:"error_timestamp,omitempty"`
DLQRecordTime string `json:"dlq_record_time"`
PayloadByteCount int `json:"payload_bytes"`
}Намеренно нет поля payload. Идея — incident-лог должен быть лёгким и пригодным к индексированию (по error_class, по original_topic). Если payload надо посмотреть — это уже отдельная операция через kafka-console-consumer или dump через replay-cli --dry-run. В incident-лог копировать payload'ы — путь к терабайту жирных JSON'ов, по которым потом не найти ни одного нужного инцидента.
Алёрт в stdout простой:
[ALERT] #3 dlq=brew.payments.dlq p=1 off=2 key=k-7
original=brew.payments.v1/0/14 previous=brew.payments.retry.1h retries=3
class=transient message="exhausted retries: transient downstream blip on payment id=\"k-7\""
payload=42 bytesЭтого хватит, чтобы понять: платёж k-7 пришёл из основного brew.payments.v1, прошёл все три retry-ступени, на каждой провайдер давал transient-отказ, и в итоге списание сдалось после часа ожидания. В реальном алёрт-канале форматирование другое, поля те же.
Replay
DLQ — это финал автоматики, но не приговор. Часть инцидентов после починки провайдера имеет смысл переиграть. Тот же transient: за час платёжный провайдер ожил, и теперь у нас в brew.payments.dlq лежит 200 платежей, которые могли бы пройти, если их подать снова.
cmd/replay-cli это умеет. Ключевые флаги:
-from-topic— откуда читать, по умолчаниюbrew.payments.dlq;-to-topic— куда переотправить, по умолчанию основнойbrew.payments.v1;-since— фильтр по времени DLQ-записи (берём всё новееnow() - since);-error-class— опциональный фильтр по header'у; типичный случай —transient;-dry-run— посчитать совпадения, ничего не публикуя.
Перепаковка в новый record:
func replayRecord(r *kgo.Record, toTopic string) *kgo.Record {
headers := append([]kgo.RecordHeader(nil), r.Headers...)
headers = setHeader(headers, "retry.count", "0")
headers = setHeader(headers, "replay.from-dlq", r.Topic+"/"+strconv.Itoa(int(r.Partition))+"/"+strconv.FormatInt(r.Offset, 10))
headers = setHeader(headers, "replay.timestamp", time.Now().UTC().Format(time.RFC3339Nano))
return &kgo.Record{
Topic: toTopic,
Key: r.Key,
Value: r.Value,
Headers: headers,
}
}Значимое здесь:
retry.countобнуляется. Новый pipeline начинается с нуля — иначе DLQ-replay сразу попал бы под счётчик исчерпанных попыток предыдущей сессии и улетел обратно в DLQ.replay.from-dlq— координаты исходного record'а в DLQ. Если после replay снова упадём — в новом DLQ-инциденте по этому header'у видно, что текущий прогон уже второй.- payload и key — нетронуты. Это важно: в тех системах, где consumer строит дедуп по бизнес-ключу payload'а, replay не должен ломать идемпотентность.
Что замолчал намеренно. Replay не дедуплицирует. Если запустить make replay дважды подряд — отправит дважды. Защиту от этого должен делать consumer (см. Outbox-паттерн про idempotency на dedup-таблице). Альтернатива — хранить ID уже сделанных replay'ев на стороне CLI, но тогда у нас стейт-полный CLI, что отдельная история.
Метрики, на которые смотреть
Наблюдаемость pipeline'а строится на четырёх числах. Каждое из них имеет осмысленную цель:
- end-offset основного
brew.payments.v1. Растёт пропорционально потоку платежей. На него можно навесить алёрт «throughput упал». - end-offset каждого retry-топика. На стабильно работающей системе они должны быть низкими и расти медленно. Резкий рост — сигнал «платёжный провайдер деградировал». Идеал — все три retry-топика близки к нулю.
- end-offset DLQ. Любой ненулевой прирост — алёрт. На production это обычно
rate(messages_in_dlq_total[5m]) > 0в Prometheus. - consumer lag по группе processor'а. Лекция Конкурентность и lag показывала
kadm.Lag— для каждой ступени отдельный лаг, и если на main всё хорошо, а на retry-30s огромный — значит, мы захлёбываемся в ретраях.
Про DLQ есть отдельная мета-метрика — error.class distribution. Из incident-лога её снимаешь одной строкой: jq -r '.error_class' /tmp/lecture-04-04-incidents.jsonl | sort | uniq -c. Если 90% инцидентов — transient, значит, retry pipeline скорее всего слишком короткий: нужен ещё один уровень с большей задержкой, либо replay по расписанию.
Демо
Стенд из корня репозитория должен быть поднят (docker compose up -d в корне). Дальше из директории лекции:
make topic-create-pipeline
make seed-with-failures SEED_MESSAGES=20В brew.payments.v1 лежит 20 mock-платежей. Часть с mode=ok (прошли с первой попытки), часть transient (провайдер флапает, на каждой ступени уйдут дальше), часть permanent (битый платёж — poison-pill — или явный reject доменной валидации, сразу в DLQ).
Запускаем processor с быстрыми задержками, чтобы пайплайн прошёл за полминуты, а не за час:
make run-processor-fastВ выводе видно, как records путешествуют. Что-то вида:
OK [brew.payments.v1] p=0 off=3 key=k-3
FAIL [brew.payments.v1] p=2 off=4 key=k-5 reason=next-retry err=transient ... → brew.payments.retry.30s
FAIL [brew.payments.v1] p=1 off=2 key=k-7 reason=permanent err=invalid json: ... → brew.payments.dlq
WAIT due=2026-05-01T12:30:15Z (через 1s)
FAIL [brew.payments.retry.30s] p=0 off=0 key=k-5 reason=next-retry err=transient ... → brew.payments.retry.5mПосле того как processor прокачает все 4 топика и встанет на «нет новых сообщений» — Ctrl+C. В соседнем терминале:
make run-dlqDLQ-processor читает brew.payments.dlq, печатает ALERT и пишет JSON-строки в /tmp/lecture-04-04-incidents.jsonl. Проверим:
make dlq-count
cat /tmp/lecture-04-04-incidents.jsonl | jq -r '[.error_class, .original_topic, .key] | @tsv'В DLQ — все permanent (сразу) плюс все transient (после исчерпания трёх retry-ступеней).
Теперь replay. Допустим, платёжный провайдер починили, и мы хотим вернуть все transient платежи за последний час обратно в основной топик:
make replay REPLAY_CLASS=transient REPLAY_SINCE=1hCLI читает brew.payments.dlq, фильтрует по error.class=transient, упаковывает с обнулённым retry.count и шлёт в brew.payments.v1. После этого в основном топике появляются те же платежи заново — payment.k-5, payment.k-9. Если запустить processor снова, они пойдут по pipeline'у с нуля. На лекционных моках они опять упадут (моки не лечатся), но зато в логе processor'а у новых retry-сообщений будет header replay.from-dlq со ссылкой на исходный DLQ-offset. По нему оператор поймёт: текущий прогон уже второй, первая жизнь платежа закончилась в DLQ.
make replay-dry делает то же самое без ProduceSync — полезно убедиться, что фильтр захватывает то, что ожидаешь, до реального трафика.
Рамки паттерна
Несколько границ, которые легко упустить.
Pipeline retry-топиков сам по себе не делает доставку гарантированной — это тот же at-least-once, что был в Обработка ошибок. Ровно те же грабли «упали между produce и commit» работают и тут. Если processor упал между «сделали ProduceSync в retry-5m» и «сделали CommitRecords для retry-30s» — на рестарте retry-30s отдаст этот record снова, и он попадёт в retry-5m повторно. Дубль в retry-5m. Идемпотентность handler'а — единственная защита.
Длительные ожидания в retry-1h (час) на одной партиции блокируют все остальные record'ы в этой же партиции. Это тонкое место. Один способ обойти — partitioning по бизнес-ключу: если key=k-5 залип на час, остальные ключи лежат в других партициях и обрабатываются как ни в чём не бывало. Если же все retry-сообщения летят в одну партицию (например, ключ — это user_id, а у одного user'а сразу 100 сообщений) — pipeline захлёбывается. Решение — либо уменьшать retry-1h до меньшей задержки, либо параллелить через worker pool с per-key affinity (см. Конкурентность и lag), либо разделять retry-pipeline на больше партиций, чем у основного топика.
Replay — ручная операция, и это нормально. Автоматический replay из DLQ обратно в main без понимания причины инцидента — путь к infinite loop'у. Если провайдера ещё не починили или причина была не в нём, а в самом платеже — record снова упадёт по тому же сценарию, и DLQ начнёт расти. Поэтому replay инициирует человек или routine, который проверил, что причина устранена.
И последнее. Retry-pipeline не подходит для случаев, где порядок важнее всего. Когда payment k-5 ушёл в retry-30s, а payment k-6 (с тем же ключом, но более поздний) проскочил по основному пути — мы нарушили order per-key. Если бизнес-логика терпит inversions, это нормально. Если строго запрещены — нужна другая архитектура, например, парковка всей партиции через PauseFetchPartitions до восстановления платёжного провайдера (Доставка во внешние системы).
Запуск целиком
make topic-create-pipeline
make seed-with-failures SEED_MESSAGES=50
# терминал 1
make run-processor-fast
# терминал 2 (как только processor отработает)
make run-dlq
# терминал 3
make dlq-count
make replay REPLAY_CLASS=transient REPLAY_SINCE=24h
make clean # снести группы, топики и incident-логПолезные sanity-check'и: make main-count (общее число записей в основном топике с учётом replay'ев), make dlq-count (сколько умерло), wc -l /tmp/lecture-04-04-incidents.jsonl (сколько алёртов сгенерировалось — должно совпадать с DLQ).