Schema Registry
В Protobuf в Go producer писал в Kafka сырые protobuf-байты, а в headers клал строку schema: orders.v1.Order. Это работает ровно до тех пор, пока order-service единственный, кто пишет в brew.orders.v1. Дальше — типичная история роста Brew. Аналитику заказов вынесли в отдельный сервис на Python со своим генерёным кодом и со своим взглядом на то, как маршалить. Кто-то поменял поле в Order, забыл сказать кухне. Кто-то решил подставить туда JSON «временно, на демку». Через полгода в топике лежит зоопарк из четырёх несовместимых форматов, и никто уже не помнит, какой из них «правильный».
Schema Registry — это попытка лечить именно эту боль. Один источник правды для схем. Любой продьюсер перед записью получает у него schema_id и кладёт его в первые байты payload'а. Любой консьюмер по этому id находит схему — а уже зная схему, парсит сообщение правильно.
В лекции — как это устроено на проводе, как franz-go умеет такое делать через sr.Serde. Заодно где у подхода тонкое место.
Что это вообще такое
Confluent Schema Registry — отдельный сервис, REST API. У него своё хранилище: ключ — пара (subject, version), значение — текст схемы плюс её тип. Нотации SR поддерживает разные — у нас Protobuf, рядом обычно живёт Avro. При регистрации сервис возвращает globally-unique schema_id — целое число, которое и поедет в Kafka.
Subject — это «логическое имя контракта». Конвенция по умолчанию (TopicNameStrategy) — <topic>-value для значений и <topic>-key для ключей. Внутри одного subject'а живёт цепочка версий — добавил поле в схему, зарегистрировал новую версию, получил новый id, старые id никуда не делись. Об эволюции — Эволюция схем.
В нашем стенде SR крутится на http://localhost:8081. У него есть пара десятков ручек, нам нужны вот эти:
POST /subjects/<subject>/versions— зарегистрировать схему, получить id.GET /schemas/ids/<id>— взять текст схемы по id.GET /subjects— список всех subject'ов в реестре.DELETE /subjects/<subject>— удалить subject (soft).
Всё остальное — варианты этих четырёх плюс админка.
Wire format
Тут самое неочевидное. SR никак не трогает байты в Kafka напрямую — это просто отдельный HTTP-сервис. Но он диктует формат payload'а, который должны соблюдать все клиенты. Иначе никакой совместимости не получится.
Confluent wire format для значения:
+---------+---------------+----------------+----------------+
| 1 байт | 4 байта | N байт | M байт |
| magic=0 | schema_id | message-index | proto/avro |
| | (big-endian) | (только PB) | payload |
+---------+---------------+----------------+----------------+Сначала — нулевой magic byte. Дальше четыре байта schema_id в big-endian. Дальше — message-index: массив zigzag-варинтов с префиксом длины (для protobuf), в котором лежит путь к нужному message внутри .proto-файла (несколько top-level message'ей — массив выбирает один; вложенные message'и — полный путь через дерево). Для самого частого случая [0] (первый top-level message) реализован shortcut: один байт 0x00 вместо двух ([len=1][idx=0]). Только потом — сам сериализованный payload.
Для Avro и JSON message-index не нужен — они однозначно описывают одно сообщение, magic byte и id хватает. Protobuf исторически такой, потому что один .proto может содержать произвольное число message'ей.
В коде это всё прячется за sr.Serde. Но раз в жизни полезно увидеть руками. В нашем producer'е я печатаю magic и id ровно так, как они лежат в первых пяти байтах:
ok order_id=ord-00000 status=ORDER_STATUS_PLACED magic=0x00 schema_id=1 bytes=67 -> ...Если вдруг в первых пяти байтах что-то не то — Kafka тут ни при чём, проблема в клиенте, который пишет «не то».
sr.Serde — как это собрано в franz-go
franz-go/pkg/sr — родной для franz-go SR-клиент. Внутри две вещи: sr.Client (HTTP к реестру) и sr.Serde (склейка id + encode/decode для конкретного типа).
Serde работает по простой модели. Один раз регистрируешь связку «schema_id + Go-тип + EncodeFn». Дальше Encode(v) сам прибавит правильный заголовок (magic byte и schema_id, для protobuf — ещё и message-index), а Decode(b, &v) пройдёт обратный путь: распарсит первые пять байт и применит зарегистрированную DecodeFn к остатку.
Вот ядро регистрации в продьюсере:
serde := sr.NewSerde()
serde.Register(
id,
&ordersv1.Order{},
sr.EncodeFn(func(v any) ([]byte, error) {
return proto.Marshal(v.(*ordersv1.Order))
}),
sr.Index(0),
)Тут видно три кусочка. id — это глобальный schema_id, который мы получили от SR ещё до создания Serde. sr.EncodeFn — обычная обёртка над proto.Marshal, никакого волшебства. sr.Index(0) — обязательный для Protobuf message-index, который говорит: «top-level Order, без вложений». Если бы в одном .proto лежал ещё один message и мы хотели бы кодировать второй — было бы sr.Index(1).
Encode дальше выглядит обыденно:
payload, err := serde.Encode(order)
// payload = [0x00, id_b3, id_b2, id_b1, id_b0, 0x00, ...proto bytes...]Не забудь — payload в Kafka кладётся целиком в Record.Value. Никаких отдельных headers под schema_id, никаких костылей. Wire format — внутри value.
Регистрация схемы — что и когда
Самый частый вопрос: «когда я регистрирую схему?». В разных подходах ответ разный.
Первый вариант — статически, через CI. В пайплайне есть шаг, который смотрит на .proto-файлы в репо, дёргает SR, сверяется с зарегистрированными версиями, регистрирует новые. Прод-код стартует с уже известным id (или находит его через subject lookup). Этот вариант чище и больше подходит для регулируемых сред.
Второй — динамически, при старте приложения. Сервис при запуске сам зарегистрирует свою схему и закеширует id в памяти. Прост, но даёт сервису право писать в Registry — иногда это не то, чего ты хочешь.
В лекции я показываю второй вариант, потому что он короче и нагляднее. На проде — обычно первый.
Регистрация в нашем producer'е — это пять строк:
cl, err := sr.NewClient(sr.URLs(url))
ss, err := cl.CreateSchema(ctx, subject, sr.Schema{
Schema: orderProtoSchema,
Type: sr.TypeProtobuf,
})
return ss.ID, nilorderProtoSchema — это просто текст моего .proto-файла, зашитый константой в main.go. Лучше так, чем читать файл рантайм-FS — меньше сюрпризов с относительными путями. Если та же схема уже зарегистрирована (по нормализованному содержимому) — SR вернёт тот же id, новой версии не появится. Это удобно: можно гонять make run-producer сколько угодно раз, в реестре будет ровно одна запись.
Что показывает наш consumer
Consumer интереснее. Он не знает заранее, какой schema_id придёт в первом сообщении. И вообще, в одном топике могут лежать сообщения от разных версий одной схемы — и каждая со своим id.
Поэтому стратегия простая. Распаковали id из первых пяти байт. Если он у Serde уже есть — декодим сразу. Если нет — сходили в SR за схемой, зарегистрировали под этот id DecodeFn, и только потом декодим. Каждый новый id обходится в один HTTP-запрос, дальше кешируется внутри Serde.
Сердце цикла:
id, _, err := serde.DecodeID(rec.Value)
if _, ok := registered.Load(id); !ok {
schema, err := srCl.SchemaByID(ctx, id)
serde.Register(
id,
&ordersv1.Order{},
sr.DecodeFn(func(b []byte, v any) error {
return proto.Unmarshal(b, v.(*ordersv1.Order))
}),
sr.Index(0),
)
registered.Store(id, struct{}{})
}
var order ordersv1.Order
if err := serde.Decode(rec.Value, &order); err != nil { ... }Тут есть тонкое место. Я делаю proto.Unmarshal в локальный тип ordersv1.Order — то есть consumer всё равно знает, какой Go-тип ожидать. Schema Registry сам по себе не выдаёт generated Go-код, он отдаёт только текст .proto. Превращать этот текст в Go-структуры на лету никто обычно не пытается (это технически возможно через dynamicpb, но громоздко).
Это значит: SR полезен для валидации wire format'а и управления эволюцией, но кодген всё равно остаётся на стороне разработчика. Поменялась схема — производишь новый Go-код через buf generate, перевыпускаешь сервис.
Динамический Decode (через dynamicpb) применяют разве что в инструментах — kcat, kafka-ui, всякие тестовые утилиты, иногда дебаг-сайдкары. Прод-сервис обычно зашит на конкретную версию.
Печать сама по себе — обычные generated getter'ы, как в Protobuf в Go:
fmt.Printf("--- %s/%d@%d key=%s schema_id=%d ---\n",
rec.Topic, rec.Partition, rec.Offset, string(rec.Key), schemaID)
fmt.Printf(" status = %s\n", o.GetStatus().String())
if ts := o.GetCreatedAt(); ts != nil {
fmt.Printf(" created_at = %s\n", ts.AsTime().Format("2006-01-02 15:04:05Z07:00"))
}schema_id я вытаскиваю в каждое сообщение — на проде это полезно для отладки: «а какой версией продьюсер записал вот эту запись» сразу видно.
Кеширование и производительность
Один важный нюанс про SR — он живёт отдельно. HTTP-вызов к нему стоит латенси. Если на каждое kafka-сообщение лезть в /schemas/ids/... — производительность развалится мгновенно.
sr.Serde решает это так: после первого Register(id, ...) весь декод идёт по локальной map'е, без HTTP. SR трогается ровно один раз на каждый новый id. Если producer'ы держат стабильные id (не меняют схему каждые 5 минут) — это меньше десятка вызовов за всё время жизни консьюмера.
Producer'у проще. Один HTTP-запрос при старте — регистрирует схему, получает id. Дальше Encode идёт по памяти, без хождения в Registry. SR на горячем пути не светится вообще.
В нашей лекции consumer лезет в SR один раз — на самом первом fetch'е увидит незнакомый id, сходит за схемой, закеширует в Serde, дальше летит по памяти. Лог честно показывает этот момент:
INFO msg="registering schema id" id=1 type=PROTOBUFПосле этой строки уже никаких HTTP к SR не будет.
Что лежит в сообщении: смотрим руками
Иногда хочется проверить, что producer действительно пишет правильный wire format. Самый дешёвый способ — kcat с флагом -s value=s (декодировать значение через SR):
kcat -b localhost:19092 -t lecture-05-03-orders-sr -C -e -o beginning \
-s value=s -r http://localhost:8081Если установлен — увидишь распарсенные сообщения. Если нет — можно дёрнуть SR напрямую:
make sr-list-subjects # покажет lecture-05-03-orders-sr-value
make sr-describe # вернёт текст схемы и её idЭти ручки хорошо помогают, когда что-то пошло не так. Сообщение не парсится — глянь, та ли схема в реестре. Subject пропал — кто-то стёр (DELETE /subjects/... бывает разрушительным, особенно с ?permanent=true).
Запуск
Нужны бинари в $PATH:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install github.com/bufbuild/buf/cmd/buf@latestПоследовательность:
make proto-gen # сгенерить gen/orders/v1/order.pb.go
make topic-create # lecture-05-03-orders-sr, RF=3, 3 партиции
make run-producer # registers schema in SR + writes 10 Order'ов
make sr-list-subjects # cross-check: subject появился
make sr-describe # глянуть текст и id
make run-consumer # читает, печатает schema_id у каждой записиПри первом прогоне в логе producer'а увидишь schema registered subject=... id=N. Запомни N — это и есть глобальный id, который теперь физически живёт в первых 5 байтах каждого value в этом топике. У consumer'а после первого fetch'а появится строка registering schema id=N type=PROTOBUF. Это он сходил в SR за определением и зарегистрировал DecodeFn под этот id.
Если запустить make run-producer ещё раз — id не изменится. SR увидит ту же схему (по нормализованному содержимому) и вернёт ту же запись.
На что обратить внимание
- Я зашил текст
.protoконстантой прямо вcmd/producer/main.go. Это не идеально для прода — если схема меняется, надо не забыть перегенерить и Go-код, и константу. На реальном сервисе обычно либоembed.FSот файлаproto/orders/v1/order.proto, либо отдельный пайплайн для регистрации. В лекции зашил константу для самодостаточности. - TopicNameStrategy (
<topic>-value) — самая частая, но не единственная. Есть RecordNameStrategy (subject = полное имя message типа) и TopicRecordNameStrategy (комбинация). Если в одном топике лежат разные message-типы — TopicNameStrategy не подходит, перепрыгивай на RecordName. В курсе не разбираем — это редкий случай. sr.Index(0)для top-level message — обязателен для Protobuf, иначе магического байта0x00после schema_id не появится и Java-клиент Confluent'а не сможет это прочитать. Avro и JSON этого не требуют.sr.Serdeхранит mappingid -> tserdeподatomic.Value. Это значит — потокобезопасно для чтения, регистрация под мьютексом. Использовать один Serde из множества goroutine'ов можно (и нужно).- DELETE на subject — операция, к которой стоит относиться осторожно. Soft-delete (
DELETE /subjects/<sub>) удаляет subject, но id остаётся в реестре под своим ключом — старые сообщения в Kafka всё ещё можно декодировать. Hard-delete (?permanent=true) стирает id окончательно, и старые сообщения становятся нечитаемыми. В Makefile у насcleanделает hard-delete для воспроизводимости — на проде так делать не надо. gen/— в репо. Те же аргументы, что и в Protobuf в Go: воспроизводимость безbufна чистом clone'е, ревьюер видит, как сгенерёный код реагирует на правку схемы.
Что дальше
В Эволюция схем — про эволюцию схем. Что значит BACKWARD/FORWARD/FULL compatibility, какие правки в Protobuf SR пропустит, а какие — нет. И как buf breaking ловит ломающие изменения ещё до того, как они доедут до SR.
А отсюда уже понятно, зачем вообще нужен Registry: он один точно знает, какие схемы в системе живут, и не пускает в реестр то, что сломает потребителей.