Kafka CookbookПродьюсерКлючи и партиционирование
0 / 42 (0%)

Ключи и партиционирование

В прошлом модуле Brew перешёл на Kafka: order-service пишет заказы в brew.orders.v1, мы записали 10 сообщений и увидели, как пара (partition, offset) ложится на лог. Тогда было обещано: про partitioner — отдельный разговор. Вот он.

Тема, на которой ломаются почти все, кто только пришёл в Kafka. Ключ в сообщении выглядит как мелочь — просто строка рядом с payload'ом. На самом деле он определяет порядок обработки внутри потока. Перепутал ключ — потерял order. Поменял число партиций — потерял order для тех же ключей задним числом. Для Brew это значит, что события одного заказа разъедутся по партициям и кухня увидит отмену раньше оформления. Эта лекция — про то, как именно это работает.

Зачем вообще ключ

Топик в Kafka — это не одна очередь. Внутри он разбит на партиции, и каждая партиция — независимый упорядоченный лог. Между партициями порядка нет. Это первая опорная точка.

Когда продюсер записывает сообщение без ключа, partitioner волен раскидывать его как угодно — реализаций много. Запись 100 идёт в партицию 2, запись 101 — в партицию 0, запись 102 — туда, где сейчас свободнее.

Когда продюсер ставит ключ — partitioner обязан превратить этот ключ в номер партиции детерминированно. Хеш от ключа, mod на количество партиций. Один и тот же ключ всегда едет в одну и ту же партицию (пока число партиций не меняется — оговорка, к ней вернёмся).

Зачем это нужно. Возьмём поток заказов Brew: топик brew.orders.v1, в нём жизненный цикл разных заказов. Клиент оформляет заказ. Потом платёж проходит. Потом бариста отдаёт напиток. Потом — отмена. Если эти события одного заказа (а это законная тройка-список, поэтому пронумерую):

  1. OrderPlaced — клиент разместил заказ.
  2. PaymentReceived — платёж прошёл.
  3. OrderReady — напиток готов (или CANCELLED, если клиент передумал).

лягут в три разные партиции — три разных консьюмера (kitchen-service, payment-service, analytics-service) прочитают их в произвольном порядке относительно друг друга. Можно увидеть OrderReady раньше, чем OrderPlaced. Логика consumer'а сходит с ума.

Решение — ключ order_id. Все события одного заказа летят в одну партицию, читаются строго в порядке записи. Между разными заказами порядок нас не интересует — у них вообще ничего общего. Альтернатива — ключ shop_id: тогда в одну партицию садится весь поток конкретной кофейни, упорядоченный по точке. Что выбрать — зависит от того, что в этом потоке обязано быть консистентным: жизненный цикл одного заказа (order_id) или последовательность событий одной точки (shop_id).

Принцип короткий: что должно быть упорядочено — должно жить в одной партиции, что должно быть в одной партиции — должно иметь одинаковый ключ.

Как работает дефолтный partitioner

В franz-go дефолт — kgo.UniformBytesPartitioner. Эквивалент Java-клиента Kafka: для записей с ключом он считает murmur2(key) mod N, где N — число партиций. Без ключа — sticky-стратегия: одна партиция держится «горячей», в неё льётся всё, что приходит подряд, до тех пор пока батч не отправлен или не достиг лимита; потом sticky прыгает на следующую.

Murmur2 тут выбран намеренно. Распределение хорошее, по CPU дёшево, и главное — совместимо с Java. Если ты пишешь в один топик из Go-сервиса и из Java-сервиса — оба зальют сообщение с ключом order-42 в одну и ту же партицию. Для Brew это важно: часть сервисов на Go, и при миграции/гибридном стеке заказ с одним order_id обязан садиться в одну партицию независимо от того, какой клиент его записал.

