Skip to content

Handle WebSub delta updates without full undeploy/redeploy#1935

Open
senthuran16 wants to merge 18 commits into
mainfrom
websub-kafka-improvements-undeploy-redeploy
Open

Handle WebSub delta updates without full undeploy/redeploy#1935
senthuran16 wants to merge 18 commits into
mainfrom
websub-kafka-improvements-undeploy-redeploy

Conversation

@senthuran16

@senthuran16 senthuran16 commented May 11, 2026

Copy link
Copy Markdown
Member

Purpose

WebSub binding updates were going through full remove-and-readd flows, which caused unnecessary undeploy behavior, dropped live receiver state, and made channel-only
updates heavier than needed. Resolves N/A.

Goals

  • apply WebSub channel-only updates in place
  • keep subscription sync topics and live receiver state intact during delta updates
  • fall back to full rebind only for structural changes

Approach

  • added a WebSub receiver delta-update path that can add/remove channels without recreating the whole binding
  • updated runtime WebSub binding handling to diff channel topics, update hub chains, ensure new topics, and delete removed channel topics while preserving the internal
    subscription topic
  • changed the xDS handler to call UpdateWebSubApiBinding(...) instead of remove-then-add for changed bindings
  • added controller/service-side undeploy handling improvements included in this branch’s diff

User stories

WebSub API updates with channel-only changes are applied without unnecessary undeploys, while real structural changes still rebind safely.

Documentation

N/A - runtime/controller behavior fix only; no product documentation update included in this PR.

Automation tests

  • Unit tests

    Branch includes focused handler/runtime/service test coverage for delta update and undeploy paths

  • Integration tests

    Not run in this turn

Security checks

Samples

N/A

Related PRs

@coderabbitai

coderabbitai Bot commented May 11, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

📝 Walkthrough

Walkthrough

This pull request implements in-place update support for WebSub API bindings across both the event gateway runtime and the gateway controller. On the runtime side, WebSubReceiver gains a shared mutex to coordinate channel-map access across handlers, and a new ApplyBindingDelta method enables selective channel addition and removal without recreating the receiver. The runtime layer introduces UpdateWebSubApiBinding to conditionally apply delta updates when the receiver and broker-driver are equivalent, or fall back to remove-and-readd otherwise. The xDS layer is extended with an UpdateWebSubApiBinding method on the BindingManager interface, routing changed bindings through the update path. On the controller side, a new WebSubAPIService handles parsing, validation, conflict checking, storage persistence, event publishing, and optional on-prem synchronization for WebSub API updates. The service is integrated into APIServer and the websub_api_handler now routes update requests through this dedicated service instead of the deployment path.

Sequence Diagram

sequenceDiagram
  participant Client as API Client
  participant Handler as websub_api_handler
  participant Service as WebSubAPIService
  participant Runtime as Runtime
  participant Receiver as WebSubReceiver
  participant Storage as Storage
  participant EventHub as EventHub
  
  Client->>Handler: UpdateWebSubAPI (updated config)
  Handler->>Service: Update(params with body/handle)
  Service->>Service: Parse & validate config
  Service->>Storage: Load existing config by handle
  Service->>Storage: Check artifact conflicts
  Service->>Storage: Persist updated config
  Service->>EventHub: Publish UPDATE event
  par Runtime Update (if connected)
    Service->>Runtime: UpdateWebSubApiBinding(old, new)
    Runtime->>Runtime: Determine if delta possible
    alt Delta update eligible
      Runtime->>Receiver: ApplyBindingDelta(removed, added)
      Receiver->>Receiver: Publish tombstones for removed
      Receiver->>Receiver: Update channels map with new topics
    else Full update required
      Runtime->>Runtime: Remove old binding, add new binding
    end
  end
  Service-->>Handler: UpdateResult with Config
  Handler-->>Client: HTTP 200 with updated config
