Kafka CookbookОсновыПервый продьюсер на franz-go
0 / 42 (0%)

Первый продьюсер на franz-go

В Brew наступил день миграции: order-service пишет свой первый OrderPlaced в Kafka, а не в RabbitMQ. Дежурный жмёт деплой, открывает логи и читает строку produce ok partition=2 offset=17. И сразу вопрос - что эти три слова означают на уровне гарантий? Брокер положил запись на диск или просто принял в RAM? Если сейчас выдернуть кабель из лидера, заказ останется или растворится? Запись с offset=17 повторится при ретрае или нет?

Это лекция про producer. Программу, которая берёт событие приложения и кладёт его в партицию. До этого мы смотрели на Kafka со стороны брокера - топики, партиции, ISR, offset, retention. Теперь меняем сторону: код, который пишет.

Цель скромная. Записать 10 сообщений с ключами order-0..order-9, увидеть пары (partition, offset) для каждого и убедиться, что это та самая модель из лекций 01-02 и 01-04. Сложные параметры - идемпотентность, acks, батчинг - проговорим словами и оставим для модуля 02. Тут - голая базовая запись плюс честная инвентаризация гарантий.

kgo.Client - долгоживущий объект

Первый рефлекс HTTP-разработчика - открыть соединение, отправить запрос, закрыть. С Kafka так не работают. kgo.Client в franz-go это долгоживущий объект: один на процесс, один на сервис. У order-service ровно один такой клиент на старте процесса, и он живёт до shutdown'а.

Внутри клиента живёт пул соединений до брокеров, кэш метаданных топиков (кто сейчас leader для каждой партиции), фоновые горутины для батчинга, отправки и ретраев. Создание клиента дорогое - dial до брокеров, обмен metadata-запросами, прогрев. Запись через готового клиента дешёвая - кладёшь Record в буфер, дальше работает фон.

Аналогия для бэкендера. kgo.Client ближе всего к *sql.DB в Go: тоже не одно физическое соединение, а пул, тоже долгоживущий, тоже с фоновыми задачами. Никому не приходит в голову на каждый SQL-запрос делать sql.Open и db.Close() - и тут так же.

Базовая инициализация:

go
cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:19092", "localhost:19093", "localhost:19094"),
    kgo.ClientID("order-service"),
)
if err != nil { ... }
defer cl.Close()

SeedBrokers это точки входа, не «полный список брокеров». Клиент стучится к любому из перечисленных адресов, через него получает актуальный список всех нод кластера и дальше работает уже с ним. Поэтому достаточно указать один-три адреса; в курсе пишем все три на случай, если кто-то лежит в момент старта.

В лекционном коде клиент создаётся через internal.kafka.NewClient - обёртка с дефолтами курса. SeedBrokers подхватываются из KAFKA_BOOTSTRAP, ClientID lectures, плюс разумные таймауты на dial и retry. Если нужны нестандартные опции - передаём их вторым аргументом, они дописываются последними и могут перетирать дефолты (этим, например, пользуются транзакционные продьюсеры в Транзакции и EOS).

kgo.Record - что мы вообще пишем

kgo.Record это структура, которая описывает одно сообщение. Не JSON, не protobuf, не string - просто описание байтов плюс адресация.

go
rec := &kgo.Record{
    Topic:     "brew.orders.v1",
    Key:       []byte("order-7"),
    Value:     []byte(`{"type":"OrderPlaced","order_id":"order-7","shop":"baker-st","total":420}`),
    Headers:   []kgo.RecordHeader{{Key: "type", Value: []byte("OrderPlaced")}},
    Partition: -1, // -1 = доверить partitioner'у; явный номер тоже можно
}

