Ключи и партиционирование
В прошлом модуле Brew перешёл на Kafka: order-service пишет заказы в brew.orders.v1, мы записали 10 сообщений и увидели, как пара (partition, offset) ложится на лог. Тогда было обещано: про partitioner — отдельный разговор. Вот он.
Тема, на которой ломаются почти все, кто только пришёл в Kafka. Ключ в сообщении выглядит как мелочь — просто строка рядом с payload'ом. На самом деле он определяет порядок обработки внутри потока. Перепутал ключ — потерял order. Поменял число партиций — потерял order для тех же ключей задним числом. Для Brew это значит, что события одного заказа разъедутся по партициям и кухня увидит отмену раньше оформления. Эта лекция — про то, как именно это работает.
Зачем вообще ключ
Топик в Kafka — это не одна очередь. Внутри он разбит на партиции, и каждая партиция — независимый упорядоченный лог. Между партициями порядка нет. Это первая опорная точка.
Когда продюсер записывает сообщение без ключа, partitioner волен раскидывать его как угодно — реализаций много. Запись 100 идёт в партицию 2, запись 101 — в партицию 0, запись 102 — туда, где сейчас свободнее.
Когда продюсер ставит ключ — partitioner обязан превратить этот ключ в номер партиции детерминированно. Хеш от ключа, mod на количество партиций. Один и тот же ключ всегда едет в одну и ту же партицию (пока число партиций не меняется — оговорка, к ней вернёмся).
Зачем это нужно. Возьмём поток заказов Brew: топик brew.orders.v1, в нём жизненный цикл разных заказов. Клиент оформляет заказ. Потом платёж проходит. Потом бариста отдаёт напиток. Потом — отмена. Если эти события одного заказа (а это законная тройка-список, поэтому пронумерую):
OrderPlaced— клиент разместил заказ.PaymentReceived— платёж прошёл.OrderReady— напиток готов (илиCANCELLED, если клиент передумал).
лягут в три разные партиции — три разных консьюмера (kitchen-service, payment-service, analytics-service) прочитают их в произвольном порядке относительно друг друга. Можно увидеть OrderReady раньше, чем OrderPlaced. Логика consumer'а сходит с ума.
Решение — ключ order_id. Все события одного заказа летят в одну партицию, читаются строго в порядке записи. Между разными заказами порядок нас не интересует — у них вообще ничего общего. Альтернатива — ключ shop_id: тогда в одну партицию садится весь поток конкретной кофейни, упорядоченный по точке. Что выбрать — зависит от того, что в этом потоке обязано быть консистентным: жизненный цикл одного заказа (order_id) или последовательность событий одной точки (shop_id).
Принцип короткий: что должно быть упорядочено — должно жить в одной партиции, что должно быть в одной партиции — должно иметь одинаковый ключ.
Как работает дефолтный partitioner
В franz-go дефолт — kgo.UniformBytesPartitioner. Эквивалент Java-клиента Kafka: для записей с ключом он считает murmur2(key) mod N, где N — число партиций. Без ключа — sticky-стратегия: одна партиция держится «горячей», в неё льётся всё, что приходит подряд, до тех пор пока батч не отправлен или не достиг лимита; потом sticky прыгает на следующую.
Murmur2 тут выбран намеренно. Распределение хорошее, по CPU дёшево, и главное — совместимо с Java. Если ты пишешь в один топик из Go-сервиса и из Java-сервиса — оба зальют сообщение с ключом order-42 в одну и ту же партицию. Для Brew это важно: часть сервисов на Go, и при миграции/гибридном стеке заказ с одним order_id обязан садиться в одну партицию независимо от того, какой клиент его записал.
Стоит проговорить, что хеш считается по сырым байтам ключа. Никакой нормализации перед хешированием — байты есть байты. "ORDER-42" и "order-42" — разные ключи и поедут в разные партиции с вероятностью, близкой к 1. Если у тебя в данных где-то прошёл case-фолд или trim в одной части пайплайна и не прошёл в другой — потерял ordering, и ловить это будешь долго.
Что произойдёт при изменении числа партиций
Сценарий частый: топик создавался когда-то с 3 партициями, нагрузка выросла, увеличили до 6. И тут возникает грустная штука.
hash(key) mod N — это функция от N. Меняешь N — меняется результат для тех же ключей. Запись с ключом order-42, которая до изменения летела в партицию 1, после изменения может пойти в партицию 4. Старые события этого заказа остались в партиции 1, новые ложатся в партицию 4 — порядок между ними потерян.
Из этого два следствия:
- Уменьшать число партиций нельзя в принципе. Kafka просто не даёт. Increase partitions — да, decrease — нет. Никогда. Чтобы «уменьшить», нужно создавать новый топик и переливать туда данные руками.
- Увеличивать можно, но осторожно. Это операция, которая требует решения на уровне дизайна. Либо ты готов к временной поломке ordering для тех ключей, что окажутся «переразложенными». Либо делаешь это в момент, когда нет live-трафика. Либо переходишь на consistent hashing на уровне приложения. Либо — что чаще всего — глушишь продюсеров на короткое окно, ждёшь, пока консьюмеры съедят бэклог по старым партициям, и потом расширяешь. Стандартного безболезненного способа нет.
Поэтому в проде обычно стараются угадать число партиций сразу с запасом. Лучше иметь 24 партиции и недоиспользовать половину, чем потом героически решать проблему increase'а.
В лекции мы создаём топики на 3 партициях — этого хватает, чтобы увидеть распределение, и хватает, чтобы приоритетное/обычное разделение в custom-partitioner не выродилось.
Custom partitioner — когда дефолта мало
Дефолтный partitioner оптимизирован под одно: размазать нагрузку равномерно. Это правильно для почти всех случаев. Но иногда у тебя есть приоритетный поток, который нельзя смешивать с обычным.
Пример из Brew. У заказов есть приоритет по уровню лояльности (loyalty_tier): заказы из VIP-кофеен и клиентов высшего тира получают SLA на обработку — median latency не выше 100ms (быстро в готовку, чтобы напиток не остыл к выдаче). Обычные заказы SLA не имеют, всё в режиме best effort. Если оба потока сидят в одном топике с дефолтным partitioner'ом, приоритетный заказ может оказаться в той же партиции, что и всплеск обычных в промо-пятницу, и встанет в очередь за чужим бэклогом. Просадка SLA — гарантированно.
Решение — выделенная партиция (или партиции) под приоритетный поток и отдельный консьюмер с приоритетом на этой партиции. Custom partitioner кладёт записи туда, куда нужно по бизнес-правилу. В демо технические префиксы ключей — prem- (приоритетный заказ) и reg- (обычный): по ним partitioner и принимает решение.
Интерфейс в franz-go простой. kgo.Partitioner — фабрика топик-уровневых partitioner'ов:
type Partitioner interface {
ForTopic(string) TopicPartitioner
}Под топик создаётся TopicPartitioner, у которого две основные функции:
type TopicPartitioner interface {
RequiresConsistency(*Record) bool
Partition(*Record, int) int
}Partition(rec, n) возвращает индекс партиции от 0 до n-1. RequiresConsistency(rec) отвечает на вопрос «обязательно ли положить именно туда?» — если true, клиент будет ждать недоступную партицию; если false, может перенаправить в другую при сбое лидера.
Для приоритетного потока мы хотим строгую гарантию (только в партицию 0, без альтернатив). Для обычных заказов — round-robin между двумя оставшимися, и тут консистентность не нужна, всё равно куда лечь, лишь бы записалось.
Что делает наш код
В лекции два бинарника. Один показывает дефолтное поведение с ключами, второй — пример своего partitioner'а.
keyed-producer
Пишет 1000 сообщений в lecture-02-01-keyed (учебный демо-топик; в проде это был бы brew.orders.v1), ключ — order-{i mod 10}, то есть 10 уникальных заказов на 1000 сообщений по 100 событий каждому. Топик — 3 партиции, replication factor 3.
После записи код собирает две таблицы. Первая — сколько сообщений ушло в каждую партицию, чтобы видеть общую раскладку. Вторая — на какие партиции разъехался каждый ключ. Если partitioner работает корректно (а он работает), у каждого ключа должна быть ровно одна партиция в столбце.
Вот сам цикл записи — голый ProduceSync плюс аккумуляция статистики:
for i := 0; i < o.messages; i++ {
key := fmt.Sprintf("order-%d", i%o.orders)
val := fmt.Sprintf("event-%d", i)
rec := &kgo.Record{
Topic: o.topic,
Key: []byte(key),
Value: []byte(val),
}
rpcCtx, rpcCancel := context.WithTimeout(ctx, 10*time.Second)
res := cl.ProduceSync(rpcCtx, rec)
rpcCancel()
if err := res.FirstErr(); err != nil {
return fmt.Errorf("produce %d: %w", i, err)
}
got := res[0].Record
if _, ok := keyToPartitions[key]; !ok {
keyToPartitions[key] = make(map[int32]int)
}
keyToPartitions[key][got.Partition]++
partitionCount[got.Partition]++
}got.Partition — то, что вернул брокер. Накапливаем по два среза: распределение по партициям всего и распределение по партициям внутри каждого ключа.
Что увидишь в выводе:
пишем 1000 сообщений с ключами order-0..order-9 в топик "lecture-02-01-keyed" (3 партиций)
распределение по партициям:
PARTITION COUNT
0 300
1 400
2 300
в какую партицию ложился каждый ключ:
KEY PARTITION COUNT NOTE
order-0 1 100 one-key-one-partition ✓
order-1 1 100 one-key-one-partition ✓
order-2 0 100 one-key-one-partition ✓
order-3 0 100 one-key-one-partition ✓
order-4 2 100 one-key-one-partition ✓
order-5 2 100 one-key-one-partition ✓
order-6 0 100 one-key-one-partition ✓
order-7 1 100 one-key-one-partition ✓
order-8 2 100 one-key-one-partition ✓
order-9 1 100 one-key-one-partition ✓
сверка с end offsets из лога:
PARTITION LATEST
0 300
1 400
2 300
TOTAL 1000Что важно. Во-первых, у каждого ключа ровно одна партиция — one-key-one-partition ✓ стоит у всех. Это и есть гарантия дефолтного partitioner'а: все события одного заказа едут в одну партицию и читаются по порядку. Во-вторых, между ключами раскладка детерминированная: order-2, order-3, order-6 всегда в партиции 0 (на 3-партиционном топике с murmur2), order-0, order-1, order-7, order-9 — в партиции 1, order-4, order-5, order-8 — в партиции 2. На 4 партициях раскладка будет другая — и та же. На 6 партициях — снова другая. Меняется N — меняется всё.
В-третьих, по партициям получилось 300/400/300 — небольшой перекос. На 10 уникальных ключах murmur2 раскидал их 3/4/3, отсюда и сдвиг: 10 — слишком мало для идеально ровного хеша на 3 партициях. На реальном потоке заказов с тысячами уникальных order_id было бы заметно ровнее.
custom-partitioner
Тот же сценарий, но с собственным partitioner'ом. Топик lecture-02-01-custom на 3 партициях с RF=3, шлём 1000 сообщений. По дефолту 30% записей — приоритетные (ключ начинается с prem-, это заказы из VIP-кофеен и высшего тира лояльности), остальные — обычные (ключ reg-N).
Приоритетные всегда в партиции 0. Обычные — round-robin между 1 и 2. Никакого приоритетного потока в partition 1 или 2. Никаких обычных заказов в partition 0.
Сама реализация partitioner'а — это маленький тип:
type premiumTopicPartitioner struct {
rr int
}
func (p *premiumTopicPartitioner) RequiresConsistency(r *kgo.Record) bool {
return bytes.HasPrefix(r.Key, []byte(premiumPrefix))
}
func (p *premiumTopicPartitioner) Partition(r *kgo.Record, n int) int {
if n <= 0 {
return 0
}
if bytes.HasPrefix(r.Key, []byte(premiumPrefix)) {
if premiumPart < n {
return premiumPart
}
return 0
}
// round-robin между rrFirst и rrSecond
choice := rrFirst
if p.rr%2 == 1 {
choice = rrSecond
}
p.rr++
return choice
}RequiresConsistency для приоритетного потока возвращает true — без альтернатив, ждём именно партицию 0. Для обычных заказов — false, round-robin'у безразлично, кому лечь. Состояние счётчика rr без mutex'а: внутри одного топика в franz-go partitioner вызывается без конкуренции (см. doc клиента).
Передача partitioner'а клиенту — одна опция:
cl, err := kafka.NewClient(
kgo.RecordPartitioner(premiumPartitioner{}),
)Дальше тот же ProduceSync, никакой разницы с keyed-producer.
После 1000 записей программа считает три инварианта и печатает их явно:
проверки:
✓ все премиум-записи лежат в партиции 0
✓ regular-записи не зашли в премиум-партицию 0
✓ round-robin сбалансирован: P1=343, P2=342 (skew=1 ≤ 34)Толерантность для round-robin'а — 5% от суммы. Тут разъехалось на 1 запись из 685 — отлично.
В выводе таблица по партициям тоже наглядная:
распределение по партициям:
PARTITION TOTAL PREMIUM REGULAR NOTE
0 315 315 0 premium-only ожидаем
1 343 0 343 round-robin для regular
2 342 0 342 round-robin для regularЦифры под --premium-pct=30 детерминированы (PCG со стабильным seed) и складываются как 315/343/342. На других значениях -premium-pct будет другой расклад.
Что взять с собой
После лекции в голове должно сложиться примерно такое:
- Ключ — это детерминированная адресация в партицию. Один ключ → одна партиция, пока N не меняется.
- В дефолтном partitioner'е без ключа — sticky, с ключом —
murmur2(key) mod N. Совместимо с Java-клиентом по дефолту. - Менять число партиций — больно. Decrease — невозможно. Increase ломает ordering для перераспределённых ключей. Планируй на старте.
- Свой partitioner — это два метода (
Partition,RequiresConsistency) плюсForTopic. Применяй, когда дефолтное «равномерно по всем партициям» не годится по делу — например, нужно изолировать приоритетный поток заказов (VIP-кофейни, высший тир лояльности) в выделенной партиции и консьюмить его отдельно от общего бэклога.
В следующей лекции (Acks и durability) — про acks и durability. Тоже одна крошечная опция, которая определяет, потеряешь ты данные при падении брокера или нет.
Запуск
Стенд должен быть поднят (docker compose up -d из корня).
Ключевой сценарий:
make run-keyedCustom partitioner:
make run-customПоднять долю приоритетных заказов до 80%, чтобы увидеть, как партиция 0 заглатывает большую часть потока:
make run-custom PREMIUM_PCT=80Описать оба топика через kafka-topics.sh --describe:
make topic-describeПрочитать топик через CLI-консьюмер (по умолчанию keyed):
make consume-cli # keyed-producer
make consume-cli T=lecture-02-01-custom # custom-partitionerПрибрать после лекции:
make topic-delete