Kafka CookbookПродьюсерБатчинг и пропускная способность
0 / 42 (0%)

Батчинг и пропускная способность

В пятницу Brew включает промо «бесплатный кофе по пятницам», и поток заказов в brew.orders.v1 подскакивает до 8000 заказов/мин. На обычном дне продьюсер order-service справляется в одну калитку, в промо-пик — упирается в пропускную способность. В Ключах и партиционировании вопрос был «как раскидать заказы по партициям»; тут — «как продьюсер выжимает байты в эти партиции», когда заказы идут стеной.

В Acks и durability мы измеряли latency на честных round-trip'ах через ProduceSync — каждый record отдельно, по одному в полёте. Это была демка про acks, не про скорость. Тут берём другую сторону медали — пропускную способность под промо-пятницу. Что вообще даёт продьюсеру скорость, как пользоваться батчингом и где он начинает мешать.

Спойлер на старте: пропускная способность Kafka-продьюсера — это про размер батча, а не про размер сообщения. Чем толще батч, тем меньше round-trip'ов на единицу полезной нагрузки. Сжатие тут идёт прицепом — оно работает на уровне батча, и без батча толку от него мало.

Батчинг — это не «когда я говорю Flush»

Главное недопонимание: люди думают, что батч в Kafka — это когда ты собираешь массив сообщений в коде и отдаёшь его одним вызовом. Это не так. Продьюсер сам собирает батчи. Ты пишешь record'ы по одному, продьюсер кладёт их в очередь per-partition, и периодически отправляет накопленное на брокер.

Что значит «периодически». В franz-go (и в обычном Java-клиенте) это срабатывает по одному из двух событий:

  1. Накопилось ProducerBatchMaxBytes (дефолт franz-go v1.21.0 — 1 000 012 байт, повторяет дефолт Kafka max.message.bytes) — отправляем сразу, нет смысла копить.
  2. Сработал ProducerLinger — таймер, по истечении которого отправляется всё, что есть. Дефолт franz-go v1.21.0 — 10 мс (поменяли в v1.20.0, до этого было 0; Java-клиент исторически держит 0).

Дефолт 10 мс — это уже мини-linger «по факту»: даже на плотном потоке клиент успевает собрать в один батч record'ы, упавшие в очередь между моментами «положил в очередь» и «продьюсер собрался отправить запрос». Без явного ProducerLinger(0) за счёт этого батч ощутимо толще, чем «отправляй мгновенно». При редком потоке 10 мс почти не видны на end-to-end latency, зато компрессия успевает съесть нормальную пачку.

ProducerLinger > 10ms принудительно ждёт ещё дольше, накапливая больше. На очень частом потоке прибавки почти нет (батч и так толстый), на медленном — таймер съедает latency. Поэтому linger — это компромисс между «отправить почти сразу» и «отправить эффективно».

В нашем коде batch собирается ровно так:

go
opts := []kgo.Opt{
    kgo.DefaultProduceTopic(topic),
    kgo.ProducerLinger(s.linger),
    kgo.ProducerBatchCompression(s.codec),
    kgo.ProducerBatchMaxBytes(1 << 20), // 1 MiB
    kgo.MaxBufferedRecords(200_000),
}

MaxBufferedRecords — это лимит, после которого Produce начнёт блокироваться. Дефолт 10 000 на 100k нагрузке быстро упрётся в backpressure, и мы будем мерять не linger, а ожидание места в буфере. Подняли до 200к — чтобы продьюсер ни разу не споткнулся.

Per-partition, а не per-topic

Очередь батчей — на каждую партицию свою. Это важная штука. Если ты пишешь в топик с 24 партициями и распределяешь сообщения равномерно по ключу — у тебя 24 параллельные очереди, каждая собирает свой батч независимо.

