0 / 42 (0%)

Schema Registry

В Protobuf в Go producer писал в Kafka сырые protobuf-байты, а в headers клал строку schema: orders.v1.Order. Это работает ровно до тех пор, пока order-service единственный, кто пишет в brew.orders.v1. Дальше — типичная история роста Brew. Аналитику заказов вынесли в отдельный сервис на Python со своим генерёным кодом и со своим взглядом на то, как маршалить. Кто-то поменял поле в Order, забыл сказать кухне. Кто-то решил подставить туда JSON «временно, на демку». Через полгода в топике лежит зоопарк из четырёх несовместимых форматов, и никто уже не помнит, какой из них «правильный».

Schema Registry — это попытка лечить именно эту боль. Один источник правды для схем. Любой продьюсер перед записью получает у него schema_id и кладёт его в первые байты payload'а. Любой консьюмер по этому id находит схему — а уже зная схему, парсит сообщение правильно.

В лекции — как это устроено на проводе, как franz-go умеет такое делать через sr.Serde. Заодно где у подхода тонкое место.

Что это вообще такое

Confluent Schema Registry — отдельный сервис, REST API. У него своё хранилище: ключ — пара (subject, version), значение — текст схемы плюс её тип. Нотации SR поддерживает разные — у нас Protobuf, рядом обычно живёт Avro. При регистрации сервис возвращает globally-unique schema_id — целое число, которое и поедет в Kafka.

Subject — это «логическое имя контракта». Конвенция по умолчанию (TopicNameStrategy) — <topic>-value для значений и <topic>-key для ключей. Внутри одного subject'а живёт цепочка версий — добавил поле в схему, зарегистрировал новую версию, получил новый id, старые id никуда не делись. Об эволюции — Эволюция схем.

В нашем стенде SR крутится на http://localhost:8081. У него есть пара десятков ручек, нам нужны вот эти:

  • POST /subjects/<subject>/versions — зарегистрировать схему, получить id.
  • GET /schemas/ids/<id> — взять текст схемы по id.
  • GET /subjects — список всех subject'ов в реестре.
  • DELETE /subjects/<subject> — удалить subject (soft).

Всё остальное — варианты этих четырёх плюс админка.

Wire format

Тут самое неочевидное. SR никак не трогает байты в Kafka напрямую — это просто отдельный HTTP-сервис. Но он диктует формат payload'а, который должны соблюдать все клиенты. Иначе никакой совместимости не получится.

Confluent wire format для значения:

plaintext
+---------+---------------+----------------+----------------+
| 1 байт  |    4 байта    |   N байт       |   M байт       |
| magic=0 |  schema_id    |  message-index |  proto/avro    |
|         |  (big-endian) |  (только PB)   |  payload       |
+---------+---------------+----------------+----------------+

Сначала — нулевой magic byte. Дальше четыре байта schema_id в big-endian. Дальше — message-index: массив zigzag-варинтов с префиксом длины (для protobuf), в котором лежит путь к нужному message внутри .proto-файла (несколько top-level message'ей — массив выбирает один; вложенные message'и — полный путь через дерево). Для самого частого случая [0] (первый top-level message) реализован shortcut: один байт 0x00 вместо двух ([len=1][idx=0]). Только потом — сам сериализованный payload.

Для Avro и JSON message-index не нужен — они однозначно описывают одно сообщение, magic byte и id хватает. Protobuf исторически такой, потому что один .proto может содержать произвольное число message'ей.

В коде это всё прячется за sr.Serde. Но раз в жизни полезно увидеть руками. В нашем producer'е я печатаю magic и id ровно так, как они лежат в первых пяти байтах:

plaintext
ok  order_id=ord-00000 status=ORDER_STATUS_PLACED    magic=0x00 schema_id=1 bytes=67 -> ...

Если вдруг в первых пяти байтах что-то не то — Kafka тут ни при чём, проблема в клиенте, который пишет «не то».

sr.Serde — как это собрано в franz-go

franz-go/pkg/sr — родной для franz-go SR-клиент. Внутри две вещи: sr.Client (HTTP к реестру) и sr.Serde (склейка id + encode/decode для конкретного типа).

Serde работает по простой модели. Один раз регистрируешь связку «schema_id + Go-тип + EncodeFn». Дальше Encode(v) сам прибавит правильный заголовок (magic byte и schema_id, для protobuf — ещё и message-index), а Decode(b, &v) пройдёт обратный путь: распарсит первые пять байт и применит зарегистрированную DecodeFn к остатку.

Вот ядро регистрации в продьюсере:

go
serde := sr.NewSerde()
serde.Register(
    id,
    &ordersv1.Order{},
    sr.EncodeFn(func(v any) ([]byte, error) {
        return proto.Marshal(v.(*ordersv1.Order))
    }),
    sr.Index(0),
)

