Kafka CookbookОсновыТопики и партиции
0 / 42 (0%)

Топики и партиции

В пятницу утром Brew запустил промо «бесплатный кофе по пятницам»: подержал клиент в приложении кнопку три секунды - в очередь падает бесплатный заказ. К одиннадцати order-service забуксовал. 8000 заказов в минуту в один топик brew.orders.v1 с одной партицией. Одна нода-leader, один append-канал, один CPU на запись, одна сетевая карта. Дёшево упереться.

Эта лекция про то, как нарезать топик на несколько партиций и зачем.

Что такое топик

Топик это именованный канал, в который пишут продьюсеры и из которого читают консьюмеры. На уровне модели тут всё просто: называешь топик brew.orders.v1 - туда летят заказы, называешь brew.payments.v1 - платежи. Никаких exchange'ей и routing key'ев, как в RabbitMQ. Сообщения просто лежат под этим именем.

На физическом уровне (это уже трогали в Архитектуре и KRaft) топик это директория на диске брокера. Внутри директории лог-файлы, разбитые на сегменты. Когда сегмент дорастает до настройки segment.bytes или segment.ms, он закрывается, открывается новый. Старые сегменты выкидывает retention (про retention - лекция Offsets и retention). Это всё, что есть на физическом уровне. Без всякой магии.

Имя brew.orders.v1 - не случайное. Brew выбрал схему <домен>.<сущность>.v<версия>, чтобы при breaking change в формате события можно было создать brew.orders.v2 рядом, продюсить туда новую схему, дать консьюмерам мигрировать. Старый brew.orders.v1 живёт, пока кто-то его читает, потом удаляется. Версионирование на уровне топика - типичный паттерн в Kafka, потому что внутри топика схема одна, развивать её in-place больно.

Но если бы топик был одним файлом, всё упиралось бы в одну ноду. Один диск, один CPU, одна сетевая карта, один процесс брокера. Эту картину Brew увидел в пятницу. Поэтому топик режется на партиции.

Партиция - единица параллелизма

Партиция это шард топика. Самостоятельный append-only лог. Топик brew.orders.v1 с тремя партициями это три независимых лога: brew.orders.v1-0, brew.orders.v1-1, brew.orders.v1-2. Каждая партиция живёт на своём наборе брокеров (про RF и ISR будет Репликация и ISR), и у каждой свой leader - тот брокер, через которого идёт запись.

Сравнение для бэкендера. Если бы ты в PostgreSQL шардировал таблицу orders по customer_id на 16 шардов через 4 машины, каждая машина стала бы мини-leader-ом своего среза. Партиция в Kafka это та же идея, только встроена в брокер: шардирование «из коробки», вся механика рассылки писем продьюсеру про новый leader, перебалансировки при падении и репликации внутри партиции - готова.

Из этой модели растут все остальные свойства Kafka:

  • запись параллелится: продьюсеры одновременно пишут в разные партиции через разных leader'ов;
  • чтение параллелится: несколько консьюмеров одной группы делят между собой партиции (один потребитель на партицию максимум, см. Первый консьюмер);
  • хранилище горизонтально: больше партиций - равномернее раскладка по брокерам;
  • порядок гарантируется только внутри партиции, между партициями его в общем виде нет.

Последний пункт - главный камень преткновения для новичков. Brew хотел «упорядочи мне все события заказа», и Kafka честно отвечает: упорядочу внутри партиции. Между партициями порядок не определён. Если важно, чтобы все события одного order_id шли в правильной последовательности (сначала OrderPlaced, потом PaymentReceived, потом KitchenStarted, потом OrderReady, потом OrderDelivered) - клади order_id в ключ. Все события одного заказа попадут в одну партицию.

Как сообщение попадает в партицию