Loading
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 41.18% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title directly and accurately summarizes the main feature: WebSub delta updates that avoid full undeploy/redeploy cycles, which aligns with the extensive changes across receiver, runtime, xDS handler, and controller layers.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The pull request description covers all major sections of the template with substantial detail: purpose, goals, approach, user stories, documentation, automation tests, security checks, and related PRs are all addressed.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch websub-kafka-improvements-undeploy-redeploy

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (5)
event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go (2)

29-29: 💤 Low value

Document concurrency expectations for ApplyBindingDelta.

This method mutates e.channel.Channels under channelMu, interacts with e.store, e.consumerMgr, e.topics, and e.syncProducer. Adding a short doc comment noting that the caller (Runtime.UpdateWebSubApiBinding) must serialize concurrent delta applications for the same receiver — and that handler reads coordinate via channelMu — would help future maintainers reason about the locking boundary.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go`
at line 29, Add a short doc comment to ApplyBindingDelta explaining concurrency
expectations: state that callers (specifically Runtime.UpdateWebSubApiBinding)
must serialize concurrent delta applications for the same WebSubReceiver, that
ApplyBindingDelta mutates e.channel.Channels and must hold channelMu while doing
so, and that other components it touches (e.store, e.consumerMgr, e.topics,
e.syncProducer) are accessed within this boundary so handler readers should
coordinate via channelMu; reference the method name ApplyBindingDelta, the
channel field e.channel.Channels, and the mutex channelMu in the comment so
maintainers know the locking contract.

30-72: 💤 Low value

Redundant GetByTopic lookups and per-iteration locking.

Two minor efficiency points worth considering:

  • e.store.GetByTopic(channelName) is called once at line 31 and again at line 44 for the same channel. Either combine the two removal passes (preserving ordering: publish tombstones → tear down consumers → remove from store → delete from channel map) or cache the slice.
  • The add loop acquires and releases channelMu per channel (lines 68-70). Wrapping the whole loop in a single Lock/Unlock is fine here because the inner work is just a map assignment, and would reduce lock churn under bulk updates.

Neither is functionally incorrect; flagging as optional cleanup.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go`
around lines 30 - 72, Cache the subscriptions slice returned by
e.store.GetByTopic(channelName) instead of calling it twice: for each channel in
removedChannels call e.store.GetByTopic once, iterate that cached slice to first
publish tombstones via e.syncProducer.PublishTombstone, then tear down consumers
with e.consumerMgr.RemoveSubscription and remove entries from the store with
e.store.Remove, and finally delete the channel from e.channel.Channels under
e.channelMu. Also reduce lock churn in the addedChannels loop by acquiring
e.channelMu once around the entire iteration when setting
e.channel.Channels[channelName] and call e.topics.Register(channelName) outside
the lock for each channel.
event-gateway/gateway-runtime/internal/xdsclient/handler_test.go (1)

167-170: 💤 Low value

Consider renaming new field to avoid shadowing the builtin.

new is a Go predeclared identifier. Renaming to newWSB (mirroring the interface parameter names) would slightly improve readability and avoid the shadowing.

♻️ Suggested rename
 type updatedBindingCall struct {
-	old binding.WebSubApiBinding
-	new binding.WebSubApiBinding
+	old    binding.WebSubApiBinding
+	newWSB binding.WebSubApiBinding
 }

