Sizing и tuning
Лекция Retention и compaction была про то, как Kafka режет лог на сегменты и что с ними потом делает retention/compaction. Эта — на ступень выше. Фокус смещается с «как работает retention» на «какой retention я ставлю на этот топик и почему именно такой». Пара ручек на топике решает, во что превратится диск и как кластер ведёт себя в инциденте. Хочется понимать, что выбирает оператор.
У Brew две принципиально разные нагрузки, и sizing считается под пик, а не под среднее. Обычный день — заказы текут ровным фоном. Промо «бесплатный кофе по пятницам» — brew.orders.v1 ловит около 8000 заказов в минуту, и под этот пик надо посчитать партиции, диск и ручки топика заранее. Партиций «потом докинем» не выйдет красиво (увеличить можно, но это перетряхивает hash-распределение по ключам), а места на диске под промо-пятницу должно хватить ещё до пятницы.
Начнём с партиций, потом — диск, потом пройдёмся по топ-конфигу, и в конце соберём это в три профиля, которые программа topic-profiles создаёт прямо на стенде.
Сколько партиций
Это первое, что спрашивают, когда делают новый топик. Цифру обычно берут «на глаз», и долго потом с этим живут (изменить можно только в большую сторону, и это перестраивает hash-распределение по ключам — больно).
Базовая модель — простая. Партиция это единица параллелизма. Сколько партиций — столько параллельных консьюмеров может работать в одной consumer group. Один консьюмер берёт одну или несколько партиций, две группы поверх одного топика — независимо. Потолок параллелизма для группы равен числу партиций. Ниже — простаивают.
Грубая прикидка такая. Берём пиковую нагрузку в сообщениях в секунду. Прикидываем, сколько успевает обработать один воркер консьюмера на нашей логике (не на голом poll'е — а реально, с базой/HTTP-вызовом/сериализацией). Делим. Получаем минимум. Дальше накидываем запас процентов 30–50 — на бюджет роста и на шероховатости при ребалансе.
Пример. На промо-пятнице brew.payments.v1 идёт на пике 8000 сообщений в секунду, один воркер payment-service тянет 800 (валидация, запись в БД, проверка идемпотентности). 8000 / 800 = 10 — это нижняя граница. С запасом — 12–16 партиций. Шесть точно не хватит, тридцать — лишнее. В обычный день этот поток на порядок ниже, но партиции мы режем под пятницу, иначе в пик payment-service упрётся в потолок параллелизма и платежи начнут отставать.
Вторая сторона — стоимость партиции. Каждая партиция на каждом брокере это файлы на диске, открытые file descriptor'ы, индексы, метаданные. Полтора миллиона партиций на кластер не положат брокер сразу, но контроллер начинает буксовать на ребалансе и старте, ISR ходит кругами, JVM-heap (если у вас ZK-эра) распухает, а fetch-запросы между брокерами становятся хроническим bottleneck'ом. На стенде с тремя нодами это далеко, но «давай поставим 1000 партиций на всякий случай» — не лучшая идея даже тут.
И ещё. Партиции с replication factor=3 это троекратные копии всего на разных брокерах. Если в топике 12 партиций, на кластере физически 36 партиций-реплик. Это не мелочь.
Диск
Диск считается просто, если ничего не забыть. Формула — голая арифметика:
disk = throughput × retention × replication_factor / compression_ratioThroughput — в байтах в секунду (после батчинга и без сжатия). Retention — в секундах. Replication factor — обычно 3. Compression ratio — то, во сколько раз сжимается batch. На JSON-полезной нагрузке zstd даёт ~4×, lz4 ~2.5×, на бинарном protobuf без явных строк — компрессия слабее.
Пример прикидки. Кликстрим brew.clickstream.v1 из мобильного приложения — события по 600 байт, 5000 в секунду на промо-пятнице, retention 7 дней, RF=3, компрессия lz4 (~2.5×):
3000000 байт/с × 604800 с × 3 / 2.5 ≈ 2.18 ТБЭто суммарно по кластеру, не на одну ноду. Чтобы посчитать на ноду — делишь на число нод и проверяешь, что у тебя есть запас под реассайнменты (когда брокер падает и его партиции временно живут не у трёх, а у двух — копии должны разъехаться, а места хватить).
И запас закладывают всегда — как минимум 30%, лучше 50%. На пиках растёт всё: и retention тиков чуть больше, и сжатие хуже, и failover съедает отдельный бюджет. Пустой диск брокера на 95% — близкий incident, как бы убедительно ни звучали «ещё пять процентов»: запись блокируется, контроллер начинает ругаться, и поднимать всё это среди ночи никто не хочет.
Конфиги топика, которые правда стоят настройки
Брокер выставляет дефолты на все ручки. Дефолты — компромисс, рассчитанный на «что-то среднее». На реальных топиках их почти всегда крутят.
Дальше — те ручки, которые хочется знать на пальцах.
cleanup.policy
Две политики из Retention и compaction — delete и compact. Бывает ещё compact,delete (и compact, и retention доедает старое). Выбор — это в первую очередь ответ на вопрос «что в топике лежит». Поток событий — delete. Снимок состояния по ключу — compact. Снимок с TTL «удалять профили старше года, даже если tombstone не пришёл» — compact,delete.
Перепутаешь — упрёшься в боль. На событиях с compact потеряешь все, кроме последних версий per-key (а порядок per-key полезен сам по себе, для аудита). На стейте с delete retention срежет нужные ключи, и downstream останется без актуального состояния.
retention.ms / retention.bytes
Для delete-топиков — главные ручки. retention.ms это TTL по времени (срез по возрасту последней записи в сегменте). retention.bytes — лимит размера лога на партицию (per-partition, не на топик). Срабатывает условие, которое наступит первым.
Ставить только что-то одно — нормально. Ставить оба — нормально, и иногда даже нужно (например, защита от внезапного всплеска: TTL=30d, но если за 24 часа налило больше 100 ГБ — всё равно срежем). На compact-топике обе ручки имеют смысл только при compact,delete, и работают они там как fallback к компакции.
segment.ms / segment.bytes
Сегмент — это файл, в который пишет активный кусок партиции. Закрывается по достижении segment.bytes (1 ГБ дефолт) или по возрасту segment.ms (неделя дефолт). Retention и compaction трогают только закрытые сегменты — это крутили в Retention и compaction.
Дефолт «1 ГБ или неделя» хорош для среднего топика. Под высокий поток метрик (хочется быстрого retention) ставь segment.ms=10m — сегменты будут закрываться часто, retention срабатывает близко к заявленному. На compact-топике, где обновления редкие, segment.ms=1d достаточно. Слишком короткие сегменты — слишком много мелких файлов и метаданных, FD'ов и индексов; слишком длинные — лог не уменьшается, retention «врёт».
min.insync.replicas
С RF=3 ставится min.insync.replicas=2. Это значит: при acks=all запись считается успешной только если её подтвердили хотя бы 2 реплики из 3. Если ISR упал до одного брокера (две ноды лежат), продьюсер с acks=all получит NOT_ENOUGH_REPLICAS и не запишет. Это и есть страховка от split-brain'а: лучше отказать клиенту, чем потерять данные, когда восстановится упавший брокер.
С RF=3 и min.ISR=2 кластер переживает падение одной ноды без потери записи. Двух — нет. Это base sane setting'и для production-топиков; в sandbox'е оставляем те же.
max.message.bytes
Максимальный размер одного record'а в брокере. Дефолт ~1 МБ. Если продьюсер шлёт больше — RecordTooLargeException, сообщение в Kafka не попадает.
Поднимать стоит осознанно. Большие сообщения это:
- Больший network round-trip и хуже батчинг.
- Обязательное согласование с broker-уровневым
message.max.bytesиreplica.fetch.max.bytes— иначе брокеры между собой не реплицируют. - Долгие сегменты, потому что один record «съедает» сразу заметный кусок.
Если у вас в payload 5–10 МБ — обычно правильнее положить blob во внешнее хранилище и в Kafka гнать только ключ.
compression.type
Уровни от none до zstd. Можно ставить и на producer'е (тогда брокер просто хранит сжатый batch как есть), и на топике (брокер пересжимает входящее под свою настройку). Обычно ставят на producer'е — меньше CPU на брокере.
zstd — лучшее сжатие на текстовых/JSON-полезных нагрузках, чуть дороже по CPU. lz4 — дёшево и быстро, ratio послабее. snappy похож на lz4, чуть слабее. none — только если payload уже бинарный и сжатие добавит CPU без выгоды (снимки изображений, например).
В sandbox-профилях ниже cdc идёт под zstd (длинное хранение, экономия диска важна), metrics/orders — под lz4 (быстро и дёшево, retention всё равно режет лог).
unclean.leader.election.enable
Этот флажок обычно оставляют в false. Если включить — при падении лидера и недоступности всех ISR Kafka может выбрать leader'ом replica, которая отстала от ISR. Это означает: записи, которые лидер уже принял, но не успел реплицировать в ISR, исчезают. Тихая потеря данных. Включают только в очень специфичных сценариях, где availability важнее любой потери, и обычно — на временной топик.
message.timestamp.type
Две стратегии. CreateTime — Kafka хранит timestamp, который проставил продьюсер. LogAppendTime — Kafka проставляет свой timestamp в момент append'а на брокере, перетирая то, что прислал клиент.
CreateTime нужен, когда event-time важен для downstream — например, для windowing в Stream processing: концепции. LogAppendTime — когда вы не доверяете часам клиентов и retention важнее event-time. На метриках с агрессивным TTL LogAppendTime стабильнее: retention режет по «времени брокера», а не по тому, что нарисует продьюсер с поехавшими часами.
Три профиля
Это и есть тот скелет, который собирает программа topic-profiles. Программа создаёт три топика, каждый со своим набором настроек, и распечатывает их через DescribeTopicConfigs. Идея — рядом видеть, как одни и те же ручки крутят в разные стороны под разный сценарий Brew.
cdc — partitions=6, RF=3, compact, retention=-1, zstd, max=2 MB
metrics — partitions=12, RF=3, delete, retention=24h, segment=10m, lz4
orders — partitions=12, RF=3, delete, retention=7d, segment=1d, lz4cdc — это про state по ключу, профиль для brew.cdc.public.* (CDC из Postgres, модуль 07). Хранится бесконечно, компактится периодически, max.message.bytes поднят до 2 МБ под крупные снимки таблиц от Debezium. zstd — потому что long-tail хранения, и там CPU на сжатие отбивается дисковым местом многократно.
metrics — это профиль операционной телеметрии brew.telemetry.v1: короткая жизнь и много мелких записей. 12 партиций, чтобы 12 воркеров pulled'или параллельно. Сегмент 10 минут — retention срабатывает близко к 24 часам без больших дрейфов. lz4 — дешёвое сжатие, не тратим CPU на zstd там, где данные всё равно через сутки уйдут. LogAppendTime — потому что timestamp метрики в самом payload'е и так есть, а retention хочется иметь предсказуемый.
orders — профиль brew.orders.v1, недельный buffer для replay. retention=7d, segment=1d — каждый день закрываем новый сегмент, в субботу retention начинает резать понедельник. lz4. CreateTime — потому что заказ может перечитываться задним числом (kitchen-service, analytics-service), и event-time нужен downstream'у. Именно этот профиль режется под пик промо-пятницы: 12 партиций — это запас под 8000 заказов/мин.
Что делает наша программа
cmd/topic-profiles/main.go — это один проход, никакого long-running цикла. Создаёт три топика, дёргает DescribeTopicConfigs и печатает таблицу. Если запустить с -recreate — сначала удалит существующие.
Сами профили зашиты в коде — список profiles(prefix). Вот как выглядит профиль cdc:
{
name: "cdc",
topic: prefix + "-cdc",
parts: 6,
rf: 3,
configs: map[string]*string{
"cleanup.policy": kadm.StringPtr("compact"),
"retention.ms": kadm.StringPtr("-1"),
"min.insync.replicas": kadm.StringPtr("2"),
"compression.type": kadm.StringPtr("zstd"),
"max.message.bytes": kadm.StringPtr("2097152"),
"min.cleanable.dirty.ratio": kadm.StringPtr("0.1"),
"unclean.leader.election.enable": kadm.StringPtr("false"),
"message.timestamp.type": kadm.StringPtr("CreateTime"),
},
rationale: "long-lived state по ключу (CDC brew.cdc.public.*): compact + retention=-1 + zstd, ...",
},Идемпотентное создание — стандартная для курса заготовка. Если топик уже есть — TopicAlreadyExists, и мы переходим в режим AlterTopicConfigs:
resp, err := admin.CreateTopic(rpcCtx, p.parts, p.rf, p.configs, p.topic)
if err == nil && resp.Err == nil {
return nil
}
cause := err
if cause == nil { cause = resp.Err }
if !errors.Is(cause, kerr.TopicAlreadyExists) {
return cause
}
alters := make([]kadm.AlterConfig, 0, len(p.configs))
for k, v := range p.configs {
alters = append(alters, kadm.AlterConfig{Op: kadm.SetConfig, Name: k, Value: v})
}
alterResp, err := admin.AlterTopicConfigs(rpcCtx, alters, p.topic)После создания — DescribeTopicConfigs с retry'ем (на свежесозданном топике метаданные иногда не успевают разойтись по всем брокерам, и первая ошибка — UNKNOWN_TOPIC_OR_PARTITION):
rcs, err := admin.DescribeTopicConfigs(ctx, names...)
retryNeeded := false
for _, rc := range rcs {
if errors.Is(rc.Err, kerr.UnknownTopicOrPartition) {
retryNeeded = true
break
}
}Дальше — простая сборка таблицы: одна колонка на профиль, одна строка на каждую важную ручку. Список ручек зашит в shownConfigs — те самые cleanup.policy, retention.*, segment.*, min.insync.replicas, max.message.bytes, compression.type, unclean.leader.election.enable, message.timestamp.type. Значения форматируются человекочитаемо: 86400000 превращается в 1d (86400000), 1073741824 — в 1.0 GB, -1 для retention.bytes — в -1 (без лимита).
Сравнение партиций и replication factor — отдельным запросом через ListTopics (DescribeConfigs возвращает только конфиги, не layout):
td, err := admin.ListTopics(ctx, names...)
for _, p := range sorted {
t, ok := td[p.topic]
if !ok || t.Err != nil { ... continue }
fmt.Fprintf(tw, "%s\t%d\t%d\n", p.topic, len(t.Partitions), t.Partitions.NumReplicas())
}Запуск
make help # шпаргалка
make run # создать три топика и распечатать таблицу
make run-recreate # сначала удалить, потом создать (детерминированный вывод)
make describe # тот же DescribeTopicConfigs, только через kafka-configs.sh
make topic-delete-all # подчистить за собойПараметры:
PREFIX=my-topic make run # имена будут my-topic-cdc / my-topic-metrics / my-topic-ordersВ выводе make run интересно смотреть три вещи. Колонка CDC — почти все ручки руками выставлены. Колонка METRICS — тоже все, но с радикально другими значениями. Колонка ORDERS — нечто среднее. Если убрать одну из ручек из профиля — она в таблице покажет дефолт брокера, и сразу видно, чего нам не хватало.
Шпаргалка по профилям
| Сценарий | partitions | cleanup | retention | segment | compression | timestamp |
|---|---|---|---|---|---|---|
| CDC / state | 6 | compact | -1 | 7d | zstd | CreateTime |
| Telemetry / metrics | 12 | delete | 24h | 10m | lz4 | LogAppendTime |
| Orders / domain | 12 | delete | 7d | 1d | lz4 | CreateTime |
| Logs (raw) | 6–12 | delete | 3d | 1h | zstd | LogAppendTime |
| Audit | 3–6 | delete | 1y | 7d | zstd | CreateTime |
| Cache (key→value) | 6 | compact,delete | 90d | 1d | lz4 | CreateTime |
Цифры тут не претендуют на «правильные» — это рабочие отправные точки. Дальше крутишь под свой профиль трафика и смотришь на диск через DescribeAllLogDirs и на лаг через kadm.Lag.