Stream processing: концепции
До этого момента мы смотрели на Kafka как на транспорт: пишем сообщения, читаем сообщения, обрабатываем по одной штуке. Большинство реальных задач так и устроены. Consumer читает запись, что-то с ней делает, пишет результат куда-то ещё, коммитит offset, идёт дальше. В Brew так работает kitchen-service: взял заказ из brew.orders.v1, приготовил, пошёл дальше.
Но есть второй большой класс задач — где мы хотим считать что-то поверх потока. Сколько заказов в минуту по всей сети. Топ напитков за час. Сколько раз клиент открыл меню за последние 5 секунд. Это работа analytics-service в Brew, и тут одних offset'ов уже недостаточно.
Это и есть stream processing. Лекция вводная — с кодом разберёмся в Stream processing на Go (franz-go + Pebble). Тут — про идеи. Что такое event-time, чем он отличается от processing-time, что такое windowing, почему он почти всегда event-time-based, что значит «late events» и зачем нужен watermark. Плюс короткие сноски про KStream/KTable, repartitioning и stateful operations — это словарь, без которого Stream processing на Go (franz-go + Pebble) будет читаться как иероглифы.
Два времени
У одного события всегда есть несколько штампов времени, и путать их — главный источник багов в стримах.
Event-time — время, когда событие случилось в реальном мире. Клиент оформил заказ в приложении Brew. POS-терминал в кофейне пробил латте. Бариста нажал «готово». Это «правильное» время для аналитики и для бизнеса.
Processing-time — время, когда наш стрим-процесс взял сообщение в работу. То есть time.Now() внутри consumer'а в момент, когда мы видим record.
Ingestion-time — время, когда сообщение легло в Kafka log. Это record.Timestamp, которое брокер либо берёт у producer'а (CreateTime — дефолт), либо штампует сам при append'е (LogAppendTime — настраивается через message.timestamp.type).
Между ними может быть пропасть. POS-терминал в одной из 80 кофеен Brew лежал в офлайне три часа — упал интернет в торговом центре. Потом подключился к 4G и слил пачку накопленных заказов. Event-time у этих заказов — три часа назад. Ingestion-time — сейчас. Processing-time — через секунду после ingestion'а (когда analytics-service проснётся).
Когда мы говорим «заказы за минуту с 14:00 до 14:01», какое время мы имеем в виду? Если бизнесовое — то event-time. Если «сколько работы пришло на наш кластер за эту минуту» — processing-time. Спутать одно с другим — привет, странные пики на дашборде Brew в самый неподходящий момент: промо-пятница вроде закончилась, а график продаж всё дёргается.
Windowing
Стрим бесконечен. Чтобы посчитать «среднее за период», надо как-то нарезать его на конечные куски. Куски называются окнами (windows). Их несколько типов.
Tumbling — фиксированный размер, без перекрытий. Например, окна по 1 минуте: 14:00–14:01, 14:01–14:02, 14:02–14:03 и так далее. Каждое событие попадает ровно в одно окно. Это то, что мы используем в коде ниже.
Hopping — фиксированный размер плюс шаг. Окно 5 минут с шагом 1 минута: 14:00–14:05, 14:01–14:06, 14:02–14:07 и так далее. Окна перекрываются, событие попадает в несколько штук. Полезно для скользящих средних.
Sliding — окно «вокруг каждого события». Размер фиксированный, привязка — к моменту прихода записи. Используется реже, потому что считать дороже.
Session — окно по активности. Открывается с приходом события, закрывается, если новых событий нет дольше gap'а (например, 30 минут). Размер у каждого окна свой. Так считают, например, web-сессии пользователей.
В нашем коде — простейшие tumbling-окна по 1 минуте, рассчитанные двумя способами одновременно: по event-time и по processing-time. Цель — глазами увидеть, что одни и те же события ложатся в разные минутные ведра.
Watermark и late events
Раз мы агрегируем по event-time, появляется неприятный вопрос. Когда закрыть окно? Окно 14:00–14:01 в event-time может получать новые события и в 14:02 (только дошедшие из офлайна), и через сутки (после полного восстановления связи). Если ждать «всех опоздавших», окно не закроется никогда.
Решение — watermark. Это монотонно растущая оценка вида «я уверен, что событий с event-time меньше W больше не будет». Когда watermark проходит конец окна, окно эмитят как готовое. События, пришедшие после — late events. С ними обращаются по-разному. Можно дропнуть. Можно отложить в боковой топик для пост-обработки. Можно «ретроактивно» обновить уже эмитнутое окно — если downstream такое переваривает.
Простейшая стратегия watermark'а — max(event-time виденных записей) - tolerance. Tolerance — например, 1 минута. То есть мы считаем, что если самое свежее событие, которое мы видели, было в 14:30, то окна до 14:29 можно закрывать. Записи с event-time раньше 14:29 после этого момента — late.
Тонкая деталь. Watermark — per-partition, не глобальный. Если у нас 6 партиций и одна из них тормозит (нет новых записей), watermark всего топика стоит на месте — ждём отстающего. Это та же логика, что с HWM в репликации: не двигаемся, пока самая медленная сторона не дотянется.
В нашем коде watermark'а нет — мы просто бесконечно копим окна в памяти и печатаем. Это упрощение для лекции. В Stream processing на Go (franz-go + Pebble) watermark'а тоже не будет — для топа напитков он не нужен. Реальные fully-fledged системы (Flink, Beam, Kafka Streams 3.x+) считают watermark'и автоматически.
KStream vs KTable
Два слова, которые приходят из мира Kafka Streams и которые полезно знать, даже если на Go нет нативного Streams.
KStream — это stream of events. Каждая запись — самостоятельное событие. Дубликаты ключей — нормально (один и тот же клиент может сделать пять заказов, это пять событий). Историчность важна. Семантика — append.
KTable — это stateful snapshot, проекция compacted-топика на «текущее состояние». Запись с тем же ключом перетирает предыдущую. Семантика — upsert. Tombstone (value=nil) удаляет ключ.
Разница на пальцах. KStream brew.orders.v1 — все заказы за всё время. KTable drink-totals — текущее число проданных порций по каждому напитку на этот момент. Ключ один и тот же (drink_id), смысл совершенно разный.
Под капотом KTable обычно — compacted-топик плюс локальный state store (RocksDB или Pebble), куда записи проецируются. При рестарте store восстанавливается из compacted-топика. В Stream processing на Go (franz-go + Pebble) мы повторим эту схему руками: Pebble + changelog topic для топа напитков Brew.
Repartitioning
Допустим, мы читаем brew.orders.v1 (key=order_id) и хотим сгруппировать по drink_id для подсчёта проданных порций каждого напитка. Беда в том, что записи об одном напитке сейчас размазаны по всем партициям топика — потому что хешировались по order_id. Складывать count'ы между партициями локально нельзя: разные worker'ы считают разные подмножества.
Решение — repartition. Шаг 1: читаем brew.orders.v1, перепаковываем в orders-by-drink с key=drink_id. Шаг 2: читаем orders-by-drink уже как stream, сгруппированный по нужному ключу. Теперь в одной партиции лежат все позиции одного напитка — можно держать локальный state и честно считать топ напитков.
Repartition — недешёвая штука. Лишняя серилизация, лишний топик, лишний сетевой трип. В Kafka Streams она делается неявно при операциях groupBy (если key меняется) — поэтому в API библиотеки часто видна предупредительная пометка «may trigger repartitioning». Когда сами строим стрим на franz-go, repartition нужно делать руками: producer пишет в repartition-топик, consumer читает оттуда же.
Stateful operations
Stateless — это там, где обработка одной записи не зависит от других. map, filter, flatMap. Перевернул, отбросил, размножил. State не нужен, рестарт без потерь.
Stateful — где нужно помнить что-то между записями. count, sum, min/max, aggregate, join. Тут уже state store обязателен: либо на диске (Pebble/RocksDB), либо в памяти с дублированием в changelog-топик для durability. Иначе любой рестарт = потеря накопленного. В нашем aggregator'е state — это две in-memory мапы; kill -9 обнулит их. Для лекции это ок, для прода — нет.
В Stream processing на Go (franz-go + Pebble) мы добавим Pebble и changelog topic — получится рабочая модель stateful processing'а с восстановлением state'а после рестарта. Без этого слоя топ напитков Brew обнулялся бы при первом же сбое analytics-service.
Что показывает наш код
Один бинарь, две роли.
Producer (-role=events) тикает раз в rate (50ms по умолчанию) и пишет один заказ Brew (shop_id, drink, сумма). У события — синтетический event-time, который отстаёт от wall-clock'а на случайную величину. Нормальный заказ — лаг от 0 до 60 секунд. С вероятностью late-prob (10% по умолчанию) лаг прыгает в диапазон 90–240 секунд: симулируем POS-терминал кофейни, который слил пачку заказов из офлайна.
Само формирование event-time выглядит ровно так:
lag := time.Duration(rng.Int63n(int64(o.eventLagMax) + 1))
late := false
if rng.Float64() < o.lateProb && o.lateLagMax > o.lateLagMin {
lag = o.lateLagMin + time.Duration(rng.Int63n(int64(o.lateLagMax-o.lateLagMin)+1))
late = true
}
eventTime := now.Add(-lag)Дальше event-time кладётся в Kafka header event-time как 8 байт unix-nano big-endian (header-формат на уровне курса — наш собственный, не стандарт):
headers := []kgo.RecordHeader{
{Key: "event-time", Value: encodeUnixNano(eventTime)},
}
if late {
headers = append(headers, kgo.RecordHeader{Key: "late", Value: []byte("1")})
}Producer пишет через ProduceSync с ключом shop-XX (80 кофеен Brew по дефолту) — partitioner раскладывает их по 3 партициям детерминированно.
Aggregator (-role=aggregator) подписывается на топик в группе lecture-07-01-aggregator, стартует с AtEnd() (старые события из тестовых прогонов не интересны). На каждой записи считает два штампа времени:
processingTime := time.Now()
eventTime := processingTime
for _, h := range rec.Headers {
switch h.Key {
case "event-time":
if t, ok := decodeUnixNano(h.Value); ok {
eventTime = t
}
case "late":
late = string(h.Value) == "1"
}
}
agg.add(eventTime, processingTime, late, missing)Внутри add — одна и та же запись инкрементирует счётчики в двух мапах: byEventTime[eventTime.Truncate(window)]++ и byProcessing[processingTime.Truncate(window)]++. Truncate-окно — стандартный приём для tumbling-окон: 14:23:47 при размере окна 1m даёт ключ 14:23:00.
Раз в print (5s по умолчанию) фоновая горутина дёргает snapshot и печатает таблицу:
[15:42:11] total=512 late=48 no-header=0
WINDOW BY EVENT-TIME BY PROCESSING-TIME DIFF
15:38 3 0 +3
15:39 12 0 +12
15:40 38 0 +38
15:41 67 100 -33
15:42 13 412 -399
---Что тут видно. В колонке BY PROCESSING-TIME всё практически в текущей минуте — мы и есть «сейчас», поэтому процессинг попадает в одно или два последних окна. В BY EVENT-TIME распределение размазано назад на 4–5 минут: события, пришедшие сейчас, но c event-time из 15:38 или 15:39, легли в свои настоящие минуты. DIFF показывает, насколько одно и то же событие легло «не туда» в разрезе processing-time. Поздние события (5%–10% по умолчанию) уехали ещё дальше назад — они и составляют тот шум в окнах 15:38–15:39, которые в processing-time уже «закрыты».
Если бы у нас был watermark с tolerance 1 минута, окно 15:38 закрылось бы где-то в 15:39:30 — и записи, прибывшие в 15:42, оказались бы late events. Их можно было бы дропнуть, отправить в side-output, или ретроактивно обновить эмитнутое значение. Любой выбор — компромисс. Для метрик в дашборде обычно дропаем; для финансовых отчётов — обновляем; для аналитики — отправляем в late-topic для пост-обработки.
Запуск
Стенд должен быть поднят (docker compose up -d из корня).
Один раз создать топик:
make topic-createВ одном терминале — producer:
make run-eventsВ другом — aggregator:
make run-aggregatorРаз в 5 секунд aggregator печатает таблицу окон. Подожди минуту-две, чтобы заполнились несколько окон — расхождение между event-time и processing-time станет очевидным.
Покрутить параметры:
make run-events RATE=10ms LATE_PROB=0.30 # быстрее и с большим хвостом поздних
make run-aggregator WINDOW=30s PRINT=2s # окна по 30 сек, печать каждые 2 секПрибрать после лекции:
make topic-deleteЗачем это всё
Несколько вещей, которые отсюда стоит унести.
- Event-time — это ваш источник правды для аналитики. Processing-time гладкий и приятный, но он про вашу систему, не про реальность. Если бизнес считает «выручку за минуту», считайте по event-time. Если SRE считает «нагрузку на кластер» — processing-time.
- Tumbling-окна — самое простое и правильное по умолчанию. Не перекрываются. Понятная семантика. Каждое событие в одном окне ровно один раз. Hopping/sliding/session берите только когда tumbling реально не подходит.
- Late events — это норма. Разница в десятки секунд возникает мгновенно при любой сетевой нестабильности. Минуты — при offline-устройствах. Часы и сутки — при retry'ях из мёртвой очереди. Любая стрим-система должна явно отвечать на вопрос «что делаем с поздними».
- Watermark — это не точная наука. Это эвристика. Слишком короткий — окно закрывается рано, теряем данные. Слишком длинный — окно эмитится поздно, дашборд тормозит. Tuning'уется под профиль трафика.
В Stream processing на Go (franz-go + Pebble) возьмём это и положим на стрим, который реально хранит state — топ напитков Brew на franz-go + Pebble + changelog topic. После рестарта state восстанавливается из changelog, и накопленные подсчёты не теряются. Это уже близко к настоящему stream processing'у.