Ключевые свойства, которые удобно проговорить сразу:

  • Key и Value это []byte. Сериализация (JSON / Protobuf / Avro) - твоя ответственность. Брокер данные не парсит и тип не проверяет: для него это просто байты с длиной.
  • Topic обязателен. Partition обычно нет - partitioner сам выберет, ориентируясь на ключ (про это Ключи и партиционирование).
  • Headers это []byte-пары рядом с payload'ом. Туда складывают trace_id, message_type, source_service - всё, что удобно прочитать без десериализации тела. Развёрнуто разберём в Ошибки, retries и headers, здесь только упоминаем.
  • Timestamp можно задать руками или оставить нулевым. В нулевом случае franz-go при produce подставит time.Now() на стороне клиента (см. doc на kgo.Record.Timestamp). В лог попадёт именно это значение, если у топика message.timestamp.type=CreateTime (дефолт Kafka, и для brew.orders.v1 ничего не меняли). При LogAppendTime брокер перезапишет timestamp своим wall-clock - в курсе этот режим не используется.

После успешной записи брокер возвращает заполненные Partition, Offset и Timestamp - те, которые легли в лог. Это и есть «адрес» сообщения в Kafka. Дальше OrderPlaced order-7 живёт по координате brew.orders.v1 / partition=2 / offset=17 столько, сколько разрешает retention (для brew.orders.v1 это 30 дней - смотри Offsets и retention).

ProduceSync vs Produce

В franz-go две формы записи. Produce асинхронный, ProduceSync синхронный. Под капотом разницы по гарантиям нет: оба используют один и тот же сетевой канал, тот же батчинг, ту же retry-логику. Разница в том, как код узнаёт результат.

Produce кладёт запись во внутренний буфер клиента и сразу возвращается. Доставку колбэк сообщит потом:

go
cl.Produce(ctx, rec, func(r *kgo.Record, err error) {
    if err != nil { /* ругаемся в лог */ return }
    fmt.Printf("partition=%d offset=%d\n", r.Partition, r.Offset)
})

Тут можно за миллисекунды наштамповать миллион вызовов, клиент сам соберёт их в батчи и отправит. Это «быстрая дорога» для горячих путей: вызывающая горутина не блокируется, а ошибки обрабатываются отдельно в колбэке.

ProduceSync блокируется до тех пор, пока запись не получит ответ от брокера, и возвращает срез kgo.ProduceResults:

go
res := cl.ProduceSync(ctx, rec)
if err := res.FirstErr(); err != nil { ... }
fmt.Printf("partition=%d offset=%d\n", res[0].Record.Partition, res[0].Record.Offset)

«Медленная, но прямолинейная дорога». Управление возвращается только после ответа. Можно передать сразу несколько записей одним вызовом - ProduceSync(ctx, rec1, rec2, rec3) - и получить срез результатов в том же порядке. Для учебной лекции, где важно увидеть partition+offset сразу после строки OrderPlaced, ProduceSync удобнее некуда.

В проде на горячих путях обычно Produce. На холодных, типа «отправили один welcome-email», иногда ProduceSync - синхронность тут не стоит ничего, а код проще. Вопрос про батчинг и пропускную способность отдельно разбирается в Батчинг и пропускная способность.

acks - три цифры, три истории

Параметр acks отвечает на вопрос «когда брокер скажет продьюсеру, что запись принята». Допустимы три значения: 0, 1, all. Они дают разный уровень гарантий и разный latency. Чтобы не учить таблицу, разберу их через истории Brew.

acks=0 - «отправил и забыл». Продьюсер шлёт пакет в сокет и считает запись успешной по факту записи в TCP-буфер. Брокер ничего не подтверждает. Если пакет потеряется по дороге, упадёт лидер партиции, или сервер откажется его принимать - продьюсер не узнает. Запись просто исчезнет.

История из Brew. Когда промо-команда запускала рассылку «бесплатный кофе по пятницам», метрики кликов писались в brew.clickstream.v1 с acks=0. Терять отдельный клик - копеечная боль, аналитика всё равно сглаживает за час. Зато латенси записи 0.5ms, а не 5ms. Вопросов не возникало, пока кто-то не попытался поставить acks=0 на платежах - и за неделю «потерял» 200 транзакций. Уроки разобрал postmortem, acks на платежах теперь all без обсуждения.