Стоит проговорить, что хеш считается по сырым байтам ключа. Никакой нормализации перед хешированием — байты есть байты. "ORDER-42" и "order-42" — разные ключи и поедут в разные партиции с вероятностью, близкой к 1. Если у тебя в данных где-то прошёл case-фолд или trim в одной части пайплайна и не прошёл в другой — потерял ordering, и ловить это будешь долго.

Что произойдёт при изменении числа партиций

Сценарий частый: топик создавался когда-то с 3 партициями, нагрузка выросла, увеличили до 6. И тут возникает грустная штука.

hash(key) mod N — это функция от N. Меняешь N — меняется результат для тех же ключей. Запись с ключом order-42, которая до изменения летела в партицию 1, после изменения может пойти в партицию 4. Старые события этого заказа остались в партиции 1, новые ложатся в партицию 4 — порядок между ними потерян.

Из этого два следствия:

  • Уменьшать число партиций нельзя в принципе. Kafka просто не даёт. Increase partitions — да, decrease — нет. Никогда. Чтобы «уменьшить», нужно создавать новый топик и переливать туда данные руками.
  • Увеличивать можно, но осторожно. Это операция, которая требует решения на уровне дизайна. Либо ты готов к временной поломке ordering для тех ключей, что окажутся «переразложенными». Либо делаешь это в момент, когда нет live-трафика. Либо переходишь на consistent hashing на уровне приложения. Либо — что чаще всего — глушишь продюсеров на короткое окно, ждёшь, пока консьюмеры съедят бэклог по старым партициям, и потом расширяешь. Стандартного безболезненного способа нет.

Поэтому в проде обычно стараются угадать число партиций сразу с запасом. Лучше иметь 24 партиции и недоиспользовать половину, чем потом героически решать проблему increase'а.

В лекции мы создаём топики на 3 партициях — этого хватает, чтобы увидеть распределение, и хватает, чтобы приоритетное/обычное разделение в custom-partitioner не выродилось.

Custom partitioner — когда дефолта мало

Дефолтный partitioner оптимизирован под одно: размазать нагрузку равномерно. Это правильно для почти всех случаев. Но иногда у тебя есть приоритетный поток, который нельзя смешивать с обычным.

Пример из Brew. У заказов есть приоритет по уровню лояльности (loyalty_tier): заказы из VIP-кофеен и клиентов высшего тира получают SLA на обработку — median latency не выше 100ms (быстро в готовку, чтобы напиток не остыл к выдаче). Обычные заказы SLA не имеют, всё в режиме best effort. Если оба потока сидят в одном топике с дефолтным partitioner'ом, приоритетный заказ может оказаться в той же партиции, что и всплеск обычных в промо-пятницу, и встанет в очередь за чужим бэклогом. Просадка SLA — гарантированно.

Решение — выделенная партиция (или партиции) под приоритетный поток и отдельный консьюмер с приоритетом на этой партиции. Custom partitioner кладёт записи туда, куда нужно по бизнес-правилу. В демо технические префиксы ключей — prem- (приоритетный заказ) и reg- (обычный): по ним partitioner и принимает решение.

Интерфейс в franz-go простой. kgo.Partitioner — фабрика топик-уровневых partitioner'ов:

go
type Partitioner interface {
    ForTopic(string) TopicPartitioner
}

Под топик создаётся TopicPartitioner, у которого две основные функции:

go
type TopicPartitioner interface {
    RequiresConsistency(*Record) bool
    Partition(*Record, int) int
}

Partition(rec, n) возвращает индекс партиции от 0 до n-1. RequiresConsistency(rec) отвечает на вопрос «обязательно ли положить именно туда?» — если true, клиент будет ждать недоступную партицию; если false, может перенаправить в другую при сбое лидера.

Для приоритетного потока мы хотим строгую гарантию (только в партицию 0, без альтернатив). Для обычных заказов — round-robin между двумя оставшимися, и тут консистентность не нужна, всё равно куда лечь, лишь бы записалось.

Что делает наш код

В лекции два бинарника. Один показывает дефолтное поведение с ключами, второй — пример своего partitioner'а.

keyed-producer

