Топики и партиции
В пятницу утром Brew запустил промо «бесплатный кофе по пятницам»: подержал клиент в приложении кнопку три секунды - в очередь падает бесплатный заказ. К одиннадцати order-service забуксовал. 8000 заказов в минуту в один топик brew.orders.v1 с одной партицией. Одна нода-leader, один append-канал, один CPU на запись, одна сетевая карта. Дёшево упереться.
Эта лекция про то, как нарезать топик на несколько партиций и зачем.
Что такое топик
Топик это именованный канал, в который пишут продьюсеры и из которого читают консьюмеры. На уровне модели тут всё просто: называешь топик brew.orders.v1 - туда летят заказы, называешь brew.payments.v1 - платежи. Никаких exchange'ей и routing key'ев, как в RabbitMQ. Сообщения просто лежат под этим именем.
На физическом уровне (это уже трогали в Архитектуре и KRaft) топик это директория на диске брокера. Внутри директории лог-файлы, разбитые на сегменты. Когда сегмент дорастает до настройки segment.bytes или segment.ms, он закрывается, открывается новый. Старые сегменты выкидывает retention (про retention - лекция Offsets и retention). Это всё, что есть на физическом уровне. Без всякой магии.
Имя brew.orders.v1 - не случайное. Brew выбрал схему <домен>.<сущность>.v<версия>, чтобы при breaking change в формате события можно было создать brew.orders.v2 рядом, продюсить туда новую схему, дать консьюмерам мигрировать. Старый brew.orders.v1 живёт, пока кто-то его читает, потом удаляется. Версионирование на уровне топика - типичный паттерн в Kafka, потому что внутри топика схема одна, развивать её in-place больно.
Но если бы топик был одним файлом, всё упиралось бы в одну ноду. Один диск, один CPU, одна сетевая карта, один процесс брокера. Эту картину Brew увидел в пятницу. Поэтому топик режется на партиции.
Партиция - единица параллелизма
Партиция это шард топика. Самостоятельный append-only лог. Топик brew.orders.v1 с тремя партициями это три независимых лога: brew.orders.v1-0, brew.orders.v1-1, brew.orders.v1-2. Каждая партиция живёт на своём наборе брокеров (про RF и ISR будет Репликация и ISR), и у каждой свой leader - тот брокер, через которого идёт запись.
Сравнение для бэкендера. Если бы ты в PostgreSQL шардировал таблицу orders по customer_id на 16 шардов через 4 машины, каждая машина стала бы мини-leader-ом своего среза. Партиция в Kafka это та же идея, только встроена в брокер: шардирование «из коробки», вся механика рассылки писем продьюсеру про новый leader, перебалансировки при падении и репликации внутри партиции - готова.
Из этой модели растут все остальные свойства Kafka:
- запись параллелится: продьюсеры одновременно пишут в разные партиции через разных leader'ов;
- чтение параллелится: несколько консьюмеров одной группы делят между собой партиции (один потребитель на партицию максимум, см. Первый консьюмер);
- хранилище горизонтально: больше партиций - равномернее раскладка по брокерам;
- порядок гарантируется только внутри партиции, между партициями его в общем виде нет.
Последний пункт - главный камень преткновения для новичков. Brew хотел «упорядочи мне все события заказа», и Kafka честно отвечает: упорядочу внутри партиции. Между партициями порядок не определён. Если важно, чтобы все события одного order_id шли в правильной последовательности (сначала OrderPlaced, потом PaymentReceived, потом KitchenStarted, потом OrderReady, потом OrderDelivered) - клади order_id в ключ. Все события одного заказа попадут в одну партицию.
Как сообщение попадает в партицию
Когда продьюсер пишет, он отдаёт (topic, key, value). Дальше клиент решает, в какую партицию положить:
- если
keyпустой - sticky-стратегия: дефолтныйkgo.UniformBytesPartitioner(KIP-794, в Java-клиенте с 3.3) копит ~64 KiB в одну партицию, потом переключается. Альтернатива - явныйkgo.RoundRobinPartitioner; - если
keyесть -partition = hash(key) mod N, где N это число партиций.
Хеш по умолчанию murmur2 (как в Java-клиенте, чтобы Go и Java писали в одну и ту же партицию для одного ключа). Сменить стратегию можно опцией kgo.RecordPartitioner - передаёшь туда kgo.RoundRobinPartitioner, kgo.StickyKeyPartitioner или собственную реализацию интерфейса kgo.Partitioner. Подробнее в Ключи и партиционирование, тут пока важен сам факт: ключ через простую арифметику решает партицию.
Из этой формулы вытекает свойство, которое на собеседованиях любят. Сообщения с одинаковым ключом попадают в одну и ту же партицию. Гарантировано. И, значит, прочитаются по порядку. Это «one-key, one-partition, one-order» гарантия.
topic = brew.orders.v1, partitions = 3
hash(key) mod 3
key="order-001" ──┐ ┌──> partition-0
key="order-004" ──┤ (h(k)%3 == 0) │ [r0 r1 r2 ...]
key="order-007" ──┘ │
│
key="order-002" ──┐ ├──> partition-1
key="order-005" ──┤ (h(k)%3 == 1) │ [r0 r1 r2 ...]
key="order-008" ──┘ │
│
key="order-003" ──┐ └──> partition-2
key="order-006" ──┤ (h(k)%3 == 2) [r0 r1 r2 ...]
key="order-009" ──┘Каждая партиция - упорядоченная последовательность записей с возрастающим offset'ом (r0, r1, r2, …). Между партициями offset'ы не сравниваются - у каждой свой счётчик с нуля.
Выбор ключа: чем заплатишь
Ключ не декорация. От него зависит, что Kafka умеет, а что нет. Brew перебрал четыре кандидата на топик brew.orders.v1 и прошёлся по последствиям:
- пустой ключ - самая простая схема: kgo раскидывает sticky-батчами по партициям, нагрузка распределяется ровно, throughput максимальный. Минус один и главный: порядок событий одного заказа теряется.
OrderPlacedможет оказаться в партиции 2,PaymentReceived- в партиции 0. Консьюмер, читающий партиции параллельно, увидит платёж раньше заказа. order_id- сохраняет порядок событий одного заказа. Все событияOrderPlaced→PaymentReceived→KitchenStarted→OrderReady→OrderDeliveredдля заказаorder-123идут в одну партицию. Ключей миллионы (каждый заказ свой), распределение почти случайное (hash(uuid)это и есть равномерная случайность). Аналитика по конкретному клиенту требует перебора всех партиций.customer_id- все события одного клиента в одной партиции. Полезно, если консьюмер строит профиль клиента в локальном кеше: можно держать кеш per-partition, не синхронизируя его с соседями. Минус: «горячий» клиент (корпоративный аккаунт, скидающий 1000 заказов в день) перекосит партицию - партиция Pareto, остальные простаивают.shop_id- все заказы кофейни в одной партиции. Звучит логично дляkitchen-service: бариста одной кофейни видит свой ровный поток без шума соседей. На практике рушится в пятницу: топовая кофейня в центре города даёт 30% всего трафика и её партиция тащит треть промо.
Brew выбрал order_id для топиков заказов и платежей (важен порядок событий внутри заказа) и shop_id для кухонного топика (бариста группирует свою кофейню, и пиковая партиция терпима, потому что и людей в той кофейне больше). Универсального правильного ответа нет: каждый выбор оптимизирует одну штуку и ломает другую. Дальше в курсе (Keys & Partitioning) разбираем выбор ключа под микроскопом.
Почему число партиций нельзя уменьшать
Тут штука, на которой спотыкается каждый второй. Партиции в топик можно только добавлять. Уменьшить никак. Никаким admin-запросом. Никаким alter.
Причина в формуле hash(key) mod N. Brew создал brew.orders.v1 с N=3, год писал туда заказы. Каждый ключ попадал в свою партицию. Если завтра сделать N=2, для того же ключа hash(key) mod 2 даст совсем другое число. То есть данные одного и того же ключа окажутся «исторически в одной партиции, новыми в другой». Вся гарантия порядка по ключу разваливается. Kafka просто не предлагает такой операции - нет UI ошибки нажать.
Расширение тоже ломает раскладку. При N=3 → N=4 ровно те же ключи, что попадали в партицию 0, теперь могут попасть в любую из четырёх. Поэтому kafka-topics --alter --partitions обычно делают на свежий топик или когда временное нарушение порядка по ключу допустимо. На прод-топиках это не операция «по щелчку».
Практический вывод: число партиций надо прикинуть заранее с запасом. Партиций столько, сколько диктует ожидаемая throughput / per-partition limit. «10 партиций потому что круглое число» - плохой ориентир. Грубая прикидка: партиция держит ~10-20 MB/s записи и столько же чтения. Если ждёшь 100 MB/s, партиций хочется минимум 6-8, лучше с запасом до 12-16. Это эмпирика - детально разобрано в Sizing и tuning.
После пятничного промо Brew пересоздал brew.orders.v1 под три партиции вместо одной. Старый топик в имени остался тем же (версию .v1 не поднимали, потому что менялась только раскладка партиций, а не схема событий), но число партиций выросло втрое. Под старые ключи, разумеется, раскидало по-новому, но Brew был готов: на момент пересоздания все события прошлой недели уже были перелиты в S3 для долгого хранения, оперативная аналитика жила в свежем потоке. На стенде курса дальше работаем именно с тремя - чтобы таблицы вывода умещались на экране.
Что показывает программа inspect
cmd/inspect/main.go идёт по трём шагам. Сначала создаёт топик brew.orders.v1 идемпотентно: пробует admin.CreateTopic(ctx, partitions, rf, configs, topic), и если упёрся в kerr.TopicAlreadyExists - значит топик уже есть, описываем его дальше. Поэтому make run второй раз отрабатывает спокойно, печатает текущее состояние. Дальше описывает топик через admin.ListTopics(ctx, "brew.orders.v1") - под капотом это metadata-запрос к брокеру, возвращает TopicDetails (мапу имя → детали: TopicID, флаг internal, число партиций, per-partition leader/replicas/ISR). И, наконец, печатает таблицу per-partition: кто leader, на каких нодах реплики и какие из них в ISR.
Флаг -recreate=true сначала удаляет топик, потом создаёт заново. Полезно, чтобы посмотреть, как контроллер раскидывает leader'ов между нодами - kafka-балансер старается распределить лидерство равномерно (тут 3 партиции на 3 ноды → один leader на ноду).
Вот ядро идемпотентного создания. Ошибка TopicAlreadyExists это штатная ситуация, «уже есть»:
func ensureTopic(ctx context.Context, admin *kadm.Client, topic string, partitions int32, rf int16) (bool, error) {
resp, err := admin.CreateTopic(ctx, partitions, rf, nil, topic)
if err == nil && resp.Err == nil {
return true, nil
}
cause := err
if cause == nil {
cause = resp.Err
}
if errors.Is(cause, kerr.TopicAlreadyExists) {
return false, nil
}
return false, cause
}После ensureTopic дёргается ListTopics - это и есть тот самый metadata-запрос:
details, err := admin.ListTopics(rpcCtx, topic)
td := details[topic]
// td.Topic - имя
// td.ID - TopicID (UUID)
// td.Partitions - мапа partition → детали (Leader, Replicas, ISR, OfflineReplicas)И сама печать таблицы. Partitions.Sorted() отдаёт срез, отсортированный по номеру партиции:
parts := td.Partitions.Sorted()
for _, p := range parts {
offline := fmt.Sprintf("%v", p.OfflineReplicas)
if len(p.OfflineReplicas) == 0 {
offline = "-"
}
fmt.Fprintf(tw, "%d\t%d\t%v\t%v\t%s\n",
p.Partition, p.Leader, p.Replicas, p.ISR, offline)
}Что в таблице - то и в PartitionDetail. LEADER это p.Leader, REPLICAS это p.Replicas, ISR это p.ISR, OFFLINE это p.OfflineReplicas. Печать просто подставляет числа в строку формата.
Запуск
Стенд должен быть поднят (docker compose up -d из корня репо).
make runОжидаемый вывод (id'ы и leader'ы у тебя будут другие):
brew-topic "brew.orders.v1" создан: partitions=3 rf=3
Topic: brew.orders.v1
TopicID: kcFo++q0QQ+xaKj0pnwWGA==
Partitions: 3
PARTITION LEADER REPLICAS ISR OFFLINE
0 2 [2 3 1] [2 3 1] -
1 3 [3 1 2] [3 1 2] -
2 1 [1 2 3] [1 2 3] -На что обратить внимание:
LEADERу каждой партиции свой - нагрузка по записи распределяется по нодам;REPLICASэто три числа (RF=3), и порядок в списке это preferred leader: первая запись в списке - кого контроллер хотел бы видеть лидером (контроллер старается, но не всегда получается сразу - leader-election выбирает живого, не «правильного»);ISR == REPLICASозначает, что все реплики в синхроне; приacks=allзапись подтверждается мгновенно (если бы одна нода отвалилась, ISR стал бы короче, см. Репликация и ISR);OFFLINEпустой - все реплики живы.
Сравнить с CLI:
make topic-describeЭта цель внутри контейнера kafka-1 запускает kafka-topics.sh --describe --topic brew.orders.v1. Картинка та же (поля чуть по-другому называются, но Leader/Replicas/Isr совпадают). В этом и смысл лекции: то, что в дистрибутиве делается shell-скриптом, через franz-go делается одной строкой admin.ListTopics.
Хочешь увидеть, как меняется ассайнмент лидеров?
make topic-recreateУдалит топик, создаст заново. На освобождённых партициях контроллер выберет лидеров по preferred-replica логике. Запусти пару раз - заметишь, что числа стабильны (контроллер выбирает не случайно), но при удалении и пересоздании id'ы партиций распределяются между нодами по-разному.
После лекции можно убрать за собой:
make topic-deleteЧто узнал
- Топик это именованный канал; на диске директория с сегментами лог-файлов; одна партиция упирается в одну ноду.
- Партиция это самостоятельный append-only лог внутри топика; единица параллелизма для записи и чтения; шардирование, встроенное в брокер.
- Сообщение с непустым ключом всегда попадает в одну и ту же партицию через
hash(key) mod N. Все события одного заказа Brew с ключомorder_idидут по порядку. - Выбор ключа это выбор того, что ты готов оптимизировать и что ломать: порядок внутри сущности, локальность кеша, равномерность нагрузки.
- Партиций можно добавлять, но нельзя уменьшать - иначе раскладка
hash(key) mod Nломается ретроактивно, а вместе с ней - все гарантии порядка по ключу. admin.ListTopicsотдаёт metadata: per-partition leader, replicas, ISR - этого достаточно, чтобы понять текущее состояние топика без shell'а.
Дальше (Репликация и ISR) копнём, что именно стоит за Replicas и ISR - что значит «реплика отстала», как ISR сжимается при падении ноды и при чём тут min.insync.replicas.