diff --git a/docs/components/transport-types/websocket-transport.md b/docs/components/transport-types/websocket-transport.md index 1cb964a6..28b9c594 100644 --- a/docs/components/transport-types/websocket-transport.md +++ b/docs/components/transport-types/websocket-transport.md @@ -142,7 +142,32 @@ builders: { } ``` -When a feed is added or removed, only the delta (`subscriptions.new` or `subscriptions.stale`) is passed to the batch builder, so each cycle may produce a message covering one or more feeds depending on what changed. +When a feed is added or removed, only the delta (`subscriptions.new` or `subscriptions.stale`) is passed to the batch builder by default, so each cycle may produce a message covering one or more feeds depending on what changed. + +#### Snapshot batch subscribe + +Some providers require the full desired subscription list on every subscribe message (not just new feeds). Set `options.batchSubscribeMode` to `'snapshot'`: + +```typescript +builders: { + options: { batchSubscribeMode: 'snapshot' }, + batchSubscribeMessage: (desired) => ({ + action: 'subscribe', + pairs: desired.map((p) => `${p.base}/${p.quote}`), + }), + batchUnsubscribeMessage: (params) => ({ + action: 'unsubscribe', + pairs: [`${params.base}/${params.quote}`], + }), +}, +``` + +| Mode | `batchSubscribeMessage` receives | Sent when | +| ----------------- | -------------------------------- | ------------------------------ | +| `delta` (default) | `subscriptions.new` | `subscriptions.new.length > 0` | +| `snapshot` | `subscriptions.desired` | `subscriptions.new.length > 0` | + +Unsubscribe messages are always sent for `subscriptions.stale` only. When both new and stale feeds exist in the same cycle, unsubscribes are sent before subscribes. ### Heartbeat messages diff --git a/src/transports/index.ts b/src/transports/index.ts index ffb8a6e4..98a52b86 100644 --- a/src/transports/index.ts +++ b/src/transports/index.ts @@ -118,10 +118,10 @@ export class TransportRoutes { private map: Record> = {} register(name: string, transport: Transport) { - // This is intentional, to keep names to one word only - if (name !== DEFAULT_TRANSPORT_NAME && !/^[a-z]+$/.test(name)) { + // This is intentional, to keep names to one word only + numbers + if (name !== DEFAULT_TRANSPORT_NAME && !/^[a-z0-9]+$/.test(name)) { throw new Error( - `Transport name "${name}" is invalid. Names in the AdapterEndpoint transports map can only include lowercase letters.`, + `Transport name "${name}" is invalid. Names in the AdapterEndpoint transports map can only include lowercase letters and numbers.`, ) } if (this.map[name]) { diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index c007f5c6..0127c7db 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -69,11 +69,22 @@ type WebSocketIndividualBuilders = { ) => unknown } +/** Selector type for batch subscription mode */ +export type BatchSubscribeMode = 'delta' | 'snapshot' + +export type WebSocketBatchBuilderOptions = { + /** + * `delta` (default): `batchSubscribeMessage` receives `subscriptions.new`. + * `snapshot`: when `subscriptions.new` is non-empty, receives `subscriptions.desired`. + */ + batchSubscribeMode?: BatchSubscribeMode +} + type WebSocketBatchBuilders = { /** * Builds a single WS message that will be sent to subscribe to multiple feeds at once * - * @param params - the list of adapter requests to subscribe to in this batch + * @param params - the list of adapter requests to subscribe to in this batch, dependent on WebSocketBatchBuilderOptions.batchSubscribeMode * @param context - the background context for the Adapter * @returns the WS message (can be any type as long as the [[WebSocket]] doesn't complain) */ @@ -93,6 +104,11 @@ type WebSocketBatchBuilders = { params: TypeFromDefinition[], context: EndpointContext, ) => unknown + + /** + * Optional parameters used when building batch subscribe/unsubscribe messages + */ + options?: WebSocketBatchBuilderOptions } /** @@ -420,11 +436,59 @@ export class WebSocketTransport< return connection } + private async sendSubscriptionMessages( + context: EndpointContext, + subscriptions: SubscriptionDeltas>, + ): Promise { + logger.debug('Connection is open, sending subs/unsubs if there are any') + const builders = this.config.builders + if (builders) { + if ('batchSubscribeMessage' in builders) { + const { batchSubscribeMessage, batchUnsubscribeMessage, options } = builders + const mode = options?.batchSubscribeMode ?? 'delta' + const batchSubscriptionInputs = mode === 'delta' ? subscriptions.new : subscriptions.desired + await this.sendMessages( + context, + // Avoid sending subscribe/unsubscribe messages if nothing new to send + batchSubscribeMessage && subscriptions.new.length > 0 + ? [batchSubscribeMessage(batchSubscriptionInputs, context)].filter( + (m) => m !== undefined, + ) + : subscriptions.new, + batchUnsubscribeMessage && subscriptions.stale.length > 0 + ? [batchUnsubscribeMessage(subscriptions.stale, context)].filter((m) => m !== undefined) + : subscriptions.stale, + ) + } else { + const { subscribeMessage, unsubscribeMessage } = builders as WebSocketIndividualBuilders + await this.sendMessages( + context, + subscribeMessage + ? subscriptions.new + .map((sub) => subscribeMessage(sub, context)) + .filter((m) => m !== undefined) + : subscriptions.new, + unsubscribeMessage + ? subscriptions.stale + .map((sub) => unsubscribeMessage(sub, context)) + .filter((m) => m !== undefined) + : subscriptions.stale, + ) + } + recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale) + } else { + logger.trace( + "This ws transport has no builders configured, so we're not sending any messages", + ) + } + } + async sendMessages(context: EndpointContext, subscribes: unknown[], unsubscribes: unknown[]) { const serializedSubscribes = subscribes.map(this.serializeMessage) const serializedUnsubscribes = unsubscribes.map(this.serializeMessage) - const messages = serializedSubscribes.concat(serializedUnsubscribes) + // Order unsubscribes before subscribes in case of overlap race condition + const messages = serializedUnsubscribes.concat(serializedSubscribes) if (messages.length > 0) { logger.debug(`There are ${messages.length} messages to send`) @@ -557,45 +621,7 @@ export class WebSocketTransport< // Otherwise we could encounter the case where we just closed the connection because there's no desired ones, // but without this check we'd attempt to send out all the unsubscribe messages if (!this.connectionClosed()) { - logger.debug('Connection is open, sending subs/unsubs if there are any') - const builders = this.config.builders - if (builders) { - if ('batchSubscribeMessage' in builders) { - const { batchSubscribeMessage, batchUnsubscribeMessage } = builders - await this.sendMessages( - context, - batchSubscribeMessage - ? [batchSubscribeMessage(subscriptions.new, context)].filter((m) => m !== undefined) - : subscriptions.new, - batchUnsubscribeMessage - ? [batchUnsubscribeMessage(subscriptions.stale, context)].filter( - (m) => m !== undefined, - ) - : subscriptions.stale, - ) - } else { - const { subscribeMessage, unsubscribeMessage } = - builders as WebSocketIndividualBuilders - await this.sendMessages( - context, - subscribeMessage - ? subscriptions.new - .map((sub) => subscribeMessage(sub, context)) - .filter((m) => m !== undefined) - : subscriptions.new, - unsubscribeMessage - ? subscriptions.stale - .map((sub) => unsubscribeMessage(sub, context)) - .filter((m) => m !== undefined) - : subscriptions.stale, - ) - } - recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale) - } else { - logger.trace( - "This ws transport has no builders configured, so we're not sending any messages", - ) - } + await this.sendSubscriptionMessages(context, subscriptions) } // Record WS subscription metrics diff --git a/test/transports/websocket.test.ts b/test/transports/websocket.test.ts index 364d401d..e347fa41 100644 --- a/test/transports/websocket.test.ts +++ b/test/transports/websocket.test.ts @@ -7,6 +7,7 @@ import { Adapter, AdapterEndpoint, EndpointContext } from '../../src/adapter' import { AdapterConfig, EmptyCustomSettings } from '../../src/config' import { metrics as eaMetrics } from '../../src/metrics' import { + BatchSubscribeMode, WebSocketClassProvider, WebsocketReverseMappingTransport, WebSocketTransport, @@ -1946,7 +1947,7 @@ test.serial( }, ) -const createBatchAdapter = (): Adapter => { +const createBatchAdapter = (batchSubscribeMode?: BatchSubscribeMode): Adapter => { const websocketTransport = new WebSocketTransport({ url: () => ENDPOINT_URL, handlers: { @@ -1972,6 +1973,7 @@ const createBatchAdapter = (): Adapter => { }, }, builders: { + ...(batchSubscribeMode ? { options: { batchSubscribeMode } } : {}), batchSubscribeMessage: (params) => ({ request: 'subscribe', pairs: params.map((p) => `${p.base}/${p.quote}`), @@ -2007,7 +2009,7 @@ const createBatchAdapter = (): Adapter => { }) } -test.serial('batch builders send one subscribe message for multiple feeds', async (t) => { +test.serial('batch builders (delta) send one subscribe message for multiple feeds', async (t) => { mockWebSocketProvider(WebSocketClassProvider) const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) const subscribeMessages: Array<{ request: string; pairs: string[] }> = [] @@ -2056,7 +2058,7 @@ test.serial('batch builders send one subscribe message for multiple feeds', asyn t.deepEqual(subscribeMessages[1].pairs, ['BTC/USD']) }) -test.serial('batch builders send one unsubscribe message when feeds go stale', async (t) => { +test.serial('batch builders (delta) send unsubscribe when feeds go stale', async (t) => { mockWebSocketProvider(WebSocketClassProvider) const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) @@ -2096,3 +2098,57 @@ test.serial('batch builders send one unsubscribe message when feeds go stale', a mockWsServer.close() await t.context.clock.runToLastAsync() }) + +test.serial( + 'batch builders (snapshot) pass subscriptions.desired on incremental add', + async (t) => { + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + const subscribeMessages: Array<{ request: string; pairs: string[] }> = [] + + mockWsServer.on('connection', (socket) => { + socket.on('message', (rawMsg) => { + const parsed = JSON.parse(rawMsg.toString()) + if (parsed.request === 'subscribe') { + subscribeMessages.push(parsed) + for (const pair of parsed.pairs) { + socket.send(JSON.stringify({ pair, value: price })) + } + } + }) + }) + + const testAdapter = await TestAdapter.startWithMockedCache( + createBatchAdapter('snapshot'), + t.context, + ) + + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base: 'ETH', quote: 'USD' }, + expectedCacheSize: 1, + expectedResponse: { + data: { result: price }, + result: price, + statusCode: 200, + }, + }) + + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base: 'BTC', quote: 'USD' }, + expectedCacheSize: 2, + expectedResponse: { + data: { result: price }, + result: price, + statusCode: 200, + }, + }) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + + t.is(subscribeMessages.length, 2) + t.deepEqual(subscribeMessages[0].pairs, ['ETH/USD']) + t.deepEqual(subscribeMessages[1].pairs, ['ETH/USD', 'BTC/USD']) + }, +)