Тут видно три кусочка. id — это глобальный schema_id, который мы получили от SR ещё до создания Serde. sr.EncodeFn — обычная обёртка над proto.Marshal, никакого волшебства. sr.Index(0) — обязательный для Protobuf message-index, который говорит: «top-level Order, без вложений». Если бы в одном .proto лежал ещё один message и мы хотели бы кодировать второй — было бы sr.Index(1).

Encode дальше выглядит обыденно:

go
payload, err := serde.Encode(order)
// payload = [0x00, id_b3, id_b2, id_b1, id_b0, 0x00, ...proto bytes...]

Не забудь — payload в Kafka кладётся целиком в Record.Value. Никаких отдельных headers под schema_id, никаких костылей. Wire format — внутри value.

Регистрация схемы — что и когда

Самый частый вопрос: «когда я регистрирую схему?». В разных подходах ответ разный.

Первый вариант — статически, через CI. В пайплайне есть шаг, который смотрит на .proto-файлы в репо, дёргает SR, сверяется с зарегистрированными версиями, регистрирует новые. Прод-код стартует с уже известным id (или находит его через subject lookup). Этот вариант чище и больше подходит для регулируемых сред.

Второй — динамически, при старте приложения. Сервис при запуске сам зарегистрирует свою схему и закеширует id в памяти. Прост, но даёт сервису право писать в Registry — иногда это не то, чего ты хочешь.

В лекции я показываю второй вариант, потому что он короче и нагляднее. На проде — обычно первый.

Регистрация в нашем producer'е — это пять строк:

go
cl, err := sr.NewClient(sr.URLs(url))
ss, err := cl.CreateSchema(ctx, subject, sr.Schema{
    Schema: orderProtoSchema,
    Type:   sr.TypeProtobuf,
})
return ss.ID, nil

orderProtoSchema — это просто текст моего .proto-файла, зашитый константой в main.go. Лучше так, чем читать файл рантайм-FS — меньше сюрпризов с относительными путями. Если та же схема уже зарегистрирована (по нормализованному содержимому) — SR вернёт тот же id, новой версии не появится. Это удобно: можно гонять make run-producer сколько угодно раз, в реестре будет ровно одна запись.

Что показывает наш consumer

Consumer интереснее. Он не знает заранее, какой schema_id придёт в первом сообщении. И вообще, в одном топике могут лежать сообщения от разных версий одной схемы — и каждая со своим id.

Поэтому стратегия простая. Распаковали id из первых пяти байт. Если он у Serde уже есть — декодим сразу. Если нет — сходили в SR за схемой, зарегистрировали под этот id DecodeFn, и только потом декодим. Каждый новый id обходится в один HTTP-запрос, дальше кешируется внутри Serde.

Сердце цикла:

go
id, _, err := serde.DecodeID(rec.Value)
if _, ok := registered.Load(id); !ok {
    schema, err := srCl.SchemaByID(ctx, id)
    serde.Register(
        id,
        &ordersv1.Order{},
        sr.DecodeFn(func(b []byte, v any) error {
            return proto.Unmarshal(b, v.(*ordersv1.Order))
        }),
        sr.Index(0),
    )
    registered.Store(id, struct{}{})
}
 
var order ordersv1.Order
if err := serde.Decode(rec.Value, &order); err != nil { ... }

Тут есть тонкое место. Я делаю proto.Unmarshal в локальный тип ordersv1.Order — то есть consumer всё равно знает, какой Go-тип ожидать. Schema Registry сам по себе не выдаёт generated Go-код, он отдаёт только текст .proto. Превращать этот текст в Go-структуры на лету никто обычно не пытается (это технически возможно через dynamicpb, но громоздко).

Это значит: SR полезен для валидации wire format'а и управления эволюцией, но кодген всё равно остаётся на стороне разработчика. Поменялась схема — производишь новый Go-код через buf generate, перевыпускаешь сервис.

Динамический Decode (через dynamicpb) применяют разве что в инструментах — kcat, kafka-ui, всякие тестовые утилиты, иногда дебаг-сайдкары. Прод-сервис обычно зашит на конкретную версию.

Печать сама по себе — обычные generated getter'ы, как в Protobuf в Go:

go
fmt.Printf("--- %s/%d@%d key=%s schema_id=%d ---\n",
    rec.Topic, rec.Partition, rec.Offset, string(rec.Key), schemaID)
fmt.Printf("  status       = %s\n", o.GetStatus().String())
if ts := o.GetCreatedAt(); ts != nil {
    fmt.Printf("  created_at   = %s\n", ts.AsTime().Format("2006-01-02 15:04:05Z07:00"))
}

schema_id я вытаскиваю в каждое сообщение — на проде это полезно для отладки: «а какой версией продьюсер записал вот эту запись» сразу видно.

Кеширование и производительность

Один важный нюанс про SR — он живёт отдельно. HTTP-вызов к нему стоит латенси. Если на каждое kafka-сообщение лезть в /schemas/ids/... — производительность развалится мгновенно.