Что отсюда следует на практике:

  • Толстые батчи получаются легче, когда сообщений в одну партицию много. На разреженном паттерне «по ключу один-два-record'а в секунду» батч никогда не наберётся, и дефолтный linger 10 мс будет отправлять каждое почти сразу.
  • Дефолтный партиционер franz-go v1.21.0 — UniformBytesPartitioner(64 KiB, adaptive=true, keys=true, nil) (KIP-794, Java-дефолт с 3.3). При пустом ключе он «прилипает» к одной партиции до ~64 KiB накопленных данных и потом переключается на следующую (sticky-bytes, не per-record round-robin). Это специально, чтобы батч хоть иногда набирался даже без ключа.
  • Увеличение числа партиций без роста скорости записи делает батчинг хуже, не лучше. Каждая партиция получает меньше — и батчи становятся тоньше.

Это к разговору «у нас тормозит, давайте увеличим число партиций». Сначала посмотри, набирается ли вообще батч — иначе только сделаешь хуже. На промо-пятнице brew.orders.v1 спасает не лишняя партиция, а толстый батч: 8000 заказов/мин ложатся в каждую партицию плотно, и продьюсеру есть из чего собирать пачку.

Сжатие живёт на батче

Compression в Kafka применяется к батчу целиком, не к каждому record'у. Один батч — один сжатый блок. Когда консьюмер читает, он распаковывает батч целиком и отдаёт record'ы дальше.