Когда продьюсер пишет, он отдаёт (topic, key, value). Дальше клиент решает, в какую партицию положить:

  • если key пустой - sticky-стратегия: дефолтный kgo.UniformBytesPartitioner (KIP-794, в Java-клиенте с 3.3) копит ~64 KiB в одну партицию, потом переключается. Альтернатива - явный kgo.RoundRobinPartitioner;
  • если key есть - partition = hash(key) mod N, где N это число партиций.

Хеш по умолчанию murmur2 (как в Java-клиенте, чтобы Go и Java писали в одну и ту же партицию для одного ключа). Сменить стратегию можно опцией kgo.RecordPartitioner - передаёшь туда kgo.RoundRobinPartitioner, kgo.StickyKeyPartitioner или собственную реализацию интерфейса kgo.Partitioner. Подробнее в Ключи и партиционирование, тут пока важен сам факт: ключ через простую арифметику решает партицию.

Из этой формулы вытекает свойство, которое на собеседованиях любят. Сообщения с одинаковым ключом попадают в одну и ту же партицию. Гарантировано. И, значит, прочитаются по порядку. Это «one-key, one-partition, one-order» гарантия.

plaintext
                       topic = brew.orders.v1, partitions = 3
                       hash(key) mod 3
 
  key="order-001"  ──┐                          ┌──> partition-0
  key="order-004"  ──┤   (h(k)%3 == 0)          │    [r0 r1 r2 ...]
  key="order-007"  ──┘                          │

  key="order-002"  ──┐                          ├──> partition-1
  key="order-005"  ──┤   (h(k)%3 == 1)          │    [r0 r1 r2 ...]
  key="order-008"  ──┘                          │

  key="order-003"  ──┐                          └──> partition-2
  key="order-006"  ──┤   (h(k)%3 == 2)               [r0 r1 r2 ...]
  key="order-009"  ──┘

Каждая партиция - упорядоченная последовательность записей с возрастающим offset'ом (r0, r1, r2, …). Между партициями offset'ы не сравниваются - у каждой свой счётчик с нуля.

Выбор ключа: чем заплатишь

Ключ не декорация. От него зависит, что Kafka умеет, а что нет. Brew перебрал четыре кандидата на топик brew.orders.v1 и прошёлся по последствиям:

  • пустой ключ - самая простая схема: kgo раскидывает sticky-батчами по партициям, нагрузка распределяется ровно, throughput максимальный. Минус один и главный: порядок событий одного заказа теряется. OrderPlaced может оказаться в партиции 2, PaymentReceived - в партиции 0. Консьюмер, читающий партиции параллельно, увидит платёж раньше заказа.
  • order_id - сохраняет порядок событий одного заказа. Все события OrderPlacedPaymentReceivedKitchenStartedOrderReadyOrderDelivered для заказа order-123 идут в одну партицию. Ключей миллионы (каждый заказ свой), распределение почти случайное (hash(uuid) это и есть равномерная случайность). Аналитика по конкретному клиенту требует перебора всех партиций.
  • customer_id - все события одного клиента в одной партиции. Полезно, если консьюмер строит профиль клиента в локальном кеше: можно держать кеш per-partition, не синхронизируя его с соседями. Минус: «горячий» клиент (корпоративный аккаунт, скидающий 1000 заказов в день) перекосит партицию - партиция Pareto, остальные простаивают.
  • shop_id - все заказы кофейни в одной партиции. Звучит логично для kitchen-service: бариста одной кофейни видит свой ровный поток без шума соседей. На практике рушится в пятницу: топовая кофейня в центре города даёт 30% всего трафика и её партиция тащит треть промо.

Brew выбрал order_id для топиков заказов и платежей (важен порядок событий внутри заказа) и shop_id для кухонного топика (бариста группирует свою кофейню, и пиковая партиция терпима, потому что и людей в той кофейне больше). Универсального правильного ответа нет: каждый выбор оптимизирует одну штуку и ломает другую. Дальше в курсе (Keys & Partitioning) разбираем выбор ключа под микроскопом.

Почему число партиций нельзя уменьшать