Пишет 1000 сообщений в lecture-02-01-keyed (учебный демо-топик; в проде это был бы brew.orders.v1), ключ — order-{i mod 10}, то есть 10 уникальных заказов на 1000 сообщений по 100 событий каждому. Топик — 3 партиции, replication factor 3.

После записи код собирает две таблицы. Первая — сколько сообщений ушло в каждую партицию, чтобы видеть общую раскладку. Вторая — на какие партиции разъехался каждый ключ. Если partitioner работает корректно (а он работает), у каждого ключа должна быть ровно одна партиция в столбце.

Вот сам цикл записи — голый ProduceSync плюс аккумуляция статистики:

go
for i := 0; i < o.messages; i++ {
    key := fmt.Sprintf("order-%d", i%o.orders)
    val := fmt.Sprintf("event-%d", i)
    rec := &kgo.Record{
        Topic: o.topic,
        Key:   []byte(key),
        Value: []byte(val),
    }
 
    rpcCtx, rpcCancel := context.WithTimeout(ctx, 10*time.Second)
    res := cl.ProduceSync(rpcCtx, rec)
    rpcCancel()
    if err := res.FirstErr(); err != nil {
        return fmt.Errorf("produce %d: %w", i, err)
    }
    got := res[0].Record
 
    if _, ok := keyToPartitions[key]; !ok {
        keyToPartitions[key] = make(map[int32]int)
    }
    keyToPartitions[key][got.Partition]++
    partitionCount[got.Partition]++
}

got.Partition — то, что вернул брокер. Накапливаем по два среза: распределение по партициям всего и распределение по партициям внутри каждого ключа.

Что увидишь в выводе:

plaintext
пишем 1000 сообщений с ключами order-0..order-9 в топик "lecture-02-01-keyed" (3 партиций)
 
распределение по партициям:
PARTITION  COUNT
0          300
1          400
2          300
 
в какую партицию ложился каждый ключ:
KEY      PARTITION  COUNT  NOTE
order-0  1          100    one-key-one-partition ✓
order-1  1          100    one-key-one-partition ✓
order-2  0          100    one-key-one-partition ✓
order-3  0          100    one-key-one-partition ✓
order-4  2          100    one-key-one-partition ✓
order-5  2          100    one-key-one-partition ✓
order-6  0          100    one-key-one-partition ✓
order-7  1          100    one-key-one-partition ✓
order-8  2          100    one-key-one-partition ✓
order-9  1          100    one-key-one-partition ✓
 
сверка с end offsets из лога:
PARTITION  LATEST
0          300
1          400
2          300
TOTAL      1000

Что важно. Во-первых, у каждого ключа ровно одна партиция — one-key-one-partition ✓ стоит у всех. Это и есть гарантия дефолтного partitioner'а: все события одного заказа едут в одну партицию и читаются по порядку. Во-вторых, между ключами раскладка детерминированная: order-2, order-3, order-6 всегда в партиции 0 (на 3-партиционном топике с murmur2), order-0, order-1, order-7, order-9 — в партиции 1, order-4, order-5, order-8 — в партиции 2. На 4 партициях раскладка будет другая — и та же. На 6 партициях — снова другая. Меняется N — меняется всё.

В-третьих, по партициям получилось 300/400/300 — небольшой перекос. На 10 уникальных ключах murmur2 раскидал их 3/4/3, отсюда и сдвиг: 10 — слишком мало для идеально ровного хеша на 3 партициях. На реальном потоке заказов с тысячами уникальных order_id было бы заметно ровнее.

custom-partitioner

Тот же сценарий, но с собственным partitioner'ом. Топик lecture-02-01-custom на 3 партициях с RF=3, шлём 1000 сообщений. По дефолту 30% записей — приоритетные (ключ начинается с prem-, это заказы из VIP-кофеен и высшего тира лояльности), остальные — обычные (ключ reg-N).

Приоритетные всегда в партиции 0. Обычные — round-robin между 1 и 2. Никакого приоритетного потока в partition 1 или 2. Никаких обычных заказов в partition 0.

Сама реализация partitioner'а — это маленький тип:

go
type premiumTopicPartitioner struct {
    rr int
}
 
func (p *premiumTopicPartitioner) RequiresConsistency(r *kgo.Record) bool {
    return bytes.HasPrefix(r.Key, []byte(premiumPrefix))
}
 
func (p *premiumTopicPartitioner) Partition(r *kgo.Record, n int) int {
    if n <= 0 {
        return 0
    }
    if bytes.HasPrefix(r.Key, []byte(premiumPrefix)) {
        if premiumPart < n {
            return premiumPart
        }
        return 0
    }
    // round-robin между rrFirst и rrSecond
    choice := rrFirst
    if p.rr%2 == 1 {
        choice = rrSecond
    }
    p.rr++
    return choice
}

RequiresConsistency для приоритетного потока возвращает true — без альтернатив, ждём именно партицию 0. Для обычных заказов — false, round-robin'у безразлично, кому лечь. Состояние счётчика rr без mutex'а: внутри одного топика в franz-go partitioner вызывается без конкуренции (см. doc клиента).

Передача partitioner'а клиенту — одна опция:

go
cl, err := kafka.NewClient(
    kgo.RecordPartitioner(premiumPartitioner{}),
)

Дальше тот же ProduceSync, никакой разницы с keyed-producer.

После 1000 записей программа считает три инварианта и печатает их явно:

plaintext
проверки:
  ✓ все премиум-записи лежат в партиции 0
  ✓ regular-записи не зашли в премиум-партицию 0
  ✓ round-robin сбалансирован: P1=343, P2=342 (skew=1 ≤ 34)

Толерантность для round-robin'а — 5% от суммы. Тут разъехалось на 1 запись из 685 — отлично.

В выводе таблица по партициям тоже наглядная:

plaintext
распределение по партициям:
PARTITION  TOTAL  PREMIUM  REGULAR  NOTE
0          315    315      0        premium-only ожидаем
1          343    0        343      round-robin для regular
2          342    0        342      round-robin для regular

Цифры под --premium-pct=30 детерминированы (PCG со стабильным seed) и складываются как 315/343/342. На других значениях -premium-pct будет другой расклад.

Что взять с собой

После лекции в голове должно сложиться примерно такое:

  • Ключ — это детерминированная адресация в партицию. Один ключ → одна партиция, пока N не меняется.
  • В дефолтном partitioner'е без ключа — sticky, с ключом — murmur2(key) mod N. Совместимо с Java-клиентом по дефолту.
  • Менять число партиций — больно. Decrease — невозможно. Increase ломает ordering для перераспределённых ключей. Планируй на старте.
  • Свой partitioner — это два метода (Partition, RequiresConsistency) плюс ForTopic. Применяй, когда дефолтное «равномерно по всем партициям» не годится по делу — например, нужно изолировать приоритетный поток заказов (VIP-кофейни, высший тир лояльности) в выделенной партиции и консьюмить его отдельно от общего бэклога.

В следующей лекции (Acks и durability) — про acks и durability. Тоже одна крошечная опция, которая определяет, потеряешь ты данные при падении брокера или нет.

Запуск

Стенд должен быть поднят (docker compose up -d из корня).

Ключевой сценарий:

sh
make run-keyed

Custom partitioner:

sh
make run-custom

Поднять долю приоритетных заказов до 80%, чтобы увидеть, как партиция 0 заглатывает большую часть потока:

sh
make run-custom PREMIUM_PCT=80

Описать оба топика через kafka-topics.sh --describe:

sh
make topic-describe

Прочитать топик через CLI-консьюмер (по умолчанию keyed):

sh
make consume-cli                       # keyed-producer
make consume-cli T=lecture-02-01-custom  # custom-partitioner

Прибрать после лекции:

sh
make topic-delete
·Модуль 02

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Продьюсер / Ключи и партиционирование