diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go index 56f6e67eb..dcf6d1650 100644 --- a/event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector.go @@ -403,7 +403,11 @@ func (e *WebBrokerApiReceiver) inboundLoop(ctx context.Context, conn *brokerApiC select { case <-ctx.Done(): return - case msg := <-conn.inbound: + case msg, ok := <-conn.inbound: + if !ok || msg == nil { + return + } + // Apply channel-specific on_produce policies. slog.Debug("[5] Applying channel onProduce policies", "connID", conn.connID, @@ -462,7 +466,11 @@ func (e *WebBrokerApiReceiver) outboundLoop(ctx context.Context, conn *brokerApi select { case <-ctx.Done(): return - case msg := <-conn.outbound: + case msg, ok := <-conn.outbound: + if !ok || msg == nil { + return + } + slog.Debug("[7] Applying channel onConsume policies", "connID", conn.connID, "api", e.channel.Name, diff --git a/event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector_test.go b/event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector_test.go new file mode 100644 index 000000000..12f0a9304 --- /dev/null +++ b/event-gateway/gateway-runtime/internal/connectors/receiver/websocket/broker_api_connector_test.go @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com). + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package websocket + +import ( + "context" + "testing" + "time" + + "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors" +) + +func TestInboundLoopReturnsWhenInboundChannelClosed(t *testing.T) { + conn := &brokerApiConnection{ + connID: "test-connection", + inbound: make(chan *connectors.Message), + channelName: "orders", + } + close(conn.inbound) + + assertLoopReturnsWithoutPanic(t, func() { + (&WebBrokerApiReceiver{}).inboundLoop(context.Background(), conn) + }) +} + +func TestOutboundLoopReturnsWhenOutboundChannelClosed(t *testing.T) { + conn := &brokerApiConnection{ + connID: "test-connection", + outbound: make(chan *connectors.Message), + channelName: "orders", + } + close(conn.outbound) + + assertLoopReturnsWithoutPanic(t, func() { + (&WebBrokerApiReceiver{}).outboundLoop(context.Background(), conn) + }) +} + +func assertLoopReturnsWithoutPanic(t *testing.T, run func()) { + t.Helper() + + done := make(chan any, 1) + go func() { + defer func() { + done <- recover() + }() + run() + }() + + select { + case recovered := <-done: + if recovered != nil { + t.Fatalf("loop panicked after channel close: %v", recovered) + } + case <-time.After(time.Second): + t.Fatal("loop did not return after channel close") + } +}