Postgres → Elasticsearch
Декларативный ETL под поиск по меню Brew. Слева Postgres catalog-service с меню напитков (Drink), статьями блога Brew о кофе и справочником клиентов. Справа Elasticsearch — туда нужно затащить эти же данные так, чтобы по ним работал full-text search: клиент в мобильном приложении набирает «латте» и сразу видит подходящие позиции меню. Между ними — ничего своего, только Debezium и Elasticsearch Sink connector. Go-код в этом use case'е есть, но он диагностический: db-loader и search-tester не участвуют в pipeline'е, ими удобно проверять, что всё доехало.
Это контраст к Postgres → ClickHouse с анонимизацией. Там между Debezium и Sink'ом стоял анонимизатор на Go — ему деваться было некуда, маски PII требуют логики. Тут логики нет: схема Postgres почти один-в-один ложится на JSON-документ ES, а трансформации формата (отвернуть Debezium-конверт, выдернуть ключ, переименовать топик в индекс) тянут стандартные Single Message Transforms внутри Connect. Главное достоинство — pipeline можно собрать и поддерживать без отдельного сервиса, который надо деплоить и мониторить.
Что собираем
Postgres (15443) — catalog-service: drinks / articles / customers
│
│ WAL (logical replication, slot)
▼
kafka-connect: Debezium PostgresConnector
│
▼
brew.cdc.public.drinks / brew.cdc.public.articles / brew.cdc.public.customers
│
│ ExtractNewRecordState (unwrap)
│ ExtractField$Key (id из ключа в _id)
│ RegexRouter (brew.cdc.public.X → X_v1)
▼
kafka-connect: ES Sink connector (v1)
│
▼
Elasticsearch (19200) → indices drinks_v1 / articles_v1 / customers_v1Сравни с Postgres → ClickHouse с анонимизацией: то же количество звеньев в Connect'е, только вместо ClickHouse Sink'а — ES Sink, и плюс цепочка SMT, потому что ES любит плоский документ, а Debezium шлёт обёрнутый. Postgres-схема, publication, replication slot, RF — без изменений. CDC-топики идут по канону brew.cdc.public.<table> — те же имена, что в модуле 07.
Зачем здесь Single Message Transforms
Debezium кладёт в Kafka событие в формате envelope:
{
"before": null,
"after": {"id": 1, "name": "Латте #1", "base_price": 250, ...},
"source": {"lsn": 12345, "ts_ms": ...},
"op": "c"
}ES Sink из коробки положит этот объект как есть. Получится индекс с полем before, after, source, op — не то, что ожидаешь, когда пишешь match: {name: "латте"}. SMT в этом use case'е делают три простые вещи.
ExtractNewRecordState — из envelope достаёт after и кладёт в корень. Для DELETE-операций (op=d) after пустой, и Debezium ставит value=null — это в ES Sink триггерит behavior.on.null.values=delete, документ исчезает.
ExtractField$Key — Debezium шлёт ключ как объект {"id": 1}, а ES хочет в _id строку или число. SMT берёт поле id.
RegexRouter — название топика становится названием индекса. brew.cdc.public.drinks → drinks_v1. Регулярка brew\.cdc\.public\.(.*) плюс replacement $1_v1 решают это в одной строке. Ради переключения версий (см. ниже) v1-суффикс зашит прямо в config — для v2 берётся отдельный коннектор с $1_v2.
Index template, или почему mapping живёт в репозитории
ES умеет сам угадать типы полей по первым нескольким документам — это называется dynamic mapping. На небольшом sandbox'е работает и так, но на проде это выстреливает в ногу: первый документ пришёл с tags: ["a","b"], тип определился как text → дальше документ с tags: 42 падает с ошибкой mapping conflict. Лечится только пересозданием индекса с явным mapping'ом.
Поэтому правильно — заранее зафиксировать структуру через index template. Шаблон применяется к индексам по pattern'у имени, и любой свежесозданный drinks_v2 уже стартует с нужным analyzer'ом и типами.
{
"index_patterns": ["drinks_*", "articles_*", "customers_*"],
"template": {
"settings": {"number_of_shards": 1, "number_of_replicas": 0,
"analysis": {"analyzer": {"ru_en_text": {...}}}},
"mappings": {"dynamic": true, "properties": {
"name": {"type": "text", "analyzer": "ru_en_text"},
"base_price": {"type": "long"}, ...
}}
}
}Полная версия — в es-template.json. Анализатор тут учебный: lowercase плюс asciifolding (свернуть кириллицу-латиницу в близкие формы). На проде сюда обычно ещё подвешивают morphology-токенизатор под язык; в sandbox'е он только усложнял бы установку.
Что показывает наш код
cmd/db-loader/main.go — это просто наполнитель Postgres. Тонкий wrapper над INSERT/UPDATE/DELETE с предсказуемыми значениями (id-ы числовые, name содержит название напитка вроде «Латте», чтобы потом по нему делать match-query). Вот сердцевина — три INSERT'а в одной транзакции:
_, err := tx.Exec(ctx, `
INSERT INTO drinks (id, sku, name, description, category, base_price, stock)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`,
did,
fmt.Sprintf("DRINK-%07d", did),
fmt.Sprintf("%s #%d", drink, did),
fmt.Sprintf("Напиток %s в меню Brew. Готовится бариста...", drink),
categories[int(did)%len(categories)],
int64(150+rand.IntN(450)),
rand.IntN(100),
)Никакой работы с Kafka напрямую тут нет — Debezium читает WAL и публикует CDC сам. Это и есть смысл декларативного ETL: catalog-service пишет в Postgres и ничего не знает про существование Elasticsearch.
cmd/search-tester/main.go — обратный диагностический срез. Считает строки в Postgres, документы в ES (_count API), делает match-query по полю и печатает топ-5 хитов. Полезен, чтобы быстро понять «дошло — не дошло».
pgCount, err := countPostgres(ctx, pool, *pgTable)
esCount, err := countES(ctx, *esURL, *alias)
if pgCount != esCount {
fmt.Printf("РАСХОЖДЕНИЕ: %d (PG) vs %d (ES)\n", pgCount, esCount)
}
hits, err := matchQuery(ctx, *esURL, *alias, *matchField, *query)Если расхождение постоянное — pipeline где-то стоит. Дальше идёт make connector-status, и обычно видно либо FAILED-таску в Sink'е, либо что Debezium застрял на slot'е (Postgres держит replication slot до явного дропа, и если коннектор упал, slot копит WAL и в какой-то момент исчерпает диск).
Мульти-индекс: меню, статьи, клиенты
Три таблицы едут в три разных индекса одним и тем же pipeline'ом. drinks — меню напитков Brew, основной кейс поиска. articles — блог Brew о кофе (обжарка, рецепты, гайды), отдельный индекс для демонстрации того, что один Sink спокойно тащит несколько источников. customers — справочник клиентов, тут как пример: поиск по нему обычно нужен в админке («найти клиента по имени»), а join меню и клиентов Sink не делает — это место для отдельной search-experience-лекции.
RegexRouter маршрутизирует все три топика одной регуляркой: каждый brew.cdc.public.<table> превращается в <table>_v1. Добавить четвёртый источник — это строка в table.include.list Debezium'а и расширение topics.regex Sink'а, код не меняется.
Blue-green reindex
Это та часть, ради которой паттерн «индекс с версионным суффиксом плюс alias» вообще существует. Простой случай: добавили в drinks колонку, обновили index template, пересоздали индекс — поиск по меню временно ничего не возвращает, пока CDC не перельёт всю таблицу заново. Не годится: клиент в приложении набирает «капучино» и видит пустоту.
Решение: алиас drinks указывает на drinks_v1, приложение читает только через alias. Когда нужно переехать на новый mapping — поднимаем второй ES Sink, который пишет в drinks_v2, ждём, пока он догонит v1 (обычно это пара минут на сотнях тысяч строк), переключаем alias на v2 одним атомарным запросом, удаляем v1 sink. Приложение ничего не заметило.
Ключевой шаг — переключение alias. ES делает это атомарно, и если запрос пришёл за миллисекунду до и за миллисекунду после — он не видит «промежуточного» состояния. Под капотом:
curl -X POST http://localhost:19200/_aliases -d '{
"actions": [
{"remove": {"index": "drinks_v1", "alias": "drinks"}},
{"add": {"index": "drinks_v2", "alias": "drinks"}}
]
}'Внутри одного запроса с actions Elasticsearch гарантирует атомарность — нет момента, когда alias не указывает ни на что. Это и есть разница с тем, чтобы делать DELETE и POST по отдельности.
Цель make reindex-blue-green автоматизирует весь сценарий: создаёт v2 sink, ждёт догона, переключает alias, удаляет v1 sink. Для лекции этого хватает; на проде ещё прикручивают canary-чтение из v2 заранее (через alias с is_write_index плюс read-only alias), чтобы не переключать вслепую.
Как поднять
Корневой стенд (kafka-1/2/3, kafka-connect, schema-registry) должен быть запущен. Plugins для Connect (Debezium PostgresConnector + Confluent ES Sink) — установлены через make connect-install-plugins из корня lectures/. Это разовая операция, описана в Task 34.5 плана.
Дальше из директории use case'а:
make up # Postgres + Elasticsearch
make pg-init # таблицы + publication
make es-init # index template
make topic-create-all # CDC + DLQ топики
make connect-plugin-check # проверить plugins
make connector-create-all # Debezium + ES Sink v1
make run-load DLOAD_COUNT=200 # залить нагрузку в Postgres
make connector-status # обе таски в RUNNING
make run-search # сравнить counts + топ-5 хитовЧерез минуту-две drinks_v1 в ES должен содержать 200 документов, search-tester — найти их по слову из поля name (по умолчанию «Латте»). Если расхождение — make connector-restart (он сбросит failed task'и) и снова make connector-status.
Для blue-green демо:
make reindex-blue-green # сам создаст alias (если не было), создаст v2, догонит, переключит
make alias-show # покажет, что drinks → drinks_v2reindex-blue-green зависит от alias-init — тот идемпотентно создаёт drinks → drinks_v1. Без alias'а атомарный remove+add упадёт целиком (ES rolling back при первой неуспешной action в _aliases).
Снести всё:
make clean # удаляет коннекторы, slot, топики, контейнерыЧто проверяет integration test
test/integration_test.go (build tag integration) делает то же самое, что ручной сценарий выше, плюс ещё две проверки.
- UPDATE одной строки в Postgres → отслеживание поля name в ES до изменения. Дедлайн 90 сек, обычно укладывается за 2–5 секунд (Sink linger.ms=200 + время на CDC propagation).
- DELETE одной строки → ожидание, пока документ исчезнет (
HEAD /_doc/<id>возвращает 404). Тоже 90 сек. - Blue-green: создаётся v2 sink с уникальным suffix'ом (чтобы не пересекаться с другими прогонами), ждём догона, переключаем alias. Проверяется, что alias реально указывает на v2.
Размер N=200 — для скорости прогона на dev-машине. Паттерн идентичен любому объёму: 50k или 500k поведут себя так же, изменятся только цифры в дедлайнах. Прогон теста занимает примерно 1–2 минуты, основное время — стартап двух Connect-коннекторов и ожидание snapshot phase Debezium'а.
make up && make pg-init && make es-init && make test-integrationТест сам делает truncate Postgres, удаляет старые drinks_v* индексы, дропает старые replication slot'ы (usecase_09_04_it_%) — между прогонами руками чистить не надо.
Файлы
04-pg-to-elasticsearch/
├── README.md ← этот файл
├── Makefile ← цели up/down/connector-*/test-integration/reindex-blue-green
├── docker-compose.override.yml ← Postgres (15443) + ES (19200)
├── go.mod ← зависимости (pgx + franz-go для теста)
├── es-template.json ← index template (settings + mappings)
├── connectors/
│ ├── debezium-pg-source.json ← Debezium PostgresConnector
│ ├── es-sink.json ← ES Sink, route → *_v1
│ └── es-sink-v2.json ← то же, route → *_v2 (для blue-green)
├── db/
│ └── init.sql ← таблицы drinks/articles/customers + publication
├── cmd/
│ ├── db-loader/main.go ← INSERT/UPDATE/DELETE в Postgres
│ └── search-tester/main.go ← диагностика: PG count vs ES count + match-query
└── test/
└── integration_test.go ← E2E с blue-green reindexЧто осталось за кадром
Sandbox-вариант намеренно урезан. На проде ту же схему обычно дополняют так.
Авторизация ES — здесь xpack.security.enabled: "false", потому что одна команда учится паттерну, а не настройке security. В реальном кластере Sink работает через connection.username/connection.password или API key. Конфиг ES Sink принимает оба варианта без изменений в SMT-цепочке.
Schema Registry — здесь Debezium и Sink говорят через JsonConverter. Это удобно для дебага (открыл консоль и читаешь). Под нагрузку JSON неэффективен, и обычно цепочку переводят на Avro через SR — оба коннектора это поддерживают, надо только заменить пары *.converter и поднять SR (он уже есть в корневом стенде, лекция Schema Registry про него отдельно).
Multi-tenant индексы — в этой лекции drinks_v1 глобальный. Если меню режется по городам/франчайзи, обычно делают шаблон drinks_<tenant>_v1 через тот же RegexRouter, плюс расширенный template. Логика остаётся.
PII клиентов — индекс customers_v1 тут хранит телефон и email как есть, потому что лекция про search-паттерн, а не про приватность. В реальной поисковой витрине по клиентам PII либо не индексируют вовсе, либо маскируют на входе — ровно тем приёмом, что в Postgres → ClickHouse с анонимизацией.
Реактивный canary при reindex'е — тут переключение alias делается в одно действие. На проде сначала alias с is_write_index=true для v2 и read-only-alias на v1, какое-то время оба индекса живут, и читать можно с постепенной миграцией трафика. Реализуется тем же _aliases API, только с большим количеством actions.
ETL backfill из source-of-truth, отличного от текущего Postgres — тут Debezium со snapshot.mode=initial снимает таблицу целиком при первом запуске. Если данные приходят откуда-то ещё (S3-snapshot, dump из старой БД), backfill делают отдельным процессом, обычно через bulk-индексирование напрямую в ES, и только новые изменения идут через CDC. Паттерн: «historical bulk + live CDC» — стандартный для миграций под поиск.