Тут штука, на которой спотыкается каждый второй. Партиции в топик можно только добавлять. Уменьшить никак. Никаким admin-запросом. Никаким alter.

Причина в формуле hash(key) mod N. Brew создал brew.orders.v1 с N=3, год писал туда заказы. Каждый ключ попадал в свою партицию. Если завтра сделать N=2, для того же ключа hash(key) mod 2 даст совсем другое число. То есть данные одного и того же ключа окажутся «исторически в одной партиции, новыми в другой». Вся гарантия порядка по ключу разваливается. Kafka просто не предлагает такой операции - нет UI ошибки нажать.

Расширение тоже ломает раскладку. При N=3 → N=4 ровно те же ключи, что попадали в партицию 0, теперь могут попасть в любую из четырёх. Поэтому kafka-topics --alter --partitions обычно делают на свежий топик или когда временное нарушение порядка по ключу допустимо. На прод-топиках это не операция «по щелчку».

Практический вывод: число партиций надо прикинуть заранее с запасом. Партиций столько, сколько диктует ожидаемая throughput / per-partition limit. «10 партиций потому что круглое число» - плохой ориентир. Грубая прикидка: партиция держит ~10-20 MB/s записи и столько же чтения. Если ждёшь 100 MB/s, партиций хочется минимум 6-8, лучше с запасом до 12-16. Это эмпирика - детально разобрано в Sizing и tuning.

После пятничного промо Brew пересоздал brew.orders.v1 под три партиции вместо одной. Старый топик в имени остался тем же (версию .v1 не поднимали, потому что менялась только раскладка партиций, а не схема событий), но число партиций выросло втрое. Под старые ключи, разумеется, раскидало по-новому, но Brew был готов: на момент пересоздания все события прошлой недели уже были перелиты в S3 для долгого хранения, оперативная аналитика жила в свежем потоке. На стенде курса дальше работаем именно с тремя - чтобы таблицы вывода умещались на экране.

Что показывает программа inspect

cmd/inspect/main.go идёт по трём шагам. Сначала создаёт топик brew.orders.v1 идемпотентно: пробует admin.CreateTopic(ctx, partitions, rf, configs, topic), и если упёрся в kerr.TopicAlreadyExists - значит топик уже есть, описываем его дальше. Поэтому make run второй раз отрабатывает спокойно, печатает текущее состояние. Дальше описывает топик через admin.ListTopics(ctx, "brew.orders.v1") - под капотом это metadata-запрос к брокеру, возвращает TopicDetails (мапу имя → детали: TopicID, флаг internal, число партиций, per-partition leader/replicas/ISR). И, наконец, печатает таблицу per-partition: кто leader, на каких нодах реплики и какие из них в ISR.

Флаг -recreate=true сначала удаляет топик, потом создаёт заново. Полезно, чтобы посмотреть, как контроллер раскидывает leader'ов между нодами - kafka-балансер старается распределить лидерство равномерно (тут 3 партиции на 3 ноды → один leader на ноду).

Вот ядро идемпотентного создания. Ошибка TopicAlreadyExists это штатная ситуация, «уже есть»:

go
func ensureTopic(ctx context.Context, admin *kadm.Client, topic string, partitions int32, rf int16) (bool, error) {
    resp, err := admin.CreateTopic(ctx, partitions, rf, nil, topic)
    if err == nil && resp.Err == nil {
        return true, nil
    }
 
    cause := err
    if cause == nil {
        cause = resp.Err
    }
    if errors.Is(cause, kerr.TopicAlreadyExists) {
        return false, nil
    }
    return false, cause
}

После ensureTopic дёргается ListTopics - это и есть тот самый metadata-запрос:

go
details, err := admin.ListTopics(rpcCtx, topic)
td := details[topic]
// td.Topic       - имя
// td.ID          - TopicID (UUID)
// td.Partitions  - мапа partition → детали (Leader, Replicas, ISR, OfflineReplicas)