Из этого вытекает и сила, и предел сжатия. На толстом батче (десятки-сотни record'ов с похожей структурой) — степень сжатия отличная: повторяющиеся поля, общие префиксы, словарь работает. На тонком батче (один record на батч) — почти ноль выигрыша; CPU потратили, ratio ≈ 1.

Вот почему имеет смысл связка linger + compression. Без linger в момент низкой нагрузки батчи остаются тонкими, compression не окупается. Со средним linger (5–20 мс) даже на медленном потоке успевает накопиться нормальный батч — и кодек начинает работать.

Кодеки в порядке от быстрого/слабого к медленному/сильному:

  1. none — без сжатия. Простой, минимум CPU, ratio = 1.
  2. lz4 — очень быстрое, ratio в районе 1.5–2× на структурированных данных. Промышленный дефолт.
  3. snappy — где-то рядом с lz4 по соотношению. Старее, в franz-go так же поддерживается через kgo.SnappyCompression().
  4. zstd — медленнее по CPU, но сильно лучше по ratio: 2–4× на JSON, ещё больше на текстах. Поддерживается с Kafka 2.1.
  5. gzip — медленнее zstd, ratio часто хуже. Используется редко, чаще из соображений совместимости.

В нашем бенчмарке мы гоняем none / lz4 / zstd — это три точки, по которым видно практическую разницу.

Что показывает наш код

Бинарник один: cmd/bench. Он гоняет матрицу из трёх linger'ов (0/5/50 мс) и трёх кодеков (none/lz4/zstd), на каждой комбинации пишет 100 000 JSON-сообщений по ~1 КБ — это модель промо-пятничного потока заказов (OrderPlaced в brew.orders.v1). На выходе — таблица с throughput, P50/P99/P99.9 latency и размером на диске.

Каждая комбинация пишет в свой топик — иначе размеры на диске смешаются и колонка про сжатие потеряет смысл. Имена топиков lecture-02-04-batching-l<linger>-<codec>.

Запись принципиально асинхронная — cl.Produce плюс callback. Если бы мы делали ProduceSync (как в Acks и durability), батч-эффекты бы исчезли: пока один record не добьёт round-trip, следующий не уезжает.

Сам цикл — это просто cl.Produce в loop'е, с замером времени до колбэка:

go
for i := 0; i < msgs; i++ {
    rec := &kgo.Record{Value: payloads[i]}
    sendAt := time.Now()
    cl.Produce(ctx, rec, func(_ *kgo.Record, err error) {
        took := time.Since(sendAt)
        // запись в res.latencies, инкремент счётчиков
    })
}
 
flushCtx, flushCancel := context.WithTimeout(ctx, 2*time.Minute)
_ = cl.Flush(flushCtx)
flushCancel()
res.elapsed = time.Since(start)

Тут две тонкости. Первая — Flush обязателен. Без него цикл закончится мгновенно (мы отдали 100к записей в очередь буфера за миллисекунды), и elapsed окажется фейковым. Вторая — latency, которую мы записываем в callback, это не round-trip к брокеру. Это полный путь record'а: «положил в очередь» → «попал в батч» → «батч отправлен» → «брокер ответил» → «callback вызван». На толстой нагрузке у первых записей в очереди latency будет огромная — они ждут, пока пройдут все батчи. У последних — маленькая. P50 на 100к получается ~средний шаг по всему прогону, не «время одного RPC».

Это ОК для нашего сравнения — мы сравниваем сценарии между собой при одинаковой нагрузке. Если хочется честную per-record latency на низкой нагрузке — это уже другой эксперимент (и он был в Acks и durability с ProduceSync).

Размер на диске считаем через kadm.DescribeAllLogDirs:

go
all, err := admin.DescribeAllLogDirs(rpcCtx, nil)
// ...
all.Each(func(d kadm.DescribedLogDir) {
    d.Topics.Each(func(p kadm.DescribedLogDirPartition) {
        if !wanted[p.Topic] { return }
        if seen[p.Topic][p.Partition] { return }
        seen[p.Topic][p.Partition] = true
        sizes[p.Topic] += p.Size
    })
})

Дельта после − до — сколько байт лёг в логи одной реплики. На rf=3 в кластере на самом деле в три раза больше; для сравнения сценариев это не важно. Берём строго первую найденную реплику на каждую партицию (через seen-мапу), иначе при rf=3 размер размножится втрое.

Payload-генератор тоже не случайный кусок кода. Если бы я переиспользовал один и тот же буфер на все 100k record'ов — zstd закодировал бы его как «повтор N раз» и compression-ratio улетел бы в небо. Поэтому каждый record получает уникальный JSON: фиксированная структура (ключи seq, id, ts, event, payload) и случайный hex-наполнитель до целевого размера. Так zstd видит реалистичную картину — структура жмётся, random-часть нет.

Что видно на прогоне

Здоровый кластер, 100 000 сообщений по ~1 КБ JSON, partitions=3, rf=3. Прогон занимает ~30 секунд суммарно по всем девяти сценариям.

plaintext
SCENARIO                      SENT    FAILED  ELAPSED   THROUGHPUT    P50       P99       P99.9     DISK
linger=0ms  compression=none  100000  0       1.10s     91200 msg/s   363.79ms  1.07s     1.08s     98.6MB
linger=0ms  compression=lz4   100000  0       1.28s     77887 msg/s   548.80ms  1.25s     1.26s     92.2MB
linger=0ms  compression=zstd  100000  0       1.01s     99229 msg/s   531.73ms  968.58ms  969.76ms  48.2MB
linger=5ms  compression=none  100000  0       1.45s     69135 msg/s   936.45ms  1.42s     1.42s     98.6MB
linger=5ms  compression=lz4   100000  0       752.63ms  132868 msg/s  386.57ms  708.90ms  717.08ms  92.2MB
linger=5ms  compression=zstd  100000  0       704.46ms  141952 msg/s  436.75ms  632.30ms  636.41ms  48.2MB
linger=50ms compression=none  100000  0       1.69s     59335 msg/s   930.63ms  1.64s     1.66s     98.6MB
linger=50ms compression=lz4   100000  0       2.60s     38507 msg/s   1.06s     2.51s     2.52s     92.2MB
linger=50ms compression=zstd  100000  0       1.24s     80911 msg/s   781.35ms  1.14s     1.14s     48.3MB

Что отсюда читается.

Колонка DISK — самое чистое. На JSON 1 КБ:

  • none — 98.6 МБ. Это ровно 100 000 × ~1 КБ плюс немного метаданных батча.
  • lz4 — 92.2 МБ. Жмёт всего на 6%. На JSON-with-random-id структура есть, но её мало; основная масса — random hex, который lz4 не берёт.
  • zstd — 48.2 МБ. Половина от none. Тот же random hex, но zstd использует словарь и берёт куски структуры значительно лучше.

Числа стабильные между прогонами: компрессия — функция от данных, не от тайминга.

Колонка THROUGHPUT — а вот тут интересно. Лучшая комбинация — linger=5ms × zstd (~142k msg/s). Хуже всего — linger=50ms × lz4 (~38k msg/s), и это контринтуитивно: линкер же должен помогать?

Объяснение в том, что мы пишем все 100к сразу, продьюсер набирает огромные батчи в любом случае. Толкаем cl.Flush(), и он ждёт, пока все таймеры linger'а истекут. С linger=50ms каждый последний кусок ждёт ещё 50 мс перед отправкой — а таких «последних кусков» в конце много (хвосты по партициям). На разреженном потоке linger=50 был бы полезен; на «накормили клиента и ждём» — он только тормозит хвост.

Это тоже урок. Linger — это про сглаживание неравномерности нагрузки, не про «всегда лучше». Если у тебя ровный поток и хочется минимальной end-to-end latency — оставляй 0. Если поток рваный, с пиками и провалами — 5–20 мс убирают пилу. 50 мс в продакшне — почти всегда перебор.

P99 / P99.9 на 100к async-produce — это «сколько занял путь у самого позднего record'а в буфере». Лучшие хвосты на linger=5ms × {lz4, zstd} (~700 мс). Худшие — на linger=50ms × lz4 (~2.5 с). На реальной нагрузке с разреженным потоком эти числа выглядели бы иначе — там P99 определяется бы тем самым 50-мс linger'ом, а не размером буфера.

А что с размером сообщения

Мы взяли 1 КБ — типичный размер JSON-события (id, поля заказа, метаданные). Если поднять payload до 10 КБ:

  • Размер на диске пропорционально вырастет.
  • Compression-ratio станет лучше — больше повторов внутри одного record'а.
  • Throughput по сообщениям упадёт, throughput по байтам останется того же порядка.

Если уменьшить до 100 байт:

  • Метаданные record'а (headers, key, partition, offset, CRC, timestamp) станут заметной долей. Compression почти не работает — мало данных в записи.
  • Throughput по сообщениям может вырасти до сотен тысяч msg/s — но throughput по байтам упадёт.

Граница — где-то 200–300 байт. Меньше — пиши пачкой в одном record'е (если бизнес-логика позволяет) или соглашайся на накладные. Больше — батчинг и compression работают как ожидается.

Что взять с собой

  • Батчинг в Kafka — на стороне продьюсера, per-partition. Сам собирает, сам отправляет. Размер регулируется ProducerBatchMaxBytes и ProducerLinger.
  • Дефолт franz-go v1.21.0 — ProducerLinger=10ms. Это уже не «отправляй сразу», но и не агрессивный linger: естественные батчи набираются толще, чем при 0, и end-to-end latency почти не растёт.
  • linger > 10ms нужен для разреженного потока, чтобы дать накопиться. На плотном потоке от него только хвостовая latency растёт — дешевле оставить дефолт или явный ProducerLinger(0).
  • Compression работает на батч. На тонком батче compression не даёт ничего; на толстом — даёт всё.
  • Кодеки. lz4 дёшево по CPU и берёт умеренно. zstd дороже, но ratio в 2–4× на JSON. На случайных байтах никакой кодек не поможет.
  • MaxBufferedRecords — лимит после которого Produce блокируется. Дефолт 10 000 в реальной нагрузке нужно поднимать; иначе backpressure будет упираться в буфер, а не в брокер.
  • При sizing'е партиций смотри на скорость записи в одну партицию, а не на топик. Слишком много партиций при низком потоке = тонкие батчи = плохой compression-ratio.

В Ошибки, retries и headers разберёмся с классами ошибок продьюсера, retry/timeout-настройками и headers — это последний кусок, после которого продьюсер у нас будет полным.

Запуск

Стенд из корня репозитория должен быть поднят (docker compose up -d). Дальше:

sh
make run

Это создаст 9 топиков и прогонит матрицу с дефолтами (100 000 × 1024 байт). Прогон занимает примерно 30 секунд на ноуте.

Меньшую нагрузку можно гонять для быстрых итераций:

sh
make run MESSAGES=10000 PAYLOAD=512

Между прогонами полезно прибирать топики, иначе размер на диске просто складывается:

sh
make topic-delete

Описать партиции:

sh
make topic-describe
·Модуль 02

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Продьюсер / Батчинг и пропускная способность