Батчинг и пропускная способность
В пятницу Brew включает промо «бесплатный кофе по пятницам», и поток заказов в brew.orders.v1 подскакивает до 8000 заказов/мин. На обычном дне продьюсер order-service справляется в одну калитку, в промо-пик — упирается в пропускную способность. В Ключах и партиционировании вопрос был «как раскидать заказы по партициям»; тут — «как продьюсер выжимает байты в эти партиции», когда заказы идут стеной.
В Acks и durability мы измеряли latency на честных round-trip'ах через ProduceSync — каждый record отдельно, по одному в полёте. Это была демка про acks, не про скорость. Тут берём другую сторону медали — пропускную способность под промо-пятницу. Что вообще даёт продьюсеру скорость, как пользоваться батчингом и где он начинает мешать.
Спойлер на старте: пропускная способность Kafka-продьюсера — это про размер батча, а не про размер сообщения. Чем толще батч, тем меньше round-trip'ов на единицу полезной нагрузки. Сжатие тут идёт прицепом — оно работает на уровне батча, и без батча толку от него мало.
Батчинг — это не «когда я говорю Flush»
Главное недопонимание: люди думают, что батч в Kafka — это когда ты собираешь массив сообщений в коде и отдаёшь его одним вызовом. Это не так. Продьюсер сам собирает батчи. Ты пишешь record'ы по одному, продьюсер кладёт их в очередь per-partition, и периодически отправляет накопленное на брокер.
Что значит «периодически». В franz-go (и в обычном Java-клиенте) это срабатывает по одному из двух событий:
- Накопилось
ProducerBatchMaxBytes(дефолт franz-go v1.21.0 — 1 000 012 байт, повторяет дефолт Kafkamax.message.bytes) — отправляем сразу, нет смысла копить. - Сработал
ProducerLinger— таймер, по истечении которого отправляется всё, что есть. Дефолт franz-go v1.21.0 — 10 мс (поменяли в v1.20.0, до этого было 0; Java-клиент исторически держит 0).
Дефолт 10 мс — это уже мини-linger «по факту»: даже на плотном потоке клиент успевает собрать в один батч record'ы, упавшие в очередь между моментами «положил в очередь» и «продьюсер собрался отправить запрос». Без явного ProducerLinger(0) за счёт этого батч ощутимо толще, чем «отправляй мгновенно». При редком потоке 10 мс почти не видны на end-to-end latency, зато компрессия успевает съесть нормальную пачку.
ProducerLinger > 10ms принудительно ждёт ещё дольше, накапливая больше. На очень частом потоке прибавки почти нет (батч и так толстый), на медленном — таймер съедает latency. Поэтому linger — это компромисс между «отправить почти сразу» и «отправить эффективно».
В нашем коде batch собирается ровно так:
opts := []kgo.Opt{
kgo.DefaultProduceTopic(topic),
kgo.ProducerLinger(s.linger),
kgo.ProducerBatchCompression(s.codec),
kgo.ProducerBatchMaxBytes(1 << 20), // 1 MiB
kgo.MaxBufferedRecords(200_000),
}MaxBufferedRecords — это лимит, после которого Produce начнёт блокироваться. Дефолт 10 000 на 100k нагрузке быстро упрётся в backpressure, и мы будем мерять не linger, а ожидание места в буфере. Подняли до 200к — чтобы продьюсер ни разу не споткнулся.
Per-partition, а не per-topic
Очередь батчей — на каждую партицию свою. Это важная штука. Если ты пишешь в топик с 24 партициями и распределяешь сообщения равномерно по ключу — у тебя 24 параллельные очереди, каждая собирает свой батч независимо.
Что отсюда следует на практике:
- Толстые батчи получаются легче, когда сообщений в одну партицию много. На разреженном паттерне «по ключу один-два-record'а в секунду» батч никогда не наберётся, и дефолтный linger 10 мс будет отправлять каждое почти сразу.
- Дефолтный партиционер franz-go v1.21.0 —
UniformBytesPartitioner(64 KiB, adaptive=true, keys=true, nil)(KIP-794, Java-дефолт с 3.3). При пустом ключе он «прилипает» к одной партиции до ~64 KiB накопленных данных и потом переключается на следующую (sticky-bytes, не per-record round-robin). Это специально, чтобы батч хоть иногда набирался даже без ключа. - Увеличение числа партиций без роста скорости записи делает батчинг хуже, не лучше. Каждая партиция получает меньше — и батчи становятся тоньше.
Это к разговору «у нас тормозит, давайте увеличим число партиций». Сначала посмотри, набирается ли вообще батч — иначе только сделаешь хуже. На промо-пятнице brew.orders.v1 спасает не лишняя партиция, а толстый батч: 8000 заказов/мин ложатся в каждую партицию плотно, и продьюсеру есть из чего собирать пачку.
Сжатие живёт на батче
Compression в Kafka применяется к батчу целиком, не к каждому record'у. Один батч — один сжатый блок. Когда консьюмер читает, он распаковывает батч целиком и отдаёт record'ы дальше.
Из этого вытекает и сила, и предел сжатия. На толстом батче (десятки-сотни record'ов с похожей структурой) — степень сжатия отличная: повторяющиеся поля, общие префиксы, словарь работает. На тонком батче (один record на батч) — почти ноль выигрыша; CPU потратили, ratio ≈ 1.
Вот почему имеет смысл связка linger + compression. Без linger в момент низкой нагрузки батчи остаются тонкими, compression не окупается. Со средним linger (5–20 мс) даже на медленном потоке успевает накопиться нормальный батч — и кодек начинает работать.
Кодеки в порядке от быстрого/слабого к медленному/сильному:
- none — без сжатия. Простой, минимум CPU, ratio = 1.
- lz4 — очень быстрое, ratio в районе 1.5–2× на структурированных данных. Промышленный дефолт.
- snappy — где-то рядом с lz4 по соотношению. Старее, в franz-go так же поддерживается через
kgo.SnappyCompression(). - zstd — медленнее по CPU, но сильно лучше по ratio: 2–4× на JSON, ещё больше на текстах. Поддерживается с Kafka 2.1.
- gzip — медленнее zstd, ratio часто хуже. Используется редко, чаще из соображений совместимости.
В нашем бенчмарке мы гоняем none / lz4 / zstd — это три точки, по которым видно практическую разницу.
Что показывает наш код
Бинарник один: cmd/bench. Он гоняет матрицу из трёх linger'ов (0/5/50 мс) и трёх кодеков (none/lz4/zstd), на каждой комбинации пишет 100 000 JSON-сообщений по ~1 КБ — это модель промо-пятничного потока заказов (OrderPlaced в brew.orders.v1). На выходе — таблица с throughput, P50/P99/P99.9 latency и размером на диске.
Каждая комбинация пишет в свой топик — иначе размеры на диске смешаются и колонка про сжатие потеряет смысл. Имена топиков lecture-02-04-batching-l<linger>-<codec>.
Запись принципиально асинхронная — cl.Produce плюс callback. Если бы мы делали ProduceSync (как в Acks и durability), батч-эффекты бы исчезли: пока один record не добьёт round-trip, следующий не уезжает.
Сам цикл — это просто cl.Produce в loop'е, с замером времени до колбэка:
for i := 0; i < msgs; i++ {
rec := &kgo.Record{Value: payloads[i]}
sendAt := time.Now()
cl.Produce(ctx, rec, func(_ *kgo.Record, err error) {
took := time.Since(sendAt)
// запись в res.latencies, инкремент счётчиков
})
}
flushCtx, flushCancel := context.WithTimeout(ctx, 2*time.Minute)
_ = cl.Flush(flushCtx)
flushCancel()
res.elapsed = time.Since(start)Тут две тонкости. Первая — Flush обязателен. Без него цикл закончится мгновенно (мы отдали 100к записей в очередь буфера за миллисекунды), и elapsed окажется фейковым. Вторая — latency, которую мы записываем в callback, это не round-trip к брокеру. Это полный путь record'а: «положил в очередь» → «попал в батч» → «батч отправлен» → «брокер ответил» → «callback вызван». На толстой нагрузке у первых записей в очереди latency будет огромная — они ждут, пока пройдут все батчи. У последних — маленькая. P50 на 100к получается ~средний шаг по всему прогону, не «время одного RPC».
Это ОК для нашего сравнения — мы сравниваем сценарии между собой при одинаковой нагрузке. Если хочется честную per-record latency на низкой нагрузке — это уже другой эксперимент (и он был в Acks и durability с ProduceSync).
Размер на диске считаем через kadm.DescribeAllLogDirs:
all, err := admin.DescribeAllLogDirs(rpcCtx, nil)
// ...
all.Each(func(d kadm.DescribedLogDir) {
d.Topics.Each(func(p kadm.DescribedLogDirPartition) {
if !wanted[p.Topic] { return }
if seen[p.Topic][p.Partition] { return }
seen[p.Topic][p.Partition] = true
sizes[p.Topic] += p.Size
})
})Дельта после − до — сколько байт лёг в логи одной реплики. На rf=3 в кластере на самом деле в три раза больше; для сравнения сценариев это не важно. Берём строго первую найденную реплику на каждую партицию (через seen-мапу), иначе при rf=3 размер размножится втрое.
Payload-генератор тоже не случайный кусок кода. Если бы я переиспользовал один и тот же буфер на все 100k record'ов — zstd закодировал бы его как «повтор N раз» и compression-ratio улетел бы в небо. Поэтому каждый record получает уникальный JSON: фиксированная структура (ключи seq, id, ts, event, payload) и случайный hex-наполнитель до целевого размера. Так zstd видит реалистичную картину — структура жмётся, random-часть нет.
Что видно на прогоне
Здоровый кластер, 100 000 сообщений по ~1 КБ JSON, partitions=3, rf=3. Прогон занимает ~30 секунд суммарно по всем девяти сценариям.
SCENARIO SENT FAILED ELAPSED THROUGHPUT P50 P99 P99.9 DISK
linger=0ms compression=none 100000 0 1.10s 91200 msg/s 363.79ms 1.07s 1.08s 98.6MB
linger=0ms compression=lz4 100000 0 1.28s 77887 msg/s 548.80ms 1.25s 1.26s 92.2MB
linger=0ms compression=zstd 100000 0 1.01s 99229 msg/s 531.73ms 968.58ms 969.76ms 48.2MB
linger=5ms compression=none 100000 0 1.45s 69135 msg/s 936.45ms 1.42s 1.42s 98.6MB
linger=5ms compression=lz4 100000 0 752.63ms 132868 msg/s 386.57ms 708.90ms 717.08ms 92.2MB
linger=5ms compression=zstd 100000 0 704.46ms 141952 msg/s 436.75ms 632.30ms 636.41ms 48.2MB
linger=50ms compression=none 100000 0 1.69s 59335 msg/s 930.63ms 1.64s 1.66s 98.6MB
linger=50ms compression=lz4 100000 0 2.60s 38507 msg/s 1.06s 2.51s 2.52s 92.2MB
linger=50ms compression=zstd 100000 0 1.24s 80911 msg/s 781.35ms 1.14s 1.14s 48.3MBЧто отсюда читается.
Колонка DISK — самое чистое. На JSON 1 КБ:
none— 98.6 МБ. Это ровно 100 000 × ~1 КБ плюс немного метаданных батча.lz4— 92.2 МБ. Жмёт всего на 6%. На JSON-with-random-id структура есть, но её мало; основная масса — random hex, который lz4 не берёт.zstd— 48.2 МБ. Половина отnone. Тот же random hex, но zstd использует словарь и берёт куски структуры значительно лучше.
Числа стабильные между прогонами: компрессия — функция от данных, не от тайминга.
Колонка THROUGHPUT — а вот тут интересно. Лучшая комбинация — linger=5ms × zstd (~142k msg/s). Хуже всего — linger=50ms × lz4 (~38k msg/s), и это контринтуитивно: линкер же должен помогать?
Объяснение в том, что мы пишем все 100к сразу, продьюсер набирает огромные батчи в любом случае. Толкаем cl.Flush(), и он ждёт, пока все таймеры linger'а истекут. С linger=50ms каждый последний кусок ждёт ещё 50 мс перед отправкой — а таких «последних кусков» в конце много (хвосты по партициям). На разреженном потоке linger=50 был бы полезен; на «накормили клиента и ждём» — он только тормозит хвост.
Это тоже урок. Linger — это про сглаживание неравномерности нагрузки, не про «всегда лучше». Если у тебя ровный поток и хочется минимальной end-to-end latency — оставляй 0. Если поток рваный, с пиками и провалами — 5–20 мс убирают пилу. 50 мс в продакшне — почти всегда перебор.
P99 / P99.9 на 100к async-produce — это «сколько занял путь у самого позднего record'а в буфере». Лучшие хвосты на linger=5ms × {lz4, zstd} (~700 мс). Худшие — на linger=50ms × lz4 (~2.5 с). На реальной нагрузке с разреженным потоком эти числа выглядели бы иначе — там P99 определяется бы тем самым 50-мс linger'ом, а не размером буфера.
А что с размером сообщения
Мы взяли 1 КБ — типичный размер JSON-события (id, поля заказа, метаданные). Если поднять payload до 10 КБ:
- Размер на диске пропорционально вырастет.
- Compression-ratio станет лучше — больше повторов внутри одного record'а.
- Throughput по сообщениям упадёт, throughput по байтам останется того же порядка.
Если уменьшить до 100 байт:
- Метаданные record'а (headers, key, partition, offset, CRC, timestamp) станут заметной долей. Compression почти не работает — мало данных в записи.
- Throughput по сообщениям может вырасти до сотен тысяч msg/s — но throughput по байтам упадёт.
Граница — где-то 200–300 байт. Меньше — пиши пачкой в одном record'е (если бизнес-логика позволяет) или соглашайся на накладные. Больше — батчинг и compression работают как ожидается.
Что взять с собой
- Батчинг в Kafka — на стороне продьюсера, per-partition. Сам собирает, сам отправляет. Размер регулируется
ProducerBatchMaxBytesиProducerLinger. - Дефолт franz-go v1.21.0 —
ProducerLinger=10ms. Это уже не «отправляй сразу», но и не агрессивный linger: естественные батчи набираются толще, чем при 0, и end-to-end latency почти не растёт. - linger > 10ms нужен для разреженного потока, чтобы дать накопиться. На плотном потоке от него только хвостовая latency растёт — дешевле оставить дефолт или явный
ProducerLinger(0). - Compression работает на батч. На тонком батче compression не даёт ничего; на толстом — даёт всё.
- Кодеки.
lz4дёшево по CPU и берёт умеренно.zstdдороже, но ratio в 2–4× на JSON. На случайных байтах никакой кодек не поможет. MaxBufferedRecords— лимит после которогоProduceблокируется. Дефолт 10 000 в реальной нагрузке нужно поднимать; иначе backpressure будет упираться в буфер, а не в брокер.- При sizing'е партиций смотри на скорость записи в одну партицию, а не на топик. Слишком много партиций при низком потоке = тонкие батчи = плохой compression-ratio.
В Ошибки, retries и headers разберёмся с классами ошибок продьюсера, retry/timeout-настройками и headers — это последний кусок, после которого продьюсер у нас будет полным.
Запуск
Стенд из корня репозитория должен быть поднят (docker compose up -d). Дальше:
make runЭто создаст 9 топиков и прогонит матрицу с дефолтами (100 000 × 1024 байт). Прогон занимает примерно 30 секунд на ноуте.
Меньшую нагрузку можно гонять для быстрых итераций:
make run MESSAGES=10000 PAYLOAD=512Между прогонами полезно прибирать топики, иначе размер на диске просто складывается:
make topic-deleteОписать партиции:
make topic-describe