Keys and partitioning

In the previous module Brew moved to Kafka: order-service now writes orders to brew.orders.v1. We wrote 10 messages and saw how the (partition, offset) pair maps onto the log. A separate conversation about partitioners was promised, and here it is.

This is the topic that trips up almost everyone new to Kafka. The message key looks like a small thing, just a string next to the payload. In reality it determines processing order within a stream. Get the key wrong and you lose order. Change the partition count and existing keys can move to a different partition, so ordering between their old and new events is lost. For Brew that means events of a single order scatter across partitions and the kitchen sees the cancellation before the placement. This lecture is about exactly how that works.

Why the key exists

A Kafka topic is not a single queue. Internally it is split into partitions, and each partition is an independent ordered log. There is no ordering between partitions. That is the first anchor point.

When a producer writes a message without a key, the partitioner is free to place it anywhere — there are many implementations. Message 100 goes to partition 2, message 101 to partition 0, message 102 wherever is least busy.

When a producer sets a key, the partitioner must convert that key to a partition number deterministically. Hash the key, mod by partition count. The same key always goes to the same partition (as long as the partition count does not change — a caveat we will return to).

Why this matters. Take the Brew order stream: the topic brew.orders.v1 carries the lifecycle events of many different orders. A customer places an order. Then the payment clears. Then the barista hands over the drink, or the order is cancelled. If these three events of a single order:

  1. OrderPlaced — the customer placed the order.
  2. PaymentReceived — the payment cleared.
  3. OrderReady — the drink is ready (or CANCELLED, if the customer changed their mind).

land in three different partitions, then each service consuming the topic (kitchen-service, payment-service, analytics-service) will read them in arbitrary order relative to each other, because nothing is ordered across partitions. A consumer can see OrderReady before OrderPlaced, and its logic breaks.

The solution is an order_id key. All events for a single order go to the same partition, read in strict write order. Ordering between different orders does not matter — they share nothing. An alternative is a shop_id key: then a single coffee shop's entire stream lands in one partition, ordered per shop. Which to pick depends on what must stay consistent in this stream: a single order's lifecycle (order_id) or the sequence of events of a single shop (shop_id).

The principle is short: what must be ordered must live in one partition; what must be in one partition must share the same key.
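A toy model makes the principle concrete. The sketch below routes interleaved events of two orders into three in-memory "partitions" by key. The event type and the route function are made up for this example, and FNV-1a stands in for the real hash purely for brevity (it is not what Kafka clients use).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// event is a hypothetical, simplified order event.
type event struct {
	OrderID string
	Type    string
}

// partitionFor maps a key to a partition with hash(key) mod n.
// FNV-1a is a stand-in hash, not the one Kafka clients use.
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// route appends each event to the partition selected by its order ID,
// preserving arrival order inside every partition.
func route(events []event, n int) [][]event {
	parts := make([][]event, n)
	for _, e := range events {
		p := partitionFor(e.OrderID, n)
		parts[p] = append(parts[p], e)
	}
	return parts
}

func main() {
	// Two orders whose events are interleaved in time.
	events := []event{
		{"order-1", "OrderPlaced"},
		{"order-2", "OrderPlaced"},
		{"order-1", "PaymentReceived"},
		{"order-2", "PaymentReceived"},
		{"order-1", "OrderReady"},
		{"order-2", "OrderReady"},
	}
	for p, log := range route(events, 3) {
		fmt.Printf("partition %d: %v\n", p, log)
	}
}
```

Whichever partition an order ends up in, its three events sit there in write order. Nothing is promised about how order-1 and order-2 relate to each other, and nothing needs to be.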

How the default partitioner works

The default in franz-go is kgo.UniformBytesPartitioner. For records with a key it hashes the same way as the Kafka Java client: murmur2(key) mod N, where N is the partition count. Without a key it uses a sticky strategy: one partition is kept "hot" and consecutive records are poured into it until a byte threshold is reached (64 KiB in the default configuration), after which the partitioner moves on to another partition.

Murmur2 is chosen deliberately. Distribution is good, CPU cost is low, and most importantly — it is compatible with Java. If you write to the same topic from a Go service and a Java service, both will send a message with key order-42 to the same partition. For Brew this matters: some services are in Go, and during a migration or hybrid stack an order with a given order_id must land in one partition regardless of which client wrote it.
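For readers who want to see the keyed path spelled out, here is a hand-written Go port of the 32-bit murmur2 variant used by the Java client, followed by the sign-bit mask and the modulo. Treat it as a reading aid rather than the franz-go source: partitionFor is a name made up for this example, and in real code the client's built-in hasher does this work.

```go
package main

import "fmt"

// murmur2 is a Go port of the 32-bit murmur2 variant used by the Kafka Java
// client (seed 0x9747b28c). uint32 arithmetic wraps like Java's int.
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	h := seed ^ uint32(len(data))
	// Mix the input four bytes at a time, little-endian.
	for len(data) >= 4 {
		k := uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16 | uint32(data[3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
		data = data[4:]
	}
	// Fold in the remaining 0..3 bytes.
	switch len(data) {
	case 3:
		h ^= uint32(data[2]) << 16
		fallthrough
	case 2:
		h ^= uint32(data[1]) << 8
		fallthrough
	case 1:
		h ^= uint32(data[0])
		h *= m
	}
	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// partitionFor clears the sign bit (the Java client's "toPositive" step)
// and reduces the hash modulo the partition count n.
func partitionFor(key []byte, n int) int {
	return int((murmur2(key) & 0x7fffffff) % uint32(n))
}

func main() {
	for _, key := range []string{"order-0", "order-1", "order-42"} {
		fmt.Printf("%s -> partition %d of 3\n", key, partitionFor([]byte(key), 3))
	}
}
```

Because the function depends only on the key bytes and N, any client that implements the same steps will pick the same partition, which is the whole compatibility story.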

Worth stating explicitly: the hash is computed over the raw bytes of the key. No normalization before hashing — bytes are bytes. "ORDER-42" and "order-42" are different keys and will go to different partitions with probability close to 1. If a case-fold or trim happened somewhere in one part of the pipeline but not another — you lost ordering, and tracking it down will take a while.
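One way to guard against this is to build the key in exactly one place. The helper below is a hypothetical sketch: canonicalKey is not part of the lecture's code, and trimming plus lower-casing is only one possible rule. What matters is that every producer applies the same rule before the bytes reach the partitioner.

```go
package main

import (
	"fmt"
	"strings"
)

// canonicalKey is a hypothetical helper that turns an order ID into key bytes.
// The normalization rule itself (trim + lower-case) is an assumption; pick one
// rule and share it between all services that write to the topic.
func canonicalKey(orderID string) []byte {
	return []byte(strings.ToLower(strings.TrimSpace(orderID)))
}

func main() {
	a := canonicalKey(" ORDER-42 ")
	b := canonicalKey("order-42")
	fmt.Println(string(a) == string(b)) // prints: true
}
```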

What happens when the partition count changes

The scenario is common: a topic was created with 3 partitions, load grew, bumped to 6. And here the painful part begins.

hash(key) mod N is a function of N. Change N — change the result for the same keys. A record with key order-42 that went to partition 1 before the change may go to partition 4 after. Old events for that order stayed in partition 1, new ones land in partition 4 — ordering between them is lost.

Two consequences follow:

  • Decreasing the partition count is simply not possible. Kafka does not allow it. Increase partitions — yes, decrease — no. Never. To "decrease", you create a new topic and migrate the data manually.
  • Increasing is possible, but carefully. It is a design-level decision. Either you accept a temporary ordering break for the keys that get re-mapped. Or you do it when there is no live traffic. Or you switch to consistent hashing at the application level. Or — most commonly — you pause producers for a short window, wait for consumers to drain the backlog on the old partitions, then expand. There is no standard painless way.

So in production you typically try to size the partition count with headroom from the start. Better to have 24 partitions and underuse half than to heroically solve an increase problem later.

In this lecture we create topics with 3 partitions — enough to see the distribution, and enough that the priority/regular split in custom-partitioner does not degenerate.

Custom partitioner — when the default is not enough

The default partitioner is optimized for one thing: spread load evenly. That is correct for almost all cases. But sometimes you have a priority stream that must not mix with the regular one.

An example from Brew. Orders have a priority by loyalty tier (loyalty_tier): orders from VIP shops and top-tier customers get a processing SLA — median latency no higher than 100ms (straight into the kitchen, so the drink does not go cold before handover). Regular orders have no SLA, everything is best effort. If both streams sit in the same topic with the default partitioner, a priority order can end up in the same partition as a regular spike during free-coffee-Friday and wait behind someone else's backlog. SLA degradation — guaranteed.

The solution is a dedicated partition (or partitions) for the priority stream and a separate consumer with priority on that partition. The custom partitioner puts records where the business rule dictates. In the demo the technical key prefixes are prem- (priority order) and reg- (regular): the partitioner makes its decision based on them.

The interface in franz-go is simple. kgo.Partitioner is a factory for topic-level partitioners:

go
type Partitioner interface {
    ForTopic(string) TopicPartitioner
}

A TopicPartitioner is created per topic, with two main methods:

go
type TopicPartitioner interface {
    RequiresConsistency(*Record) bool
    Partition(*Record, int) int
}

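Partition(rec, n) returns the partition index from 0 to n-1. RequiresConsistency(rec) answers "must the record go exactly there?" If true, the record stays pinned to the chosen partition even while that partition is unavailable, so the write waits until it comes back. If false, the client may choose among the partitions that are currently writable, for example after a leader failure.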

For the priority stream we want a strict guarantee (partition 0 only, no alternatives). For regular orders — round-robin between the two remaining, and here consistency is not needed; it does not matter where it lands as long as it is written.

What the code does

The lecture has two binaries. One demonstrates default behavior with keys, the other shows a custom partitioner.

keyed-producer

Writes 1000 messages to lecture-02-01-keyed (a teaching demo topic; in production this would be brew.orders.v1), key is order-{i mod 10} — 10 unique orders across 1000 messages, 100 events each. Topic — 3 partitions, replication factor 3.

After writing, the code builds two tables. The first shows how many messages went to each partition — the overall distribution. The second shows which partitions each key landed on. If the partitioner works correctly (and it does), each key should have exactly one partition in the column.

The write loop — bare ProduceSync plus statistics accumulation:

go
for i := 0; i < o.messages; i++ {
    key := fmt.Sprintf("order-%d", i%o.orders)
    val := fmt.Sprintf("event-%d", i)
    rec := &kgo.Record{
        Topic: o.topic,
        Key:   []byte(key),
        Value: []byte(val),
    }
 
    rpcCtx, rpcCancel := context.WithTimeout(ctx, 10*time.Second)
    res := cl.ProduceSync(rpcCtx, rec)
    rpcCancel()
    if err := res.FirstErr(); err != nil {
        return fmt.Errorf("produce %d: %w", i, err)
    }
    got := res[0].Record
 
    if _, ok := keyToPartitions[key]; !ok {
        keyToPartitions[key] = make(map[int32]int)
    }
    keyToPartitions[key][got.Partition]++
    partitionCount[got.Partition]++
}

got.Partition is the partition the record was actually written to, as reported in the produce result. It feeds two counters: the overall distribution across partitions (partitionCount) and the per-key distribution (keyToPartitions).
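The "one key, one partition" check over those statistics can be sketched like this. The function name splitKeys and the sample data are invented for the example; only the shape of the map (key, then partition, then record count) follows the loop above.

```go
package main

import (
	"fmt"
	"sort"
)

// splitKeys returns the keys that were observed on more than one partition.
// The map shape mirrors keyToPartitions: key -> partition -> record count.
func splitKeys(keyToPartitions map[string]map[int32]int) []string {
	var bad []string
	for key, parts := range keyToPartitions {
		if len(parts) > 1 {
			bad = append(bad, key)
		}
	}
	sort.Strings(bad) // map iteration order is random, so sort for stable output
	return bad
}

func main() {
	stats := map[string]map[int32]int{
		"order-0": {1: 100},
		"order-2": {0: 100},
		"order-x": {0: 60, 2: 40}, // made-up violation for the demo
	}
	fmt.Println(splitKeys(stats)) // prints: [order-x]
}
```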

What you will see in the output:

plaintext
writing 1000 messages with keys order-0..order-9 to topic "lecture-02-01-keyed" (3 partitions)
 
partition distribution:
PARTITION  COUNT
0          300
1          400
2          300
 
which partition each key landed in:
KEY      PARTITION  COUNT  NOTE
order-0  1          100    one-key-one-partition ✓
order-1  1          100    one-key-one-partition ✓
order-2  0          100    one-key-one-partition ✓
order-3  0          100    one-key-one-partition ✓
order-4  2          100    one-key-one-partition ✓
order-5  2          100    one-key-one-partition ✓
order-6  0          100    one-key-one-partition ✓
order-7  1          100    one-key-one-partition ✓
order-8  2          100    one-key-one-partition ✓
order-9  1          100    one-key-one-partition ✓
 
cross-check against log end offsets:
PARTITION  LATEST
0          300
1          400
2          300
TOTAL      1000

What matters. First, each key maps to exactly one partition — one-key-one-partition ✓ appears for all. That is the default partitioner's guarantee: all events of a single order go to one partition and are read in order. Second, the mapping between keys is deterministic: order-2, order-3, order-6 always go to partition 0 (on a 3-partition topic with murmur2), order-0, order-1, order-7, order-9 go to partition 1, order-4, order-5, order-8 end up in partition 2. On 4 partitions the mapping will be different — and stable at that value. On 6 partitions — different again. Change N — everything changes.

Third, the partition split came out 300/400/300 — a slight skew. With 10 unique keys murmur2 spread them 3/4/3, hence the shift: 10 is too few for a perfectly even hash across 3 partitions. On a real order stream with thousands of unique order_id values it would be noticeably more balanced.
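The 300/400/300 split is plain arithmetic over the key table. The sketch below recomputes it from the mapping printed in the output; the variable and function names are made up for the example.

```go
package main

import "fmt"

// observed is the key -> partition mapping copied from the output table.
var observed = map[string]int{
	"order-0": 1, "order-1": 1, "order-2": 0, "order-3": 0, "order-4": 2,
	"order-5": 2, "order-6": 0, "order-7": 1, "order-8": 2, "order-9": 1,
}

// recordsPerPartition sums perKey records for every key mapped to a partition.
func recordsPerPartition(mapping map[string]int, perKey, n int) []int {
	counts := make([]int, n)
	for _, p := range mapping {
		counts[p] += perKey
	}
	return counts
}

func main() {
	// 10 keys, 100 events each, 3 partitions.
	fmt.Println(recordsPerPartition(observed, 100, 3)) // prints: [300 400 300]
}
```

Three keys landed in partition 0, four in partition 1 and three in partition 2, so the skew comes from the small number of keys, not from the hash.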

custom-partitioner

The same scenario but with a custom partitioner. Topic lecture-02-01-custom with 3 partitions and RF=3, sending 1000 messages. By default 30% of records are priority orders (key starts with prem- — orders from VIP shops and the top loyalty tier), the rest are regular (key reg-N).

Priority orders always go to partition 0. Regular — round-robin between 1 and 2. No priority stream in partition 1 or 2. No regular orders in partition 0.

The partitioner implementation is a small type:

go
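// premiumTopicPartitioner routes priority records to a dedicated partition
// and spreads regular records over the remaining two.
type premiumTopicPartitioner struct {
    rr int // round-robin counter for regular records
}
 
// RequiresConsistency pins priority records to their partition even while it
// is unavailable; regular records may go wherever a write is possible.
func (p *premiumTopicPartitioner) RequiresConsistency(r *kgo.Record) bool {
    return bytes.HasPrefix(r.Key, []byte(premiumPrefix))
}
 
// Partition returns the partition index for r among n partitions.
func (p *premiumTopicPartitioner) Partition(r *kgo.Record, n int) int {
    if n <= 0 {
        return 0
    }
    if bytes.HasPrefix(r.Key, []byte(premiumPrefix)) {
        if premiumPart < n {
            return premiumPart
        }
        return 0
    }
    // Round-robin between rrFirst and rrSecond.
    // Assumes both indexes are below n, i.e. all three partitions are offered.
    choice := rrFirst
    if p.rr%2 == 1 {
        choice = rrSecond
    }
    p.rr++
    return choice
}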

RequiresConsistency for the priority stream returns true — no alternatives, we wait for partition 0 specifically. For regular orders — false, round-robin does not care where it lands. The rr counter state has no mutex: within a single topic in franz-go the partitioner is called without concurrency (see the client docs).

Passing the partitioner to the client — one option:

go
cl, err := kafka.NewClient(
    kgo.RecordPartitioner(premiumPartitioner{}),
)

Then the same ProduceSync, no difference from keyed-producer.

After 1000 records the program checks three invariants and prints them explicitly:

plaintext
checks:
  ✓ all premium records are in partition 0
  ✓ regular records did not enter premium partition 0
  ✓ round-robin is balanced: P1=343, P2=342 (skew=1 ≤ 34)

The tolerance for round-robin is 5% of the round-robin total: 685 regular records, so 34. Here the difference is 1 record out of 685, which is excellent.
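The balance check can be sketched as follows. The printed limit of 34 matches 5% of the 685 regular records rounded down (5% of all 1000 records would be 50), so that is what the sketch assumes; the function name balanced is made up for the example.

```go
package main

import "fmt"

// balanced reports the skew between two round-robin counters, the allowed
// limit (pct percent of their sum, rounded down), and whether skew <= limit.
func balanced(p1, p2, pct int) (skew, limit int, ok bool) {
	skew = p1 - p2
	if skew < 0 {
		skew = -skew
	}
	limit = (p1 + p2) * pct / 100 // integer division rounds down
	return skew, limit, skew <= limit
}

func main() {
	skew, limit, ok := balanced(343, 342, 5)
	fmt.Println(skew, limit, ok) // prints: 1 34 true
}
```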

The partition table in the output is equally clear:

plaintext
partition distribution:
PARTITION  TOTAL  PREMIUM  REGULAR  NOTE
0          315    315      0        premium-only expected
1          343    0        343      round-robin for regular
2          342    0        342      round-robin for regular

The numbers under --premium-pct=30 are deterministic (PCG with a stable seed) and break down as 315/343/342. With other values of --premium-pct the distribution will differ.
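Reproducible numbers come from seeding the generator with fixed values. A minimal sketch with the PCG source from math/rand/v2 (Go 1.22 or newer) is below. The seeds 1 and 2 and the function countPremium are arbitrary choices for the example, since the lecture does not show the real seed, so the count it produces is not expected to be 315.

```go
package main

import (
	"fmt"
	"math/rand/v2"
)

// countPremium draws n records and counts how many fall into the premium
// share. seed1 and seed2 are arbitrary; the real demo seed is not shown.
func countPremium(n, premiumPct int, seed1, seed2 uint64) int {
	r := rand.New(rand.NewPCG(seed1, seed2))
	premium := 0
	for i := 0; i < n; i++ {
		if r.IntN(100) < premiumPct {
			premium++
		}
	}
	return premium
}

func main() {
	a := countPremium(1000, 30, 1, 2)
	b := countPremium(1000, 30, 1, 2)
	fmt.Println(a == b) // prints: true (same seed, same sequence)
}
```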

Key takeaways

After this lecture, the following should be clear:

  • The key is deterministic addressing to a partition. One key → one partition, as long as N does not change.
  • The default partitioner: without a key — sticky, with a key — murmur2(key) mod N. Compatible with the Java client by default.
  • Changing the partition count is painful. Decrease — impossible. Increase breaks ordering for re-mapped keys. Plan upfront.
  • A custom partitioner is two methods (Partition, RequiresConsistency) plus ForTopic. Use it when the default "spread evenly across all partitions" does not fit the business case — for example, when you need to isolate the priority order stream (VIP shops, top loyalty tier) in a dedicated partition and consume it separately from the general backlog.

The next lecture, Acks and durability, is again about one tiny option: the one that determines whether you lose data when a broker goes down.

Running

The sandbox must be running (docker compose up -d from the root).

Main scenario:

sh
make run-keyed

Custom partitioner:

sh
make run-custom

Raise the priority-order share to 80% to see partition 0 absorb most of the stream:

sh
make run-custom PREMIUM_PCT=80

Describe both topics with kafka-topics.sh --describe:

sh
make topic-describe

Read the topic via the CLI consumer (keyed by default):

sh
make consume-cli                       # keyed-producer
make consume-cli T=lecture-02-01-custom  # custom-partitioner

Clean up after the lecture:

sh
make topic-delete