sr.Serde решает это так: после первого Register(id, ...) весь декод идёт по локальной map'е, без HTTP. SR трогается ровно один раз на каждый новый id. Если producer'ы держат стабильные id (не меняют схему каждые 5 минут) — это меньше десятка вызовов за всё время жизни консьюмера.

Producer'у проще. Один HTTP-запрос при старте — регистрирует схему, получает id. Дальше Encode идёт по памяти, без хождения в Registry. SR на горячем пути не светится вообще.

В нашей лекции consumer лезет в SR один раз — на самом первом fetch'е увидит незнакомый id, сходит за схемой, закеширует в Serde, дальше летит по памяти. Лог честно показывает этот момент:

plaintext
INFO msg="registering schema id" id=1 type=PROTOBUF

После этой строки уже никаких HTTP к SR не будет.

Что лежит в сообщении: смотрим руками

Иногда хочется проверить, что producer действительно пишет правильный wire format. Самый дешёвый способ — kcat с флагом -s value=s (декодировать значение через SR):

sh
kcat -b localhost:19092 -t lecture-05-03-orders-sr -C -e -o beginning \
     -s value=s -r http://localhost:8081

Если установлен — увидишь распарсенные сообщения. Если нет — можно дёрнуть SR напрямую:

sh
make sr-list-subjects        # покажет lecture-05-03-orders-sr-value
make sr-describe             # вернёт текст схемы и её id

Эти ручки хорошо помогают, когда что-то пошло не так. Сообщение не парсится — глянь, та ли схема в реестре. Subject пропал — кто-то стёр (DELETE /subjects/... бывает разрушительным, особенно с ?permanent=true).

Запуск

Нужны бинари в $PATH:

sh
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install github.com/bufbuild/buf/cmd/buf@latest

Последовательность:

sh
make proto-gen          # сгенерить gen/orders/v1/order.pb.go
make topic-create       # lecture-05-03-orders-sr, RF=3, 3 партиции
make run-producer       # registers schema in SR + writes 10 Order'ов
make sr-list-subjects   # cross-check: subject появился
make sr-describe        # глянуть текст и id
make run-consumer       # читает, печатает schema_id у каждой записи

При первом прогоне в логе producer'а увидишь schema registered subject=... id=N. Запомни N — это и есть глобальный id, который теперь физически живёт в первых 5 байтах каждого value в этом топике. У consumer'а после первого fetch'а появится строка registering schema id=N type=PROTOBUF. Это он сходил в SR за определением и зарегистрировал DecodeFn под этот id.

Если запустить make run-producer ещё раз — id не изменится. SR увидит ту же схему (по нормализованному содержимому) и вернёт ту же запись.

На что обратить внимание

  • Я зашил текст .proto константой прямо в cmd/producer/main.go. Это не идеально для прода — если схема меняется, надо не забыть перегенерить и Go-код, и константу. На реальном сервисе обычно либо embed.FS от файла proto/orders/v1/order.proto, либо отдельный пайплайн для регистрации. В лекции зашил константу для самодостаточности.
  • TopicNameStrategy (<topic>-value) — самая частая, но не единственная. Есть RecordNameStrategy (subject = полное имя message типа) и TopicRecordNameStrategy (комбинация). Если в одном топике лежат разные message-типы — TopicNameStrategy не подходит, перепрыгивай на RecordName. В курсе не разбираем — это редкий случай.
  • sr.Index(0) для top-level message — обязателен для Protobuf, иначе магического байта 0x00 после schema_id не появится и Java-клиент Confluent'а не сможет это прочитать. Avro и JSON этого не требуют.
  • sr.Serde хранит mapping id -> tserde под atomic.Value. Это значит — потокобезопасно для чтения, регистрация под мьютексом. Использовать один Serde из множества goroutine'ов можно (и нужно).
  • DELETE на subject — операция, к которой стоит относиться осторожно. Soft-delete (DELETE /subjects/<sub>) удаляет subject, но id остаётся в реестре под своим ключом — старые сообщения в Kafka всё ещё можно декодировать. Hard-delete (?permanent=true) стирает id окончательно, и старые сообщения становятся нечитаемыми. В Makefile у нас clean делает hard-delete для воспроизводимости — на проде так делать не надо.
  • gen/ — в репо. Те же аргументы, что и в Protobuf в Go: воспроизводимость без buf на чистом clone'е, ревьюер видит, как сгенерёный код реагирует на правку схемы.

Что дальше

В Эволюция схем — про эволюцию схем. Что значит BACKWARD/FORWARD/FULL compatibility, какие правки в Protobuf SR пропустит, а какие — нет. И как buf breaking ловит ломающие изменения ещё до того, как они доедут до SR.

А отсюда уже понятно, зачем вообще нужен Registry: он один точно знает, какие схемы в системе живут, и не пускает в реестр то, что сломает потребителей.

·Модуль 05

Этот урок ещё впереди

Курс изучается по порядку — чтобы открыть этот шаг, сначала завершите предыдущие. Так контекст накапливается без пропусков.

/ вы пытались открыть
Контракты / Schema Registry