Push notifications
A Brew coffee shop finishes an order and a barista hits "ready": the customer must get an "order ready" push right away. This use case covers how notification-service delivers that push to the customer across three external channels. The trigger is the domain event OrderReady from brew.kitchen.v1; notification-service turns it into a push request and writes it to brew.notifications.v1. There is one Kafka topic at the input and three recipient channels, each with its own retry pipeline and DLQ. The use case draws on four lectures at once: retry/DLQ (Retry and DLQ deep dive), circuit breaker and HMAC (Delivery to external systems), at-least-once delivery with dedup (Microservice communication), and Protobuf (Protobuf in Go). The outbox pattern (Outbox pattern) is deliberately left out.
What we're building
The scenario is simple. notification-service consumes OrderReady from brew.kitchen.v1, assembles an "order ready" push for the customer, and sends a Notification to notification-events (the course-convention demo topic for brew.notifications.v1). The message has a channel field. Three valid values:
- firebase: push to the Brew Android app.
- apns: push to the Brew iOS app.
- webhook: HTTP call to a third-party partner-integration URL.
The service then needs to reach the corresponding external recipient over HTTP, survive failures with retry and backoff, not crash entirely when one channel fails for an extended period, avoid duplicate delivery (the customer must not get two pushes about one order), and maintain a history per notification.
Internally it looks like this:
notification-events
        │
        ▼
notification-router (consumer)
        │
        ├──► notification-firebase ──► firebase-sender ──► mock-firebase
        │         │
        │         ├──► notification-firebase-retry-30s
        │         ├──► notification-firebase-retry-5m
        │         └──► notification-firebase-dlq ──► firebase-dlq-consumer
        │
        ├──► notification-apns ──► apns-sender ──► mock-apns
        │         └── (same retry/dlq)
        │
        └──► notification-webhook ──► webhook-sender ──► mock-webhook
                  └── (same retry/dlq)
14 topics. That sounds like a lot. Each channel gets 4 (main + retry-30s + retry-5m + dlq), one shared entry topic, and one router-DLQ for records where channel is not set at all (proto3 default = CHANNEL_UNSPECIFIED) or has an unknown value. If there were 10 channels, only the topic list would grow, not the code.
Why separate topics per channel
You could keep a single notifications topic and filter by channel in each sender. Don't. Reasons:
- channel consumer groups scale independently. If firebase is slow, apns shouldn't wait;
- retry delays can differ per channel. Five minutes is fine for a partner webhook; an "order ready" push to the customer should retry in thirty seconds — the coffee gets cold;
- DLQ is easier to read when it's immediately clear: records in notification-firebase-dlq mean the problem is in one channel and the others are alive;
- Kafka has no hard limit on the number of topics; the practical ceiling is the partition count, which runs to thousands per broker. Ten extra topics won't matter.
The cost is one extra forward in the router. Acceptable.
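The claim that only the topic list grows with the number of channels can be illustrated with a small sketch. It uses the topic names from the diagram above; the helper names topicsFor and allTopics are hypothetical and are not taken from the repository.

```go
package main

import "fmt"

// topicsFor returns the four topics one channel needs, following the naming
// from the diagram: main, two retry stages, and a DLQ.
// (Hypothetical helper, not part of the course code.)
func topicsFor(channel string) []string {
	base := "notification-" + channel
	return []string{base, base + "-retry-30s", base + "-retry-5m", base + "-dlq"}
}

// allTopics builds the full list: the shared entry topic, the router DLQ,
// and four topics per channel.
func allTopics(channels []string) []string {
	topics := []string{"notification-events", "notification-events-dlq"}
	for _, ch := range channels {
		topics = append(topics, topicsFor(ch)...)
	}
	return topics
}

func main() {
	topics := allTopics([]string{"firebase", "apns", "webhook"})
	fmt.Println(len(topics), "topics")
	for _, t := range topics {
		fmt.Println(t)
	}
}
```

With three channels this yields 2 + 3×4 = 14 topics; adding a fourth channel would add four more names and nothing else.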
What the code shows
First, the router. A thin consumer on notification-events. Extracts Notification from the protobuf payload and forwards it byte-for-byte to the appropriate channel topic based on the channel field. No extra processing, no application logic.
out = append(out, &kgo.Record{
	Topic:   dest,
	Key:     r.Key,
	Value:   r.Value,
	Headers: appendRouterHeaders(r.Headers, o.NodeID, n.GetChannel().String()),
})
destinationFor is a switch on the channel enum, nothing fancy. We append router-node and channel headers for tracing. Then ProduceSync with the accumulated batch, then CommitRecords on the input. The same at-least-once guarantee as in outbox: there's a window for duplicates between produce and commit, caught by dedup in the sender.
If a record has channel=CHANNEL_UNSPECIFIED (proto3 default from a broken producer) or an enum the router doesn't know, it goes to notification-events-dlq — a separate router-DLQ topic. Silently dropping it is not an option: with proto3, forgetting to set the field is easy, so we separate "routed correctly" from "no destination found" at the topic level, not by silence.
The key point here: no domain logic appeared. The router is dumb and fast. All the complexity lives in the senders.
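As a rough illustration of the routing decision, the switch might look like the sketch below. The Channel type and its constants here only imitate a generated protobuf enum; the real names in gen/ are not shown in this text, so treat them as assumptions. The topic names come from the diagram.

```go
package main

import "fmt"

// Channel imitates the generated protobuf enum for illustration only.
// The real type lives in gen/ and its constant names may differ.
type Channel int32

const (
	ChannelUnspecified Channel = 0 // proto3 default when the field is not set
	ChannelFirebase    Channel = 1
	ChannelAPNS        Channel = 2
	ChannelWebhook     Channel = 3
)

const routerDLQ = "notification-events-dlq"

// destinationFor maps a channel to its main topic. Unset and unknown values
// are sent to the router DLQ; ok=false tells the caller the record was not
// routed to a real channel.
func destinationFor(ch Channel) (topic string, ok bool) {
	switch ch {
	case ChannelFirebase:
		return "notification-firebase", true
	case ChannelAPNS:
		return "notification-apns", true
	case ChannelWebhook:
		return "notification-webhook", true
	default:
		return routerDLQ, false
	}
}

func main() {
	for _, ch := range []Channel{ChannelFirebase, ChannelUnspecified, Channel(42)} {
		topic, ok := destinationFor(ch)
		fmt.Printf("channel=%d topic=%s routed=%t\n", ch, topic, ok)
	}
}
```

Putting the unset value and unknown values in the same default branch keeps the "no destination found" case in one place, which matches the router-DLQ behavior described above.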
Sender — one per channel
Each channel sender is a single process with thin scaffolding (three channel variants, one sender each). It subscribes to its own main topic and two retry stages:
stages := []Stage{
	{Topic: *mainTopic, Delay: 0, NextTopic: *retry30Topic},
	{Topic: *retry30Topic, Delay: *delay30, NextTopic: *retry5mTopic},
	{Topic: *retry5mTopic, Delay: *delay5m, NextTopic: ""},
}
One consumer group covers all three topics. On retry stages, before processing we wait until record.Timestamp + stage.Delay has elapsed: a record written to retry-30s at 12:00:00 waits until 12:00:30. This blocks the poll loop, but intentionally: the demo pipeline should stay easy to follow. Production does it more elegantly, with a separate goroutine per retry topic or with PauseFetchPartitions (the latter is covered in Delivery to external systems).
if st.Delay > 0 {
	if err := waitUntilDue(ctx, r.Timestamp, st.Delay); err != nil {
		return err
	}
}
Then comes a delivery attempt, under the protection of the circuit breaker:
result, err := s.cb.Execute(func() (deliveryResult, error) {
	return s.deliverWithRetries(ctx, &n)
})
Inside deliverWithRetries is standard backoff with jitter, up to MaxAttempts times within a single stage. The CB watches from outside: if N consecutive Execute() calls return an error, it transitions to Open and rejects further calls without invoking them. Half of Delivery to external systems appears here unchanged; the pattern is universal.
Where a record goes after failure
Three outcomes:
- Success: write notifications_log(status='delivered', ...) plus processed_events(consumer, notification_id) in a single transaction. Commit the offset in Kafka.
- Permanent error (4xx except 408/429, or invalid protobuf): immediately to DLQ, skipping retry. Commit (safe to do so).
- Transient error (5xx, 408, 429, network, timeout) and retries exhausted: forward to the stage's NextTopic. Commit.
target := st.NextTopic
reason := "next-retry"
if permanent {
	target = s.opts.DLQTopic
	reason = "permanent"
} else if target == "" {
	target = s.opts.DLQTopic
	reason = "exhausted"
}
An empty NextTopic on the last stage signals "DLQ next". The forward headers carry: retry.count, error.class, error.message, original.topic, previous.topic, forward.reason. In the DLQ you can see the full route: what failed, where it failed, how many times.
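The classification rule from the list above (4xx is permanent except 408 and 429; everything else that fails is transient) fits in one small function. The sketch below pairs it with the same target selection as the snippet; isPermanent and nextTarget are hypothetical names used for illustration.

```go
package main

import (
	"fmt"
	"net/http"
)

// isPermanent classifies the HTTP status of a failed attempt.
// 4xx is permanent except 408 and 429; 5xx is transient. Network errors and
// timeouts carry no status code and are treated as transient by the caller.
func isPermanent(status int) bool {
	if status == http.StatusRequestTimeout || status == http.StatusTooManyRequests {
		return false
	}
	return status >= 400 && status < 500
}

// nextTarget mirrors the selection above: permanent errors and an exhausted
// last stage both end in the DLQ, otherwise the record moves one stage on.
func nextTarget(permanent bool, nextTopic, dlqTopic string) (target, reason string) {
	switch {
	case permanent:
		return dlqTopic, "permanent"
	case nextTopic == "":
		return dlqTopic, "exhausted"
	default:
		return nextTopic, "next-retry"
	}
}

func main() {
	const dlq = "notification-firebase-dlq"
	for _, status := range []int{400, 404, 408, 429, 503} {
		target, reason := nextTarget(isPermanent(status), "notification-firebase-retry-30s", dlq)
		fmt.Printf("status=%d target=%s reason=%s\n", status, target, reason)
	}
}
```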
Dedup and effective-exactly-once
Between cb.Execute() (which may contain multiple HTTP attempts) and the write to Postgres there is a window. If the process crashes after a successful delivery but before committing the offset, the sender reads the same message again on restart. The receiver has already seen the notification by its Idempotency-Key (the notification_id), so it won't duplicate the push. But our notifications_log table could end up with a second row, so the write goes through a dedup gate:
err = pgx.BeginFunc(ctx, pool, func(tx pgx.Tx) error {
	consumer := string(s.opts.Channel) + "-sender"
	tag, err := tx.Exec(ctx, dedupSQL, consumer, n.GetId())
	if err != nil {
		return fmt.Errorf("dedup: %w", err)
	}
	if tag.RowsAffected() == 0 {
		return nil // already processed: exit without INSERT into notifications_log
	}
	_, err = tx.Exec(ctx, insertHistorySQL, ...)
	return err
})
The gate and the business insert share one transaction. Otherwise a crash between them would leave the gate saying "already processed" while the log row was never written. The same technique as in Microservice communication.
HMAC and external idempotency
Every HTTP request carries two key headers:
req.Header.Set("Idempotency-Key", n.GetId())
req.Header.Set("X-Signature", sig) // hex(HMAC-SHA256(secret, body))
Idempotency-Key is the notification_id. The same id across all retries of one notification tells the receiver on the Firebase / APNs / webhook side: "I've seen this, not sending the customer a second push about the same order". In the mocks we only log it; in production there's real receiver logic.
X-Signature is HMAC-SHA256 of the body using a shared secret. The receiver verifies it, so a caller without the secret cannot inject unauthorized pushes even when it connects from a trusted IP.
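Both sides of the signature check can be sketched with the standard library. The function names sign and verify are hypothetical; the construction is the hex(HMAC-SHA256(secret, body)) named in the comment above.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns hex(HMAC-SHA256(secret, body)), the value for X-Signature.
func sign(secret, body []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the MAC on the receiver side and compares it in constant
// time. A signature that is not valid hex is rejected.
func verify(secret, body []byte, sigHex string) bool {
	got, err := hex.DecodeString(sigHex)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(body)
	return hmac.Equal(got, mac.Sum(nil))
}

func main() {
	secret := []byte("demo-secret") // placeholder, not a real key
	body := []byte(`{"id":"n-1","channel":"webhook"}`)

	sig := sign(secret, body)
	fmt.Println("valid body:   ", verify(secret, body, sig))
	fmt.Println("tampered body:", verify(secret, []byte(`{"id":"n-2"}`), sig))
}
```

hmac.Equal is used instead of a plain string comparison so that the check does not leak timing information about how many leading bytes matched.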
DLQ as a separate consumer
The DLQ topic is a terminal. It receives records that exhausted all stages without delivery, or those that were classified as permanent. The sender only forwards records there; it does not process them itself. That is the job of a separate process started with -mode=dlq:
case "dlq":
	err := RunDLQ(ctx, DLQOpts{
		NodeID:    *nodeID,
		Channel:   d.Channel,
		DLQTopic:  *dlqTopic,
		Group:     *dlqGroup,
		DSN:       dsn,
		FromStart: *fromStart,
	})
This process writes notifications_log(status='dlq', last_error=..., attempts=...), a history entry indicating delivery failed. In a real system you'd also add a Slack alert, a Prometheus metric, an admin page for manual replay, and a thread in the on-call channel.
Mock services
Three stdlib-only HTTP handlers. Same code, different port and name:
- cmd/mock-firebase/main.go on :8091
- cmd/mock-apns/main.go on :8092
- cmd/mock-webhook/main.go on :8093
Each accepts POST /send. With probability FAIL_RATE_503 it returns 503; with probability FAIL_RATE_TIMEOUT it hangs for N seconds and then returns 504; everything else returns 200. /health serves health checks. This is the pattern from Delivery to external systems replicated across three channels.
switch {
case dice < fail503:
	w.Header().Set("Retry-After", "1")
	http.Error(w, ..., http.StatusServiceUnavailable)
case dice < fail503+failTimeout:
	// Hang, then time out, unless the client gives up first.
	select {
	case <-time.After(time.Duration(timeoutHangSec) * time.Second):
		w.WriteHeader(http.StatusGatewayTimeout)
	case <-r.Context().Done():
		return
	}
default:
	w.WriteHeader(http.StatusOK)
}
Each mock is a standalone Go module, built with a separate Dockerfile via go mod init on the fly. This is intentional, so that the images don't pull in half the course workspace.
For the integration test it's different. The mocks spin up directly inside the test via httptest.NewServer with an import of internal/mockserver. No Docker, on free ports. Same handler template, different environment.
Running manually
This assumes the sandbox from the root docker-compose.yml is already running. Then:
make up # Postgres :15441 + three mocks in docker
make db-init # notifications_log + processed_events tables
make topic-create-all # 14 topics, P=6 RF=3
make run-router & # forwards by channel
make run-firebase-sender &
make run-firebase-dlq & # separate — writes notifications_log status='dlq'
# same for apns and webhook (run-apns-sender / run-apns-dlq / run-webhook-sender / run-webhook-dlq)
make seed # 100 notifications into notification-events
make db-history # see breakdown by channel/status
Under normal conditions everything lands in delivered. To observe retry and DLQ:
make chaos-fail-50 # restart mocks with FAIL_RATE_503=0.5
make seed
make db-history # rows with status='dlq' will appear (if retries didn't help)
make mock-stats # see how many 503s the mocks actually returned
The sender's default parameters are delay-30s=30s, delay-5m=5m. For an interactive demo that's too slow: the sender in retry-30s waits half a minute before retrying. Use the -delay-30s and -delay-5m flags: set them to, say, 2s and 5s to watch the pipeline turn in real time.
Integration test
The most interesting part. File test/integration_test.go under the integration build tag. Run via make test-integration; requires Kafka and Postgres to be running.
What it does:
- Starts three httptest.Server instances with FAIL_RATE_503=0.7 (aggressive, so at least some records exhaust retries and land in DLQ).
- Starts the router, one sender per channel in deliver mode and one in dlq mode, all seven as goroutines inside the test.
- Sends 200 notifications to notification-events round-robin across channels.
- Waits until notifications_log.delivered + notifications_log.dlq == 200. That is the "pipeline completed" criterion.
- If dlq > 0, switches mocks to FAIL_RATE_503=0, waits for stabilization, then re-reads the DLQ topics and publishes records back to main with a new notification_id (replay-*). That is the DLQ replay.
- Checks that after replay delivered increased by at least half of the replayed records.
- Stops all nodes, checks that none crashed with an unhandled error.
if lastSnap.dlq > 0 {
	fbCfg.set(0.0, 0.0, 5)   // liveMockHandler reads atomically,
	apnsCfg.set(0.0, 0.0, 5) // the change is visible on the next request
	whCfg.set(0.0, 0.0, 5)
	replayed, err := replayDLQ(root, bootstrap)
	// error handling elided in this excerpt
	threshold := baseline.delivered + replayed/2
	// wait until delivered ≥ threshold
}
There's a nuance here. mockserver.Handler(cfg, stats) freezes cfg in a closure, so the test wraps it in liveMockHandler: it holds a mockConfig with atomic.Value fields and reads them on every request. A fail-rate switch via cfg.set(...) takes effect on the next incoming request, with no handler swap needed. Swapping would be unsafe anyway: http.Server.Handler is a plain field with no atomic guarantees, and go test -race would flag the race with in-flight requests.
200 notifications instead of the original 5000 — for speed on a dev machine. The logic is the same; the test runs in 12–15 seconds. For real load, change the totalNotifications constant.
Where this fits in the course
The use case brings together:
- Retry and DLQ deep dive — retry topics with delay and DLQ as a terminal
- Delivery to external systems — circuit breaker, HMAC, exponential backoff with jitter, mock-webhook pattern
- Protobuf in Go — Protobuf as the wire format
- Microservice communication — at-least-once + dedup on the consumer via processed_events
What is deliberately absent:
- Schema Registry. Here it's byte-for-byte Protobuf without magic byte + schema_id. A separate lecture (Schema Registry) shows how to add SR; the pattern is orthogonal and plugs into this use case without logic changes.
- Outbox pattern. The outgoing message to notification-events is written directly: we don't model the write side with a database. If we did, we'd add an outbox table and publisher, but that would extend the example without teaching anything new.
- gRPC API. This use case is about async delivery. The gRPC front is shown in Microservice communication and Hybrid gRPC + Kafka.
- Real APNs / Firebase credentials. Cert-flow for APNs and FCM HTTP v1 token-exchange are a separate story, out of scope for this course. Channel architecture is demonstrated on mocks.
Things to try
Once the pipeline is running, there are several good experiments:
- run make chaos-fail-50 with aggressive FAIL_RATE_503=0.9 and observe how the sender's CB throttles during a prolonged failure. Logs show CB ...: closed → open.
- stop firebase-sender mid-load. After restart the sender resumes from the same offset, so no records are lost. Postgres has no duplicates: the dedup gate holds.
- run two senders for the same channel with different -node-id. They join one consumer group and split the partitions in half. Scaling needs no code changes.
- manually merge retry-30s and retry-5m into one topic with a 1-minute delay and observe the behavior change. Hint: in Stage{} this changes in one place.
Files
.
├── README.md                     # this file
├── Makefile                      # all commands
├── docker-compose.override.yml   # Postgres :15441 + 3 mocks
├── db/init.sql                   # notifications_log + processed_events
├── proto/notifications/v1/       # Notification + Channel enum
├── gen/                          # generated Go code
├── buf.yaml / buf.gen.yaml       # buf config
├── cmd/
│   ├── notification-router/      # consumer on notification-events → channels
│   ├── firebase-sender/          # thin wrapper over sender.Main
│   ├── apns-sender/
│   ├── webhook-sender/
│   ├── mock-firebase/            # HTTP mock + Dockerfile, stdlib-only
│   ├── mock-apns/
│   ├── mock-webhook/
│   └── seed-tool/                # make seed
├── internal/
│   ├── router/router.go          # router logic
│   ├── sender/sender.go          # retry + CB + HMAC + DB (shared code for all channels)
│   ├── sender/cmdmain.go         # CmdDefaults + flags for cmd wrappers
│   └── mockserver/server.go      # handler factory for the test
└── test/integration_test.go      # end-to-end test with DLQ replay