И сама печать таблицы. Partitions.Sorted() отдаёт срез, отсортированный по номеру партиции:

go
parts := td.Partitions.Sorted()
for _, p := range parts {
    offline := fmt.Sprintf("%v", p.OfflineReplicas)
    if len(p.OfflineReplicas) == 0 {
        offline = "-"
    }
    fmt.Fprintf(tw, "%d\t%d\t%v\t%v\t%s\n",
        p.Partition, p.Leader, p.Replicas, p.ISR, offline)
}

Что в таблице - то и в PartitionDetail. LEADER это p.Leader, REPLICAS это p.Replicas, ISR это p.ISR, OFFLINE это p.OfflineReplicas. Печать просто подставляет числа в строку формата.

Запуск

Стенд должен быть поднят (docker compose up -d из корня репо).

sh
make run

Ожидаемый вывод (id'ы и leader'ы у тебя будут другие):

plaintext
brew-topic "brew.orders.v1" создан: partitions=3 rf=3
 
Topic:       brew.orders.v1
TopicID:     kcFo++q0QQ+xaKj0pnwWGA==
Partitions:  3
 
PARTITION  LEADER  REPLICAS  ISR      OFFLINE
0          2       [2 3 1]   [2 3 1]  -
1          3       [3 1 2]   [3 1 2]  -
2          1       [1 2 3]   [1 2 3]  -

На что обратить внимание:

  • LEADER у каждой партиции свой - нагрузка по записи распределяется по нодам;
  • REPLICAS это три числа (RF=3), и порядок в списке это preferred leader: первая запись в списке - кого контроллер хотел бы видеть лидером (контроллер старается, но не всегда получается сразу - leader-election выбирает живого, не «правильного»);
  • ISR == REPLICAS означает, что все реплики в синхроне; при acks=all запись подтверждается мгновенно (если бы одна нода отвалилась, ISR стал бы короче, см. Репликация и ISR);
  • OFFLINE пустой - все реплики живы.

Сравнить с CLI:

sh
make topic-describe

Эта цель внутри контейнера kafka-1 запускает kafka-topics.sh --describe --topic brew.orders.v1. Картинка та же (поля чуть по-другому называются, но Leader/Replicas/Isr совпадают). В этом и смысл лекции: то, что в дистрибутиве делается shell-скриптом, через franz-go делается одной строкой admin.ListTopics.

Хочешь увидеть, как меняется ассайнмент лидеров?

sh
make topic-recreate

Удалит топик, создаст заново. На освобождённых партициях контроллер выберет лидеров по preferred-replica логике. Запусти пару раз - заметишь, что числа стабильны (контроллер выбирает не случайно), но при удалении и пересоздании id'ы партиций распределяются между нодами по-разному.

После лекции можно убрать за собой:

sh
make topic-delete

Что узнал

  • Топик это именованный канал; на диске директория с сегментами лог-файлов; одна партиция упирается в одну ноду.
  • Партиция это самостоятельный append-only лог внутри топика; единица параллелизма для записи и чтения; шардирование, встроенное в брокер.
  • Сообщение с непустым ключом всегда попадает в одну и ту же партицию через hash(key) mod N. Все события одного заказа Brew с ключом order_id идут по порядку.
  • Выбор ключа это выбор того, что ты готов оптимизировать и что ломать: порядок внутри сущности, локальность кеша, равномерность нагрузки.
  • Партиций можно добавлять, но нельзя уменьшать - иначе раскладка hash(key) mod N ломается ретроактивно, а вместе с ней - все гарантии порядка по ключу.
  • admin.ListTopics отдаёт metadata: per-partition leader, replicas, ISR - этого достаточно, чтобы понять текущее состояние топика без shell'а.

Дальше (Репликация и ISR) копнём, что именно стоит за Replicas и ISR - что значит «реплика отстала», как ISR сжимается при падении ноды и при чём тут min.insync.replicas.

·Модуль 01

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Основы / Топики и партиции