Offsets и retention
Ночью в Brew у notification-service отвалился процесс. Под утро on-call поднял его за минуту, но три часа сервис лежал. За это время в brew.orders.v1 приехало тысяч двадцать OrderPlaced и столько же PaymentReceived. Вопрос дежурному: что сервис должен сейчас читать? Перечитать всё, что пропустил? Только новое? И где Kafka вообще помнит, докуда он успел дочитать до падения?
Эта лекция отвечает на оба вопроса. Первый - про offset: каждое сообщение в партиции имеет свой номер, и консьюмер хранит закладку «я был тут». Второй - про retention: Kafka сама удаляет старые сообщения по таймеру, и закладка может оказаться в куске лога, которого уже нет на диске.
Offset - просто номер записи
Партиция это упорядоченный append-only лог (смотри Топики и партиции). Записи в этом логе пронумерованы по порядку: 0, 1, 2, 3 и так далее. Этот номер и есть offset. Брокер выдаёт его при записи: продьюсер пишет OrderPlaced, брокер отвечает «принял, partition=2 offset=17». Дальше пара (partition, offset) идентифицирует сообщение однозначно и навсегда.
Аналогия для бэкендера с PostgreSQL опытом. Offset похож на ctid или последовательный id строки в таблице, но без UPDATE'ов: запись с offset=10 всегда была раньше записи с offset=11, и так останется. Это и есть базовая гарантия порядка внутри партиции.
Несколько свойств, которые удобно проговорить сразу:
- offset живёт в одной партиции. Между партициями offset'ы независимы. Партиция-0 на offset=42 и партиция-1 на offset=42 это разные сообщения, никак не связанные.
- offset выдаёт брокер, не клиент. Клиент не может попросить «запиши, пожалуйста, под offset=100», брокер сам решает следующий номер.
- offset монотонно растёт. Дырок в номерах не бывает (точнее, дырки возможны у idempotent producer'а после ретраев, но в модели курса считаем их деталью).
- offset переживает рестарт брокера. Это число лежит на диске рядом с сообщением, не в RAM. Перезапуск Kafka не меняет нумерацию.
У партиции в каждый момент времени есть две границы. Earliest это offset самого старого живого сообщения, latest это offset, который получит следующая запись (на единицу больше offset'а последнего лежащего сообщения). На пустой партиции earliest=latest=0. Когда order-service пишет в топик, latest растёт. Когда retention сметает старые сегменты, earliest растёт. Лог как бы течёт - заливается сверху, утекает снизу.
LEO, HWM и leader epoch - под микроскопом
Тут начинается путаница, в которой полезно разобраться один раз.
Log End Offset (LEO) это позиция, куда leader партиции запишет следующее сообщение. То есть offset «следующей записи», которой ещё нет. У leader'а и каждого follower'а свой LEO; у follower'а он обычно слегка отстаёт, потому что follower тянет данные асинхронно (про роли смотри Репликация и ISR).
High Watermark (HWM) это offset, до которого консьюмер имеет право читать. HWM равен минимальному LEO среди реплик в ISR. Идея простая: пока сообщение не подхватили все ISR-реплики, видеть его никому нельзя. Иначе после failover'а получится так, что новый leader не помнит того, что уже увидел консьюмер - читаемое прошлое исчезло после переключения. Kafka такого не допускает.
Между LEO лидера и HWM есть зазор - это записи, которые leader уже принял, но ISR ещё не догнал. Они физически лежат в логе, но консьюмеру их не видно.
Leader Epoch это счётчик, который тикает на каждой смене leader'а. Нужен для корректного обрезания лога follower'ов после переключения - редкая инвариантная штука, которая чинит сложные баги failover'а. Знать достаточно, что он есть; разбирать в этом курсе мы его не будем.
В нашем коде kadm.ListEndOffsets возвращает offset, эквивалентный HWM (для in-sync клиента это LEO лидера, ограниченный сверху ISR - Kafka не отдаёт записи, которые ещё не зафиксированы).
partition: brew.orders.v1-0
earliest latest = HWM
│ │
▼ ▼
┌──────────────────────────────────────────────┐
│ msg msg msg msg msg msg msg msg msg msg msg │
└──────────────────────────────────────────────┘
offset: 17 18 19 20 21 22 23 24 25 26 27 ◄── сюда придёт следующий OrderPlaced
retained = latest - earliest = 27 - 17 = 10
старые сегменты (offsets 0..16) уже удалил retentionЗакладка консьюмера - committed offset
Brokers выдают offset на запись. А кто помнит, докуда дочитал notification-service? Сам сервис помнит у себя в памяти, пока живой. Но если процесс упал и его подняли через три часа - память пуста. Нужен внешний механизм закладки.
Этот механизм называется committed offset. Консьюмер периодически говорит Kafka: «группа notification-service, топик brew.orders.v1, partition=0 - я обработал записи 0..41, в следующий раз начни с 42». Это и есть commit.
Где Kafka хранит эти закладки? В служебном топике __consumer_offsets. Внутри стенда у него 50 партиций (offsets.topic.num.partitions=50 по дефолту), replication factor совпадает с KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR (на стенде Brew это 3). Каждая запись в __consumer_offsets это обычное Kafka-сообщение с ключом (group, topic, partition) и значением «committed offset, метаданные».
Важная штука. Committed offset это указатель на следующую запись для чтения, не на последнюю обработанную. То есть «committed=42» значит «я обработал 0..41, читай дальше с 42». Это банальная деталь, но на ней спотыкаются: легко записать lastProcessed вместо lastProcessed+1 и потом каждый раз перечитывать одно и то же сообщение.
Когда notification-service поднимется, он сходит в __consumer_offsets, найдёт там запись для своей группы и партиции 0, прочитает committed offset (например, 4781) и начнёт fetch с 4781. Все 20 тысяч OrderPlaced, приехавших за три часа простоя, дочитаются по очереди - Kafka работает как очередь догоняющего чтения.
При первом запуске группы committed offset'а просто нет. Что делать? Это решает параметр auto.offset.reset. Значение latest означает старт с конца - группа видит только новые события, всё, что было до запуска, пропускает. Значение earliest означает старт с начала - группа перечитает весь лог с offset=0, полезно для аналитики или восстановления состояния. Значение none падает с ошибкой - полезно в системах, где «забывание» позиции должно быть видно явно и требует ручного разбора.
Для notification-service Brew стоит latest: на первом запуске не хочется засыпать клиентов пуш-уведомлениями за три недели. Для analytics-service, который пересчитывает агрегаты, наоборот earliest. Подробности про сам топик __consumer_offsets (формат записей, как Kafka выбирает партицию закладки по hash(group)) разберём в Коммиты offset'ов, здесь фиксируем модель: позиция консьюмера живёт отдельно от данных, в служебном compacted-топике.
Retention - две оси, по которым лог стареет
Параметр, который отвечает на «сколько сообщения живут в Kafka». Их два.
retention.ms - по времени. Сегмент лога, в котором последняя запись старше retention.ms миллисекунд, признаётся устаревшим и удаляется целиком. Дефолт - 7 дней (604800000).
retention.bytes - по объёму. Когда суммарный размер партиции на диске превышает retention.bytes, самые старые сегменты удаляются, пока размер не вернётся в норму. Дефолт -1, то есть лимита по объёму нет.
Параметры не взаимоисключающие. Сегмент удаляется, если он попал под любой из лимитов. На критичных топиках Brew ставит оба: время для гарантии «у консьюмеров есть N дней на догон», объём чтобы случайный пик трафика не съел диск. Конкретно для brew.orders.v1 это выглядит так:
retention.ms = 2592000000 # 30 дней
retention.bytes = 53687091200 # 50 GiB на партицию
cleanup.policy = delete
segment.ms = 86400000 # 1 деньТут же - самый частый прокол новичков. «У нас retention.ms=86400000, значит сообщения живут ровно сутки.» Нет. Живут минимум сутки и максимум сутки плюс длительность активного сегмента. Активный сегмент (тот, в который сейчас пишут) не удаляется никогда. Retention смотрит на время последней записи в сегменте, не на время каждого отдельного сообщения. Сообщение, попавшее в сегмент в самом начале его жизни, проживёт retention.ms + segment.ms - пока сегмент закроется, состарится и попадёт под уборку.
И ещё. Уборка отложенная. Брокер запускает retention-checker раз в log.retention.check.interval.ms (дефолт 5 минут). На стенде Brew это значение по дефолту - поэтому в демо ниже earliest будет ползти дискретными прыжками раз в несколько минут, а не плавно.
Retention по топикам Brew
Каждый топик в Brew получил свой retention под профиль использования:
brew.orders.v1- 30 дней. Аналитики строят воронки за месяц, восстановление состояния заказа после бага требует replay.brew.payments.v1- 30 дней. Идёт синхронно с заказами, финансовый аудит запрашивает выборки за месяц.brew.kitchen.v1- 7 дней. Оперативные события готовки, дольше недели никто не смотрит.brew.shipments.v1- 7 дней. Трекинг курьеров живёт коротко, после доставки запись становится бесполезной.
А что если бухгалтерия требует хранить финансовые события 7 лет под compliance? Kafka на эту роль не подходит. Долгое хранение в Brew выгружается ночным job'ом из brew.payments.v1 в S3 (или в озеро данных - в зависимости от инфраструктуры). Kafka в этом сценарии быстрый буфер на месяц, S3 архивное хранилище на годы. Никто не пытается заставить Kafka хранить семь лет: и дорого, и неэффективно (S3 в 50 раз дешевле за гигабайт), и не для того она.
cleanup.policy - delete и compact
Раз уж тронули __consumer_offsets, добавлю двумя словами про сам параметр уборки. У топика есть cleanup.policy, и он отвечает на вопрос «как именно Kafka вычищает старое». Доступно четыре поведения:
delete- стандартное поведение, дефолт. Старые сегменты удаляются по retention.ms / retention.bytes. Это про обычные топики событий:brew.orders.v1,brew.payments.v1,brew.kitchen.v1,brew.shipments.v1- все наdelete.compact- log compaction. Сегменты целиком тут не удаляются. Уходят старые версии каждого ключа - самая свежая запись с ключомKостаётся жить, пока не появится новая с тем же ключом. Это про state-топики: текущий профиль клиента, текущий конфиг, committed offset группы.delete,compact- гибрид. Сегмент сжимается по ключу, а потом то, что старше retention, удаляется полностью. Полезно, когда нужен и snapshot, и временная граница.- Без параметра. Топик унаследует дефолт брокера (
log.cleanup.policy, обычноdelete). На проде Brew event-топики явно ставятcleanup.policy=delete(видно в конфигеbrew.orders.v1выше и в кодеload-and-watchниже) - читается прямо в манифесте топика, без оглядки на дефолты брокера.
Топик __consumer_offsets на compact именно поэтому, как мы сказали выше: closing offset'ов миллионы, но интересна всегда только последняя позиция группы. Compaction разберём подробно в Retention и compaction, там она живёт по делу. Тут достаточно знать, что вариантов несколько и что __consumer_offsets использует compact.
Что показывает load-and-watch
cmd/load-and-watch/main.go готовит маленькую песочницу retention'а на отдельном тренировочном топике brew.orders.retention-demo. Настоящий brew.orders.v1 с 30-дневным retention'ом утилита не трогает - это сделано умышленно, чтобы соседние лекции 01-05 и 01-06 потом работали с чистым brew.orders.v1, а не получили партицию, на которой записи испаряются через минуту. Создаёт демо-топик с partitions=3, rf=3, retention.ms=60000 (минута), segment.ms=10000 (десять секунд). Это демо-цифры - они нужны, чтобы за пять минут увидеть, как старые сегменты удаляются.
Идемпотентно. Если демо-топик уже есть, конфигурация подгоняется через AlterTopicConfigs - не падает и не застревает с прошлым retention. Дальше пишет 100 «заказов» через ProduceSync с ключами order-0..order-99 и payload вида OrderPlaced order_id=order-N - изображаем шквал заказов на пятничном промо (про hash-партиционирование смотри Ключи и партиционирование).
Конфиги топика передаются прямо в CreateTopic четвёртым аргументом - мапой name → *string:
configs := map[string]*string{
"retention.ms": kadm.StringPtr(strconv.FormatInt(o.retention.Milliseconds(), 10)),
"segment.ms": kadm.StringPtr(strconv.FormatInt(o.segment.Milliseconds(), 10)),
"cleanup.policy": kadm.StringPtr("delete"),
}
resp, err := admin.CreateTopic(rpcCtx, o.partitions, o.rf, configs, o.topic)После записи 100 сообщений стартует тикер на 10 секунд. На каждом тике:
- Пишет один heartbeat-сообщение
hb-N. Зачем - ниже. - Дёргает
kadm.ListStartOffsets(earliest = log start offset). - Дёргает
kadm.ListEndOffsets(latest = HWM). - Печатает таблицу: PARTITION / EARLIEST / LATEST / RETAINED, плюс TOTAL.
В коде это два запроса подряд, оба возвращают мапу (topic, partition) → offset:
starts, err := admin.ListStartOffsets(rpcCtx, topic) // earliest = log start
ends, err := admin.ListEndOffsets(rpcCtx, topic) // latest = HWM
starts.Each(func(o kadm.ListedOffset) {
rows = append(rows, row{partition: o.Partition, earliest: o.Offset})
})
for i := range rows {
if eo, ok := ends.Lookup(topic, rows[i].partition); ok && eo.Err == nil {
rows[i].latest = eo.Offset
}
}
// retained := latest - earliest - сколько сообщений сейчас живётHeartbeat'ы здесь не для красоты. Сегмент закрывается по segment.ms от момента последней записи в него, а активный сегмент никогда не удаляется. Без heartbeat'ов после первых 100 сообщений активный сегмент завис бы вечно - retention ничего бы не съел, потому что весь лог сидит в одном незакрытом сегменте. Heartbeat каждые 10 секунд двигает текущий сегмент: он закрывается по segment.ms, на его место открывается новый, а закрытый теперь может попасть под retention.
Что увидит запустивший:
[16:42:11] heartbeats=0
PARTITION EARLIEST LATEST RETAINED
0 0 34 34
1 0 33 33
2 0 33 33
TOTAL 0 100 100
---Старт. Все 100 сообщений на месте. EARLIEST везде 0.
[16:43:21] heartbeats=7
PARTITION EARLIEST LATEST RETAINED
0 0 36 36
1 0 35 35
2 0 36 36
TOTAL 0 107 107
---Через минуту LATEST подрос (heartbeat'ы добавились), EARLIEST всё ещё 0. Старые сегменты уже устарели, но retention-checker ещё не пробежал.
Через несколько минут (5-7 на стенде с дефолтным log.retention.check.interval.ms=300000):
[16:48:31] heartbeats=37
PARTITION EARLIEST LATEST RETAINED
0 34 47 13
1 33 45 12
2 33 45 12
TOTAL 100 137 37
---Вот тут интересное. EARLIEST на каждой партиции прыгнул с 0 до 33-34. Retention-checker запустился, нашёл сегменты, у которых max timestamp старше 60s, и удалил их целиком. Вместе с ними ушли исходные 100 записей: они теперь не читаются никем. RETAINED показывает «сколько сообщений сейчас в логе» - около 12 на партицию (это свежие heartbeat'ы).
Если оставить программу работать дальше, картина продолжит дрейфовать вправо. EARLIEST догоняет LATEST с задержкой retention.ms + segment.ms + log.retention.check.interval.ms - около 6-7 минут.
Этот сценарий - микромодель того, что произошло бы с notification-service после трёхчасового даунтайма, если бы retention brew.orders.v1 стоял меньше трёх часов. Сервис вернулся бы, сходил за committed offset в __consumer_offsets, получил бы, например, 5000. Сходил бы в брокер за записями с 5000 - и получил OFFSET_OUT_OF_RANGE, потому что retention уже снёс этот диапазон. Дальше поведение зависит от auto.offset.reset: latest пропустит пропущенное и продолжит с конца, earliest начнёт с текущего earliest (а не с 5000), none упадёт с ошибкой. У Brew retention на orders 30 дней именно для того, чтобы такое не случалось при типовых инцидентах.
Запуск
Стенд должен быть поднят (docker compose up -d из корня).
make runВ соседнем терминале параллельно полезно сравнить с CLI:
make topic-describeПолучишь kafka-topics.sh --describe (RF, partitions, leader/replicas/ISR - картина из Топики и партиции и Репликация и ISR) плюс kafka-configs.sh --describe, в котором видны заданные retention.ms=60000, segment.ms=10000, cleanup.policy=delete.
Перезапустить с чистого листа:
make run RECREATE=trueПотрогать руками retention пожёстче - поставить retention=10s, segment=5s:
make run RETENTION=10s SEGMENT=5sПрибрать после лекции:
make topic-deleteЧто забрать с собой
Практические следствия:
- «Хранится X дней» значит до X дней плюс длительность сегмента. Если контракт с потребителями требует «гарантированно есть последние 7 дней» - retention.ms ставится на 7 дней с запасом, не ровно. Активный сегмент висит до закрытия и съедает время сверху.
- Earliest растёт сам. Консьюмер, отставший по lag'у больше retention'а, при попытке прочитать свою позицию получит
OFFSET_OUT_OF_RANGE. Её просто нет в логе. Это штатное поведение Kafka. Поведение настраивается черезauto.offset.reset(latest/earliest/none) - детали в Коммиты offset'ов. - Committed offset это указатель на следующую запись. Не на последнюю обработанную. Перепутаешь - получишь либо вечное перечитывание одного сообщения, либо тихую потерю записи на старте.
- Retention.bytes твой друг. Без него один кривой продьюсер с раздутыми сообщениями забьёт диск брокера за ночь. На критичных топиках Brew всегда стоят оба лимита, время и объём. Дополнительный диск, заказанный за день до промо, спас бы Brew от пары инцидентов.
Дальше в Первый продьюсер на franz-go пишем первое OrderPlaced руками и видим, как offset, который вернёт брокер из ProduceSync, ложится в ту самую модель, которую мы тут разобрали.