Call sites at lines 147 and 183 would need the corresponding update.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@event-gateway/gateway-runtime/internal/xdsclient/handler_test.go` around
lines 167 - 170, The struct updatedBindingCall defines fields old and new where
new shadows the predeclared identifier; rename the field new to newWSB (to
mirror the WebSubApiBinding parameter naming) in the updatedBindingCall type and
update all usages — e.g., places constructing or accessing updatedBindingCall
(such as where updatedBindingCall{...} is created and checked in tests like the
calls around the existing assertions) and any comparisons or field references —
to use .newWSB instead of .new so the code no longer uses the predeclared
identifier.
gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go (1)

191-221: ⚡ Quick win

Update path: consider surfacing validation/parse details to the client.

For websubapi.ValidationError, Error() only reports a count (e.g. configuration validation failed (3 errors)), so the 400 fallback response body won't tell callers which fields failed. Same applies to ParseError, whose message embeds the underlying cause but not field paths. Consider mapping *websubapi.ValidationError and *websubapi.ParseError (and *websubapi.HandleMismatchError) explicitly, similar to how ErrNotFound is handled, so the response payload is actionable.

Suggested mapping
 	if err != nil {
 		log.Error("Failed to update WebSub API configuration", slog.Any("error", err))
 		if errors.Is(err, websubapi.ErrNotFound) {
 			c.JSON(http.StatusNotFound, api.ErrorResponse{
 				Status:  "error",
 				Message: fmt.Sprintf("WebSub API configuration with handle '%s' not found", handle),
 			})
 			return
 		} else if storage.IsConflictError(err) {
 			c.JSON(http.StatusConflict, api.ErrorResponse{
 				Status:  "error",
 				Message: err.Error(),
 			})
 			return
 		}
+		var validationErr *websubapi.ValidationError
+		if errors.As(err, &validationErr) {
+			// Build a payload that includes the individual validation errors.
+			c.JSON(http.StatusBadRequest, api.ErrorResponse{
+				Status:  "error",
+				Message: fmt.Sprintf("Validation failed: %v", validationErr.Errors),
+			})
+			return
+		}
 		if mapRenderError(c, "update", err) {
 			return
 		}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go` around
lines 191 - 221, The Update handler currently falls back to a generic 400 body
that hides field-level parse/validation details; modify the error handling after
webSubAPIService.Update (the block handling err) to explicitly map error types
websubapi.ValidationError, websubapi.ParseError and
websubapi.HandleMismatchError (use errors.As or type assertions) and return a
structured 400 JSON containing the underlying details (validation error list,
parse cause/path, or handle mismatch info) instead of only err.Error(); keep
existing ErrNotFound and conflict handling, and ensure these specific cases
produce an actionable response payload that clients can use to surface field
paths and messages.
gateway/gateway-controller/pkg/service/websubapi/service.go (1)

77-91: 💤 Low value

Constructor: consider returning an error instead of panicking on misconfiguration.

Constructor-time panics are acceptable as fail-fast at startup, and NewAPIServer already enforces these same invariants (non-nil db, eventHub, systemConfig, non-empty gateway ID), so the panics here are effectively unreachable in normal startup. They do, however, make the service awkward to use from tests or future callers that may legitimately want a constructor error. Either keep panics with a brief note that this is startup-only, or convert to error returns for symmetry with idiomatic Go service constructors.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@gateway/gateway-controller/pkg/service/websubapi/service.go` around lines 77
- 91, The constructor currently panics on invalid inputs (db, deploymentService,
eventHub, systemConfig, empty GatewayID); change the constructor (e.g.,
NewWebSubAPIService) to return (*WebSubAPIService, error) instead of panicking,
replace each panic(...) with a descriptive error (fmt.Errorf or errors.New)
mentioning the offending parameter (db, deploymentService, eventHub,
systemConfig, GatewayID) and perform the same checks (including
strings.TrimSpace(systemConfig.Controller.Server.GatewayID) == "") returning
nil, err on failure; update callers (including NewAPIServer and any tests) to
handle the returned error and propagate or fail fast as appropriate.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go`:
- Around line 30-39: ApplyBindingDelta currently aborts on the first
PublishTombstone error which can leave partial state; change the loop that
iterates removedChannels and calls e.syncProducer.PublishTombstone (and related
e.store.GetByTopic usage) to not return immediately on error: instead log the
error via the existing logger (or collect it) and continue processing remaining
subscriptions and channels so tombstoning is best-effort; after finishing the
tombstone loop proceed to tear down consumers and update e.channel.Channels and
register added channels, and if you choose aggregation return a combined error
at the end (or otherwise keep behavior consistent with other removal/addition
log-and-continue semantics).