acks=1 - «лидер подтвердил». Брокер отвечает «принял», как только лидер партиции записал данные себе на диск (точнее, в page cache - fsync это отдельный разговор). Реплики могут ещё не догнать. Если сразу после ответа лидер упадёт и failover выберет реплику, которая не успела подтянуть запись - данные потеряются.

Это middle ground: latency меньше, чем all, гарантия сильнее, чем 0. Подходит для метрик и логов, где «99.99% доходит» это нормально. Для бизнес-событий слабовато: каскадный отказ kafka-1 в Brew однажды привёл к потере полудня телеметрии именно из-за acks=1 на brew.telemetry.v1. После этого телеметрия переехала на acks=all, лимиты пересмотрели.

acks=all - «все ISR подтвердили». Брокер ждёт, пока запись подтвердят все реплики из ISR (про ISR смотри Репликация и ISR), и только потом отвечает продьюсеру. С min.insync.replicas=2 и RF=3 это значит: запись на двух нодах из трёх к моменту, как ProduceSync вернул управление. Failover теряет данные только если падают все ISR одновременно - сценарий, после которого у Brew проблемы посерьёзнее, чем потерянный заказ.

Цена acks=all - latency. Записи добавляется один сетевой round-trip между лидером и follower'ами в той же стойке (на стенде Brew - 1-3ms). Для критичных топиков это копейки. Для метрик с трафиком в сотни тысяч событий в секунду - заметно.

В franz-go дефолт уже acks=all (опция kgo.RequiredAcks(kgo.AllISRAcks())). Учебный продьюсер этой лекции на дефолт и опирается - явно опцию не переопределяет. На brew.orders.v1 и brew.payments.v1 стоит acks=all, и менять это никто не собирается. Подробный разбор acks и связанных гарантий лежит в Acks и durability.

Идемпотентность - история про двойное списание

В апреле платёжная команда Brew получила два жалобных тикета подряд. У клиентов списалось дважды: один заказ, две одинаковые PaymentReceived в brew.payments.v1. Аналитики прошли по логам - всё сходится: продьюсер в payment-service отправлял запись, ловил connection reset, повторял запрос, второй раз он успешно проходил. Брокер не догадывался, что это та же самая запись - писал обе.

Эта проблема называется дубль на ретрае. Она встроена в любую RPC-семантику at-least-once: клиент не знает, потерялся ли запрос или ответ, и от безысходности повторяет. Для платежей это катастрофа.

Решение в Kafka называется идемпотентный продьюсер (enable.idempotence=true). Брокер выдаёт каждому продьюсеру Producer ID (PID) при первом подключении, а продьюсер нумерует свои сообщения внутри сессии монотонным sequence number. Дальше брокер видит входящую запись, проверяет пару (PID, sequence) и:

  • если такой (PID, sequence) уже принят - тихо отвечает «ок» и не пишет дубль;
  • если sequence идёт по порядку - принимает запись и продвигает счётчик;
  • если sequence приходит из будущего (продьюсер пропустил) - отвечает ошибкой OUT_OF_ORDER_SEQUENCE_NUMBER, продьюсер пересобирает батч;
  • если sequence приходит из прошлого глубже окна (5 батчей назад) - тоже ошибка, нужен внешний разбор.

Идемпотентность бесплатна по latency и почти бесплатна по throughput. В franz-go она включена по умолчанию (раньше нужно было руками выставлять EnableIdempotence, сейчас уже нет). Это значит, что наши 10 записей OrderPlaced не задублируются при сетевой потере - брокер увидит повтор по (PID, seq) и отбросит. Подробный разбор внутренностей идемпотентного продьюсера ждёт в Идемпотентный продьюсер.

Чего идемпотентность не лечит. Между запусками процесса дубли возможны: PID живёт от подключения до подключения, и если процесс упал между ретраем и подтверждением, новый процесс получит новый PID, а брокер не свяжет его записи со старыми. На стороне источника тоже не спасает: дважды положил одно и то же событие в свою БД - продьюсер послушно отправит обе записи в Kafka, с точки зрения PID/sequence это два разных вызова. Отдельный сценарий это zombie-продьюсер: процесс завис, кто-то форкнул его копию, оба продолжают писать с разными PID, оба и пишут.

Полное решение «exactly once» дают транзакции, и о них целая отдельная лекция Транзакции и EOS. Здесь фиксируем уровень: в рамках одной producer-сессии Kafka сама ловит дубли, между сессиями - забота приложения.

Batching и compression - что они дают по цифрам

Когда order-service пишет 10 заказов в секунду, ничего интересного не происходит. Когда он пишет 10000 заказов в секунду в часы пик - выбор между acks=all плюс батчинг и acks=all без батчинга это разница в 5x по throughput.

Идея простая. Каждый сетевой round-trip до брокера стоит ~1ms (LAN). На каждое сообщение по отдельному round-trip - потолок порядка 1000 msg/s на одну партицию, дальше упирается в RTT. Если копить сообщения в RAM 5ms и потом отправлять одним пакетом по 50-200 записей - тот же тред делает 200000 msg/s.

Параметры, которые на это влияют:

  • linger.ms - сколько копить, прежде чем отправить. В Java-клиенте дефолт 0 (сразу), в franz-go дефолт уже 10ms - то есть наш учебный продьюсер из коробки получает разумный батчинг. Боевой типаж 5-20ms.
  • batch.size (в franz-go это ProducerBatchMaxBytes) - максимум, сколько в одном батче в байтах. Дефолт 1MB, для крупных payload'ов - больше.
  • compression.type - none, gzip, snappy, lz4, zstd. На JSON-payload'ах Brew zstd жмёт в 3-5 раз; экономия идёт по сети и по диску брокера.
  • max.in.flight.requests.per.connection - сколько незакрытых батчей одновременно держать в полёте. С идемпотентным продьюсером franz-go держит ≤5 чтобы не нарушать порядок.

Цифры на стенде Brew (RF=3, acks=all, 1KB JSON-payload, одна партиция):

КонфигThroughputp99 latency
linger=0, без compression~1200 msg/s4ms
linger=10ms, без compression~85000 msg/s14ms
linger=10ms, zstd~140000 msg/s16ms

Latency выросла на ~10ms (это и есть linger), throughput x70-x100. Для бэкенда это очень дешёвая сделка: 10ms задержки в обмен на разгрузку инфраструктуры по сети и брокерам в десятки раз.

Тонкости. На партиционированном топике батчи строятся per-partition - один батч живёт в одной партиции. То есть линковать рост throughput надо к числу партиций: 12 партиций даёт 12 параллельных батчевых каналов. Развёрнуто это разбирает Батчинг и пропускная способность, включая то, как linger взаимодействует с acks=all и idempotency.

В нашем учебном продьюсере мы пишем 10 сообщений по одному и не настраиваем ни linger, ни compression. Latency интересна, throughput нет. Дефолты franz-go подходят.

Ключ сообщения и связь с партицией

Поле Key в kgo.Record это руль к partitioner'у, а не случайный идентификатор. Дефолтный партиционер franz-go v1.21.0 это UniformBytesPartitioner(64KiB, true, true, nil) (KIP-794, в Java-клиенте дефолт с 3.3). Для записей с ключом он считает hash(key) mod N (хеш murmur2, как в Java-клиенте) и кладёт запись в выбранную партицию. Один и тот же ключ - всегда одна и та же партиция (пока число партиций не меняется), а значит порядок записей с этим ключом сохраняется.

Для brew.orders.v1 это критично. Ключом стоит order_id: все события одного заказа (OrderPlaced, PaymentReceived, OrderReady, OrderDelivered) ложатся в одну партицию и читаются строго по порядку. Если бы ключом стоял shop_id, получили бы баланс по магазинам, но порядок внутри одного заказа потерялся бы между партициями. Полный разбор кандидатов и trade-off'ов был в Топики и партиции, детальнее в Ключи и партиционирование.

Если Key пустой - тот же UniformBytesPartitioner копит ~64 KiB в одну партицию, потом переключается на следующую (sticky-bytes, а не round-robin по каждой записи). Нагрузка между партициями ровная на длинной дистанции, но порядок не гарантируется ни для какой подгруппы. Это нормально для метрик и логов, плохо для бизнес-событий. Сменить стратегию можно опцией kgo.RecordPartitioner - туда передаётся kgo.RoundRobinPartitioner, kgo.StickyKeyPartitioner (старый дефолт до KIP-794) или собственная реализация интерфейса kgo.Partitioner.

Headers и timestamp в учебной лекции упоминаем одной фразой и идём дальше: Headers это metadata-пары рядом с payload'ом (trace_id, message_type), Timestamp это либо что задал продьюсер, либо что подставил брокер. Развёрнутый разбор обоих - Ошибки, retries и headers.

Что показывает наш код

cmd/producer/main.go делает ровно столько, сколько обещали.

  1. Создаёт kgo.Client через shared helper.
  2. Создаёт топик brew.orders.v1 идемпотентно: partitions=3, rf=3. Если он уже есть - тихо едем дальше.
  3. В цикле от 0 до 9 формирует kgo.Record с Key="order-N", Value="OrderPlaced order_id=order-N" и пишет через ProduceSync.
  4. После каждой записи печатает строку таблицы - N, KEY, VALUE, PARTITION, OFFSET, BROKER-TS.
  5. После цикла дёргает kadm.ListEndOffsets, печатает per-partition latest и сумму. На свежесозданном топике сумма ровно равна 10 - наглядное доказательство, что записи легли.

Цикл записи это и есть «голая» работа продьюсера курса:

go
for i := 0; i < o.messages; i++ {
    key := fmt.Sprintf("order-%d", i)
    val := fmt.Sprintf("OrderPlaced order_id=order-%d", i)
    rec := &kgo.Record{
        Topic: o.topic,
        Key:   []byte(key),
        Value: []byte(val),
    }
 
    rpcCtx, rpcCancel := context.WithTimeout(ctx, 10*time.Second)
    res := cl.ProduceSync(rpcCtx, rec)
    rpcCancel()
    if err := res.FirstErr(); err != nil {
        return fmt.Errorf("produce %d: %w", i, err)
    }
    got := res[0].Record
    fmt.Fprintf(tw, "%d\t%s\t%s\t%d\t%d\t%s\n",
        i, key, val, got.Partition, got.Offset,
        got.Timestamp.Format("15:04:05.000"))
}

got.Partition и got.Offset это то, что вернул брокер, не то, что мы хотели. Вот те самые координаты в логе. Партиция получилась из ключа через hash(key) mod N, offset выдал leader партиции в момент записи (про earliest/latest и LEO/HWM смотри Offsets и retention).

После цикла - финальная сверка через ListEndOffsets:

go
ends, err := admin.ListEndOffsets(rpcCtx, topic)
ends.Each(func(o kadm.ListedOffset) {
    fmt.Fprintf(tw, "%d\t%d\n", o.Partition, o.Offset)
    total += o.Offset
})
fmt.Fprintf(tw, "TOTAL\t%d\n", total)

Сумма latest по всем партициям свежего топика равна числу записанных сообщений. Запустишь программу второй раз - сумма станет 20, и так далее.

Что увидишь в выводе:

plaintext
brew-topic "brew.orders.v1" создан: partitions=3 rf=3
 
пишем 10 OrderPlaced в топик "brew.orders.v1" через ProduceSync
 
