Первый продьюсер на franz-go
В Brew наступил день миграции: order-service пишет свой первый OrderPlaced в Kafka, а не в RabbitMQ. Дежурный жмёт деплой, открывает логи и читает строку produce ok partition=2 offset=17. И сразу вопрос - что эти три слова означают на уровне гарантий? Брокер положил запись на диск или просто принял в RAM? Если сейчас выдернуть кабель из лидера, заказ останется или растворится? Запись с offset=17 повторится при ретрае или нет?
Это лекция про producer. Программу, которая берёт событие приложения и кладёт его в партицию. До этого мы смотрели на Kafka со стороны брокера - топики, партиции, ISR, offset, retention. Теперь меняем сторону: код, который пишет.
Цель скромная. Записать 10 сообщений с ключами order-0..order-9, увидеть пары (partition, offset) для каждого и убедиться, что это та самая модель из лекций 01-02 и 01-04. Сложные параметры - идемпотентность, acks, батчинг - проговорим словами и оставим для модуля 02. Тут - голая базовая запись плюс честная инвентаризация гарантий.
kgo.Client - долгоживущий объект
Первый рефлекс HTTP-разработчика - открыть соединение, отправить запрос, закрыть. С Kafka так не работают. kgo.Client в franz-go это долгоживущий объект: один на процесс, один на сервис. У order-service ровно один такой клиент на старте процесса, и он живёт до shutdown'а.
Внутри клиента живёт пул соединений до брокеров, кэш метаданных топиков (кто сейчас leader для каждой партиции), фоновые горутины для батчинга, отправки и ретраев. Создание клиента дорогое - dial до брокеров, обмен metadata-запросами, прогрев. Запись через готового клиента дешёвая - кладёшь Record в буфер, дальше работает фон.
Аналогия для бэкендера. kgo.Client ближе всего к *sql.DB в Go: тоже не одно физическое соединение, а пул, тоже долгоживущий, тоже с фоновыми задачами. Никому не приходит в голову на каждый SQL-запрос делать sql.Open и db.Close() - и тут так же.
Базовая инициализация:
cl, err := kgo.NewClient(
kgo.SeedBrokers("localhost:19092", "localhost:19093", "localhost:19094"),
kgo.ClientID("order-service"),
)
if err != nil { ... }
defer cl.Close()SeedBrokers это точки входа, не «полный список брокеров». Клиент стучится к любому из перечисленных адресов, через него получает актуальный список всех нод кластера и дальше работает уже с ним. Поэтому достаточно указать один-три адреса; в курсе пишем все три на случай, если кто-то лежит в момент старта.
В лекционном коде клиент создаётся через internal.kafka.NewClient - обёртка с дефолтами курса. SeedBrokers подхватываются из KAFKA_BOOTSTRAP, ClientID lectures, плюс разумные таймауты на dial и retry. Если нужны нестандартные опции - передаём их вторым аргументом, они дописываются последними и могут перетирать дефолты (этим, например, пользуются транзакционные продьюсеры в Транзакции и EOS).
kgo.Record - что мы вообще пишем
kgo.Record это структура, которая описывает одно сообщение. Не JSON, не protobuf, не string - просто описание байтов плюс адресация.
rec := &kgo.Record{
Topic: "brew.orders.v1",
Key: []byte("order-7"),
Value: []byte(`{"type":"OrderPlaced","order_id":"order-7","shop":"baker-st","total":420}`),
Headers: []kgo.RecordHeader{{Key: "type", Value: []byte("OrderPlaced")}},
Partition: -1, // -1 = доверить partitioner'у; явный номер тоже можно
}Ключевые свойства, которые удобно проговорить сразу:
KeyиValueэто[]byte. Сериализация (JSON / Protobuf / Avro) - твоя ответственность. Брокер данные не парсит и тип не проверяет: для него это просто байты с длиной.Topicобязателен.Partitionобычно нет - partitioner сам выберет, ориентируясь на ключ (про это Ключи и партиционирование).Headersэто[]byte-пары рядом с payload'ом. Туда складываютtrace_id,message_type,source_service- всё, что удобно прочитать без десериализации тела. Развёрнуто разберём в Ошибки, retries и headers, здесь только упоминаем.Timestampможно задать руками или оставить нулевым. В нулевом случае franz-go при produce подставитtime.Now()на стороне клиента (см. doc наkgo.Record.Timestamp). В лог попадёт именно это значение, если у топикаmessage.timestamp.type=CreateTime(дефолт Kafka, и дляbrew.orders.v1ничего не меняли). ПриLogAppendTimeброкер перезапишет timestamp своим wall-clock - в курсе этот режим не используется.
После успешной записи брокер возвращает заполненные Partition, Offset и Timestamp - те, которые легли в лог. Это и есть «адрес» сообщения в Kafka. Дальше OrderPlaced order-7 живёт по координате brew.orders.v1 / partition=2 / offset=17 столько, сколько разрешает retention (для brew.orders.v1 это 30 дней - смотри Offsets и retention).
ProduceSync vs Produce
В franz-go две формы записи. Produce асинхронный, ProduceSync синхронный. Под капотом разницы по гарантиям нет: оба используют один и тот же сетевой канал, тот же батчинг, ту же retry-логику. Разница в том, как код узнаёт результат.
Produce кладёт запись во внутренний буфер клиента и сразу возвращается. Доставку колбэк сообщит потом:
cl.Produce(ctx, rec, func(r *kgo.Record, err error) {
if err != nil { /* ругаемся в лог */ return }
fmt.Printf("partition=%d offset=%d\n", r.Partition, r.Offset)
})Тут можно за миллисекунды наштамповать миллион вызовов, клиент сам соберёт их в батчи и отправит. Это «быстрая дорога» для горячих путей: вызывающая горутина не блокируется, а ошибки обрабатываются отдельно в колбэке.
ProduceSync блокируется до тех пор, пока запись не получит ответ от брокера, и возвращает срез kgo.ProduceResults:
res := cl.ProduceSync(ctx, rec)
if err := res.FirstErr(); err != nil { ... }
fmt.Printf("partition=%d offset=%d\n", res[0].Record.Partition, res[0].Record.Offset)«Медленная, но прямолинейная дорога». Управление возвращается только после ответа. Можно передать сразу несколько записей одним вызовом - ProduceSync(ctx, rec1, rec2, rec3) - и получить срез результатов в том же порядке. Для учебной лекции, где важно увидеть partition+offset сразу после строки OrderPlaced, ProduceSync удобнее некуда.
В проде на горячих путях обычно Produce. На холодных, типа «отправили один welcome-email», иногда ProduceSync - синхронность тут не стоит ничего, а код проще. Вопрос про батчинг и пропускную способность отдельно разбирается в Батчинг и пропускная способность.
acks - три цифры, три истории
Параметр acks отвечает на вопрос «когда брокер скажет продьюсеру, что запись принята». Допустимы три значения: 0, 1, all. Они дают разный уровень гарантий и разный latency. Чтобы не учить таблицу, разберу их через истории Brew.
acks=0 - «отправил и забыл». Продьюсер шлёт пакет в сокет и считает запись успешной по факту записи в TCP-буфер. Брокер ничего не подтверждает. Если пакет потеряется по дороге, упадёт лидер партиции, или сервер откажется его принимать - продьюсер не узнает. Запись просто исчезнет.
История из Brew. Когда промо-команда запускала рассылку «бесплатный кофе по пятницам», метрики кликов писались в brew.clickstream.v1 с acks=0. Терять отдельный клик - копеечная боль, аналитика всё равно сглаживает за час. Зато латенси записи 0.5ms, а не 5ms. Вопросов не возникало, пока кто-то не попытался поставить acks=0 на платежах - и за неделю «потерял» 200 транзакций. Уроки разобрал postmortem, acks на платежах теперь all без обсуждения.
acks=1 - «лидер подтвердил». Брокер отвечает «принял», как только лидер партиции записал данные себе на диск (точнее, в page cache - fsync это отдельный разговор). Реплики могут ещё не догнать. Если сразу после ответа лидер упадёт и failover выберет реплику, которая не успела подтянуть запись - данные потеряются.
Это middle ground: latency меньше, чем all, гарантия сильнее, чем 0. Подходит для метрик и логов, где «99.99% доходит» это нормально. Для бизнес-событий слабовато: каскадный отказ kafka-1 в Brew однажды привёл к потере полудня телеметрии именно из-за acks=1 на brew.telemetry.v1. После этого телеметрия переехала на acks=all, лимиты пересмотрели.
acks=all - «все ISR подтвердили». Брокер ждёт, пока запись подтвердят все реплики из ISR (про ISR смотри Репликация и ISR), и только потом отвечает продьюсеру. С min.insync.replicas=2 и RF=3 это значит: запись на двух нодах из трёх к моменту, как ProduceSync вернул управление. Failover теряет данные только если падают все ISR одновременно - сценарий, после которого у Brew проблемы посерьёзнее, чем потерянный заказ.
Цена acks=all - latency. Записи добавляется один сетевой round-trip между лидером и follower'ами в той же стойке (на стенде Brew - 1-3ms). Для критичных топиков это копейки. Для метрик с трафиком в сотни тысяч событий в секунду - заметно.
В franz-go дефолт уже acks=all (опция kgo.RequiredAcks(kgo.AllISRAcks())). Учебный продьюсер этой лекции на дефолт и опирается - явно опцию не переопределяет. На brew.orders.v1 и brew.payments.v1 стоит acks=all, и менять это никто не собирается. Подробный разбор acks и связанных гарантий лежит в Acks и durability.
Идемпотентность - история про двойное списание
В апреле платёжная команда Brew получила два жалобных тикета подряд. У клиентов списалось дважды: один заказ, две одинаковые PaymentReceived в brew.payments.v1. Аналитики прошли по логам - всё сходится: продьюсер в payment-service отправлял запись, ловил connection reset, повторял запрос, второй раз он успешно проходил. Брокер не догадывался, что это та же самая запись - писал обе.
Эта проблема называется дубль на ретрае. Она встроена в любую RPC-семантику at-least-once: клиент не знает, потерялся ли запрос или ответ, и от безысходности повторяет. Для платежей это катастрофа.
Решение в Kafka называется идемпотентный продьюсер (enable.idempotence=true). Брокер выдаёт каждому продьюсеру Producer ID (PID) при первом подключении, а продьюсер нумерует свои сообщения внутри сессии монотонным sequence number. Дальше брокер видит входящую запись, проверяет пару (PID, sequence) и:
- если такой
(PID, sequence)уже принят - тихо отвечает «ок» и не пишет дубль; - если sequence идёт по порядку - принимает запись и продвигает счётчик;
- если sequence приходит из будущего (продьюсер пропустил) - отвечает ошибкой
OUT_OF_ORDER_SEQUENCE_NUMBER, продьюсер пересобирает батч; - если sequence приходит из прошлого глубже окна (5 батчей назад) - тоже ошибка, нужен внешний разбор.
Идемпотентность бесплатна по latency и почти бесплатна по throughput. В franz-go она включена по умолчанию (раньше нужно было руками выставлять EnableIdempotence, сейчас уже нет). Это значит, что наши 10 записей OrderPlaced не задублируются при сетевой потере - брокер увидит повтор по (PID, seq) и отбросит. Подробный разбор внутренностей идемпотентного продьюсера ждёт в Идемпотентный продьюсер.
Чего идемпотентность не лечит. Между запусками процесса дубли возможны: PID живёт от подключения до подключения, и если процесс упал между ретраем и подтверждением, новый процесс получит новый PID, а брокер не свяжет его записи со старыми. На стороне источника тоже не спасает: дважды положил одно и то же событие в свою БД - продьюсер послушно отправит обе записи в Kafka, с точки зрения PID/sequence это два разных вызова. Отдельный сценарий это zombie-продьюсер: процесс завис, кто-то форкнул его копию, оба продолжают писать с разными PID, оба и пишут.
Полное решение «exactly once» дают транзакции, и о них целая отдельная лекция Транзакции и EOS. Здесь фиксируем уровень: в рамках одной producer-сессии Kafka сама ловит дубли, между сессиями - забота приложения.
Batching и compression - что они дают по цифрам
Когда order-service пишет 10 заказов в секунду, ничего интересного не происходит. Когда он пишет 10000 заказов в секунду в часы пик - выбор между acks=all плюс батчинг и acks=all без батчинга это разница в 5x по throughput.
Идея простая. Каждый сетевой round-trip до брокера стоит ~1ms (LAN). На каждое сообщение по отдельному round-trip - потолок порядка 1000 msg/s на одну партицию, дальше упирается в RTT. Если копить сообщения в RAM 5ms и потом отправлять одним пакетом по 50-200 записей - тот же тред делает 200000 msg/s.
Параметры, которые на это влияют:
linger.ms- сколько копить, прежде чем отправить. В Java-клиенте дефолт 0 (сразу), в franz-go дефолт уже 10ms - то есть наш учебный продьюсер из коробки получает разумный батчинг. Боевой типаж 5-20ms.batch.size(в franz-go этоProducerBatchMaxBytes) - максимум, сколько в одном батче в байтах. Дефолт 1MB, для крупных payload'ов - больше.compression.type-none,gzip,snappy,lz4,zstd. На JSON-payload'ах Brew zstd жмёт в 3-5 раз; экономия идёт по сети и по диску брокера.max.in.flight.requests.per.connection- сколько незакрытых батчей одновременно держать в полёте. С идемпотентным продьюсером franz-go держит ≤5 чтобы не нарушать порядок.
Цифры на стенде Brew (RF=3, acks=all, 1KB JSON-payload, одна партиция):
| Конфиг | Throughput | p99 latency |
|---|---|---|
linger=0, без compression | ~1200 msg/s | 4ms |
linger=10ms, без compression | ~85000 msg/s | 14ms |
linger=10ms, zstd | ~140000 msg/s | 16ms |
Latency выросла на ~10ms (это и есть linger), throughput x70-x100. Для бэкенда это очень дешёвая сделка: 10ms задержки в обмен на разгрузку инфраструктуры по сети и брокерам в десятки раз.
Тонкости. На партиционированном топике батчи строятся per-partition - один батч живёт в одной партиции. То есть линковать рост throughput надо к числу партиций: 12 партиций даёт 12 параллельных батчевых каналов. Развёрнуто это разбирает Батчинг и пропускная способность, включая то, как linger взаимодействует с acks=all и idempotency.
В нашем учебном продьюсере мы пишем 10 сообщений по одному и не настраиваем ни linger, ни compression. Latency интересна, throughput нет. Дефолты franz-go подходят.
Ключ сообщения и связь с партицией
Поле Key в kgo.Record это руль к partitioner'у, а не случайный идентификатор. Дефолтный партиционер franz-go v1.21.0 это UniformBytesPartitioner(64KiB, true, true, nil) (KIP-794, в Java-клиенте дефолт с 3.3). Для записей с ключом он считает hash(key) mod N (хеш murmur2, как в Java-клиенте) и кладёт запись в выбранную партицию. Один и тот же ключ - всегда одна и та же партиция (пока число партиций не меняется), а значит порядок записей с этим ключом сохраняется.
Для brew.orders.v1 это критично. Ключом стоит order_id: все события одного заказа (OrderPlaced, PaymentReceived, OrderReady, OrderDelivered) ложатся в одну партицию и читаются строго по порядку. Если бы ключом стоял shop_id, получили бы баланс по магазинам, но порядок внутри одного заказа потерялся бы между партициями. Полный разбор кандидатов и trade-off'ов был в Топики и партиции, детальнее в Ключи и партиционирование.
Если Key пустой - тот же UniformBytesPartitioner копит ~64 KiB в одну партицию, потом переключается на следующую (sticky-bytes, а не round-robin по каждой записи). Нагрузка между партициями ровная на длинной дистанции, но порядок не гарантируется ни для какой подгруппы. Это нормально для метрик и логов, плохо для бизнес-событий. Сменить стратегию можно опцией kgo.RecordPartitioner - туда передаётся kgo.RoundRobinPartitioner, kgo.StickyKeyPartitioner (старый дефолт до KIP-794) или собственная реализация интерфейса kgo.Partitioner.
Headers и timestamp в учебной лекции упоминаем одной фразой и идём дальше: Headers это metadata-пары рядом с payload'ом (trace_id, message_type), Timestamp это либо что задал продьюсер, либо что подставил брокер. Развёрнутый разбор обоих - Ошибки, retries и headers.
Что показывает наш код
cmd/producer/main.go делает ровно столько, сколько обещали.
- Создаёт
kgo.Clientчерез shared helper. - Создаёт топик
brew.orders.v1идемпотентно:partitions=3,rf=3. Если он уже есть - тихо едем дальше. - В цикле от 0 до 9 формирует
kgo.RecordсKey="order-N",Value="OrderPlaced order_id=order-N"и пишет черезProduceSync. - После каждой записи печатает строку таблицы - N, KEY, VALUE, PARTITION, OFFSET, BROKER-TS.
- После цикла дёргает
kadm.ListEndOffsets, печатает per-partition latest и сумму. На свежесозданном топике сумма ровно равна 10 - наглядное доказательство, что записи легли.
Цикл записи это и есть «голая» работа продьюсера курса:
for i := 0; i < o.messages; i++ {
key := fmt.Sprintf("order-%d", i)
val := fmt.Sprintf("OrderPlaced order_id=order-%d", i)
rec := &kgo.Record{
Topic: o.topic,
Key: []byte(key),
Value: []byte(val),
}
rpcCtx, rpcCancel := context.WithTimeout(ctx, 10*time.Second)
res := cl.ProduceSync(rpcCtx, rec)
rpcCancel()
if err := res.FirstErr(); err != nil {
return fmt.Errorf("produce %d: %w", i, err)
}
got := res[0].Record
fmt.Fprintf(tw, "%d\t%s\t%s\t%d\t%d\t%s\n",
i, key, val, got.Partition, got.Offset,
got.Timestamp.Format("15:04:05.000"))
}got.Partition и got.Offset это то, что вернул брокер, не то, что мы хотели. Вот те самые координаты в логе. Партиция получилась из ключа через hash(key) mod N, offset выдал leader партиции в момент записи (про earliest/latest и LEO/HWM смотри Offsets и retention).
После цикла - финальная сверка через ListEndOffsets:
ends, err := admin.ListEndOffsets(rpcCtx, topic)
ends.Each(func(o kadm.ListedOffset) {
fmt.Fprintf(tw, "%d\t%d\n", o.Partition, o.Offset)
total += o.Offset
})
fmt.Fprintf(tw, "TOTAL\t%d\n", total)Сумма latest по всем партициям свежего топика равна числу записанных сообщений. Запустишь программу второй раз - сумма станет 20, и так далее.
Что увидишь в выводе:
brew-topic "brew.orders.v1" создан: partitions=3 rf=3
пишем 10 OrderPlaced в топик "brew.orders.v1" через ProduceSync
N KEY VALUE PARTITION OFFSET BROKER-TS
0 order-0 OrderPlaced order_id=order-0 1 0 16:55:01.234
1 order-1 OrderPlaced order_id=order-1 1 1 16:55:01.241
2 order-2 OrderPlaced order_id=order-2 0 0 16:55:01.247
3 order-3 OrderPlaced order_id=order-3 0 1 16:55:01.253
4 order-4 OrderPlaced order_id=order-4 2 0 16:55:01.259
5 order-5 OrderPlaced order_id=order-5 2 1 16:55:01.265
6 order-6 OrderPlaced order_id=order-6 0 2 16:55:01.271
7 order-7 OrderPlaced order_id=order-7 1 2 16:55:01.277
8 order-8 OrderPlaced order_id=order-8 2 2 16:55:01.283
9 order-9 OrderPlaced order_id=order-9 1 3 16:55:01.289
готово. Смотрим ту же картину со стороны лога:
PARTITION LATEST
0 3
1 4
2 3
TOTAL 10Несколько наблюдений по этому выводу.
Каждая запись получила свой offset внутри своей партиции. OrderPlaced для order-0, order-1, order-7, order-9 лёг в партицию 1 с offset'ами 0, 1, 2, 3 - четыре сообщения, latest=4. В партиции 0 и 2 по три сообщения, latest=3. Сумма latest по всем партициям = 10. Всё сходится.
Распределение по партициям детерминированное, не случайное. Один и тот же order_id всегда попадает в одну и ту же партицию. Если перезапустить программу с тем же набором ключей - раскладка повторится (но offset'ы поедут вверх, потому что это новая запись поверх старого лога). Логика partitioner'а это Ключи и партиционирование.
Если ты дважды запустил программу без topic-delete, конкретные offset'ы у тебя будут другие - все offset'ы по сумме вырастут на 10, потому что новый прогон допишет поверх старого лога. Это нормально. Раскладка ключей по партициям при этом не меняется: тот же order-N всегда попадает в ту же партицию.
Запуск
Стенд должен быть поднят (docker compose up -d из корня).
make runПараллельно во втором терминале можно посмотреть на тот же топик через CLI-консьюмер:
make consume-cliЭто kafka-console-consumer.sh изнутри kafka-1 с флагом --from-beginning. Печатает партицию, offset, ключ и значение. Должны увидеть те же 10 OrderPlaced, что отдала наша программа - Kafka не различает «сообщение от Go-клиента» и «сообщение от kafka-console-producer», в логе они выглядят одинаково.
Описать топик через kafka-topics.sh:
make topic-describeПрибрать после лекции:
make topic-deleteЧто забрать с собой
База, на которой стоит весь дальнейший модуль 02. После этой лекции в голове должна сложиться такая модель:
- kgo.Client долгоживущий. Создал один раз - переиспользуй до shutdown'а. Это не HTTP-запрос.
- kgo.Record это байты плюс адресация (topic + key + headers + timestamp). Сериализацию делаешь сам. Брокер тип не проверяет.
acksопределяет уровень гарантий.0- fire-and-forget (метрики, кликстрим).1- лидер записал (компромисс).all- все ISR подтвердили (бизнес-события). Наbrew.orders.v1иbrew.payments.v1стоитall.- Идемпотентность включена по умолчанию. В рамках одной producer-сессии Kafka сама ловит дубли по
(PID, sequence). Между сессиями нужны транзакции - смотри Транзакции и EOS. - Брокер выдаёт offset, не клиент. Возвращённая пара
(partition, offset)- координата сообщения в логе на всю его жизнь до retention.
Дальше Первый консьюмер на franz-go. Прочитаем эти 10 OrderPlaced со стороны kitchen-service, увидим, что offset'ы в выводе совпадают с теми, что вернул нам ProduceSync, и через auto-commit получим первое знакомство с тем, что такое committed offset группы.