In `@event-gateway/gateway-runtime/internal/runtime/runtime.go`:
- Around line 794-811: canDeltaUpdateWebSubBinding currently uses
reflect.DeepEqual on binding.WebSubApiBinding.BrokerDriver which can
false-negative because BrokerDriverSpec.Config is map[string]interface{} with
numeric types/order differences; change the comparison to normalize the
BrokerDriver.Config before comparing (e.g. for the BrokerDriver fields used in
the function, marshal BrokerDriver.Config to canonical JSON and compare the JSON
bytes or unmarshal into a typed struct if available), so in
canDeltaUpdateWebSubBinding replace the reflect.DeepEqual(oldWSB.BrokerDriver,
newWSB.BrokerDriver) check with a normalized comparison that first canonicalizes
BrokerDriver.Spec.Config (fallback to reflect.DeepEqual only if marshaling
fails) to avoid spurious deltas.
- Around line 949-999: The update runs side-effecting operations
(buildWebSubApiPolicyChains, brokerDriver.EnsureTopics,
updater.ApplyBindingDelta, brokerDriver.DeleteTopics) before revalidating the
snapshot under r.mu, leaving partial changes if the snapshot check fails; fix by
acquiring a per-binding update mutex for the target binding at the start of the
update and hold it across all side-effects and the final snapshot revalidation
(introduce a map[string]*sync.Mutex or sync.Mutex per binding and lock/unlock
around the sequence that includes buildWebSubApiPolicyChains, EnsureTopics,
ApplyBindingDelta, DeleteTopics and the snapshot checks that reference
r.activeReceivers/activeBrokerDrivers/hub.GetBinding), or alternatively move the
snapshot revalidation to before performing those side-effects; ensure any early
returns release the per-binding lock so concurrent updates are serialized for
that binding.
- Line 968: Replace the use of context.Background() when calling
updater.ApplyBindingDelta with the runtime's shutdown-aware context r.runCtx and
ensure you read/capture r.runCtx while holding the same lock used to capture
receiver so the call is canceled if the runtime shuts down; locate the call to
updater.ApplyBindingDelta (and the surrounding lock that captures receiver) and
change its context argument to the captured r.runCtx (captured under the lock)
so broker-driver work (tombstone publishes, consumer removals) is tied to
runtime shutdown.
- Around line 737-792: The webSubActiveChainKeys and
unregisterStaleBindingChains functions are missing UNSUBSCRIBE keys; update
webSubActiveChainKeys to also set active[binding.GenerateRouteKey("UNSUBSCRIBE",
hubPath, vhost)] when hub-level unsubscribe policies exist and similarly add
GenerateRouteKey("UNSUBSCRIBE", chPath, vhost) for each channel when
ch.Policies.Unsubscribe is non-empty, and update unregisterStaleBindingChains to
include b.UnsubscribeChainKey in the top-level keys slice and
chKeys.UnsubscribeChainKey in the per-channel channelKeys slice so
UnsubscribeChainKey values are compared against activeKeys and unregistered when
stale.
- Around line 950-953: The call to buildWebSubApiPolicyChains in runtime.go
currently unpacks only five values and omits the unsubscribe chain key, causing
a compile error; update the unpacking to capture the unsubscribe key (e.g., add
unsubKey or unsubscribeKey alongside subKey, inKey, outKey, chChainKeys, err)
and then include that value when constructing the RegisterBinding payload by
setting the UnsubscribeChainKey field (mirror the pattern used in
AddWebSubApiBinding where all four chain keys are set). Ensure you use the exact
returned variable name from buildWebSubApiPolicyChains (and assign it to the
RegisterBinding.UnsubscribeChainKey) so the function compiles.

---