N  KEY      VALUE                         PARTITION  OFFSET  BROKER-TS
0  order-0  OrderPlaced order_id=order-0  1          0       16:55:01.234
1  order-1  OrderPlaced order_id=order-1  1          1       16:55:01.241
2  order-2  OrderPlaced order_id=order-2  0          0       16:55:01.247
3  order-3  OrderPlaced order_id=order-3  0          1       16:55:01.253
4  order-4  OrderPlaced order_id=order-4  2          0       16:55:01.259
5  order-5  OrderPlaced order_id=order-5  2          1       16:55:01.265
6  order-6  OrderPlaced order_id=order-6  0          2       16:55:01.271
7  order-7  OrderPlaced order_id=order-7  1          2       16:55:01.277
8  order-8  OrderPlaced order_id=order-8  2          2       16:55:01.283
9  order-9  OrderPlaced order_id=order-9  1          3       16:55:01.289
 
готово. Смотрим ту же картину со стороны лога:
PARTITION  LATEST
0          3
1          4
2          3
TOTAL      10

Несколько наблюдений по этому выводу.

Каждая запись получила свой offset внутри своей партиции. OrderPlaced для order-0, order-1, order-7, order-9 лёг в партицию 1 с offset'ами 0, 1, 2, 3 - четыре сообщения, latest=4. В партиции 0 и 2 по три сообщения, latest=3. Сумма latest по всем партициям = 10. Всё сходится.

Распределение по партициям детерминированное, не случайное. Один и тот же order_id всегда попадает в одну и ту же партицию. Если перезапустить программу с тем же набором ключей - раскладка повторится (но offset'ы поедут вверх, потому что это новая запись поверх старого лога). Логика partitioner'а это Ключи и партиционирование.

Если ты дважды запустил программу без topic-delete, конкретные offset'ы у тебя будут другие - все offset'ы по сумме вырастут на 10, потому что новый прогон допишет поверх старого лога. Это нормально. Раскладка ключей по партициям при этом не меняется: тот же order-N всегда попадает в ту же партицию.

Запуск

Стенд должен быть поднят (docker compose up -d из корня).

sh
make run

Параллельно во втором терминале можно посмотреть на тот же топик через CLI-консьюмер:

sh
make consume-cli

Это kafka-console-consumer.sh изнутри kafka-1 с флагом --from-beginning. Печатает партицию, offset, ключ и значение. Должны увидеть те же 10 OrderPlaced, что отдала наша программа - Kafka не различает «сообщение от Go-клиента» и «сообщение от kafka-console-producer», в логе они выглядят одинаково.

Описать топик через kafka-topics.sh:

sh
make topic-describe

Прибрать после лекции:

sh
make topic-delete

Что забрать с собой

База, на которой стоит весь дальнейший модуль 02. После этой лекции в голове должна сложиться такая модель:

  1. kgo.Client долгоживущий. Создал один раз - переиспользуй до shutdown'а. Это не HTTP-запрос.
  2. kgo.Record это байты плюс адресация (topic + key + headers + timestamp). Сериализацию делаешь сам. Брокер тип не проверяет.
  3. acks определяет уровень гарантий. 0 - fire-and-forget (метрики, кликстрим). 1 - лидер записал (компромисс). all - все ISR подтвердили (бизнес-события). На brew.orders.v1 и brew.payments.v1 стоит all.
  4. Идемпотентность включена по умолчанию. В рамках одной producer-сессии Kafka сама ловит дубли по (PID, sequence). Между сессиями нужны транзакции - смотри Транзакции и EOS.
  5. Брокер выдаёт offset, не клиент. Возвращённая пара (partition, offset) - координата сообщения в логе на всю его жизнь до retention.

Дальше Первый консьюмер на franz-go. Прочитаем эти 10 OrderPlaced со стороны kitchen-service, увидим, что offset'ы в выводе совпадают с теми, что вернул нам ProduceSync, и через auto-commit получим первое знакомство с тем, что такое committed offset группы.

·Модуль 01

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Основы / Первый продьюсер на franz-go