Nitpick comments:
In `@event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go`:
- Line 29: Add a short doc comment to ApplyBindingDelta explaining concurrency
expectations: state that callers (specifically Runtime.UpdateWebSubApiBinding)
must serialize concurrent delta applications for the same WebSubReceiver, that
ApplyBindingDelta mutates e.channel.Channels and must hold channelMu while doing
so, and that other components it touches (e.store, e.consumerMgr, e.topics,
e.syncProducer) are accessed within this boundary so handler readers should
coordinate via channelMu; reference the method name ApplyBindingDelta, the
channel field e.channel.Channels, and the mutex channelMu in the comment so
maintainers know the locking contract.
- Around line 30-72: Cache the subscriptions slice returned by
e.store.GetByTopic(channelName) instead of calling it twice: for each channel in
removedChannels call e.store.GetByTopic once, iterate that cached slice to first
publish tombstones via e.syncProducer.PublishTombstone, then tear down consumers
with e.consumerMgr.RemoveSubscription and remove entries from the store with
e.store.Remove, and finally delete the channel from e.channel.Channels under
e.channelMu. Also reduce lock churn in the addedChannels loop by acquiring
e.channelMu once around the entire iteration when setting
e.channel.Channels[channelName] and call e.topics.Register(channelName) outside
the lock for each channel.

In `@event-gateway/gateway-runtime/internal/xdsclient/handler_test.go`:
- Around line 167-170: The struct updatedBindingCall defines fields old and new
where new shadows the predeclared identifier; rename the field new to newWSB (to
mirror the WebSubApiBinding parameter naming) in the updatedBindingCall type and
update all usages — e.g., places constructing or accessing updatedBindingCall
(such as where updatedBindingCall{...} is created and checked in tests like the
calls around the existing assertions) and any comparisons or field references —
to use .newWSB instead of .new so the code no longer uses the predeclared
identifier.

In `@gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go`:
- Around line 191-221: The Update handler currently falls back to a generic 400
body that hides field-level parse/validation details; modify the error handling
after webSubAPIService.Update (the block handling err) to explicitly map error
types websubapi.ValidationError, websubapi.ParseError and
websubapi.HandleMismatchError (use errors.As or type assertions) and return a
structured 400 JSON containing the underlying details (validation error list,
parse cause/path, or handle mismatch info) instead of only err.Error(); keep
existing ErrNotFound and conflict handling, and ensure these specific cases
produce an actionable response payload that clients can use to surface field
paths and messages.

In `@gateway/gateway-controller/pkg/service/websubapi/service.go`:
- Around line 77-91: The constructor currently panics on invalid inputs (db,
deploymentService, eventHub, systemConfig, empty GatewayID); change the
constructor (e.g., NewWebSubAPIService) to return (*WebSubAPIService, error)
instead of panicking, replace each panic(...) with a descriptive error
(fmt.Errorf or errors.New) mentioning the offending parameter (db,
deploymentService, eventHub, systemConfig, GatewayID) and perform the same
checks (including strings.TrimSpace(systemConfig.Controller.Server.GatewayID) ==
"") returning nil, err on failure; update callers (including NewAPIServer and
any tests) to handle the returned error and propagate or fail fast as
appropriate.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bfb7026e-fb44-4fd0-afa2-55605fb120ad

📥 Commits

Reviewing files that changed from the base of the PR and between 7bcef00 and f96e37c.

📒 Files selected for processing (11)
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/connector.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/delta.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/handler.go
  • event-gateway/gateway-runtime/internal/runtime/runtime.go
  • event-gateway/gateway-runtime/internal/xdsclient/handler.go
  • event-gateway/gateway-runtime/internal/xdsclient/handler_test.go
  • gateway/gateway-controller/pkg/api/handlers/handlers.go
  • gateway/gateway-controller/pkg/api/handlers/handlers_test.go
  • gateway/gateway-controller/pkg/api/handlers/websub_api_handler.go
  • gateway/gateway-controller/pkg/service/websubapi/errors.go
  • gateway/gateway-controller/pkg/service/websubapi/service.go

Comment thread event-gateway/gateway-runtime/internal/runtime/runtime.go
Comment thread event-gateway/gateway-runtime/internal/runtime/runtime.go
Comment thread event-gateway/gateway-runtime/internal/runtime/runtime.go
Comment thread event-gateway/gateway-runtime/internal/runtime/runtime.go Outdated
Comment thread event-gateway/gateway-runtime/internal/runtime/runtime.go Outdated
@senthuran16

Copy link
Copy Markdown
Member Author

Holding merging this PR until the test suite is ready

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants