Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion docs/components/transport-types/websocket-transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,32 @@ builders: {
}
```

When a feed is added or removed, only the delta (`subscriptions.new` or `subscriptions.stale`) is passed to the batch builder, so each cycle may produce a message covering one or more feeds depending on what changed.
When a feed is added or removed, only the delta (`subscriptions.new` or `subscriptions.stale`) is passed to the batch builder by default, so each cycle may produce a message covering one or more feeds depending on what changed.

#### Snapshot batch subscribe

Some providers require the full desired subscription list on every subscribe message (not just new feeds). Set `options.batchSubscribeMode` to `'snapshot'`:

```typescript
builders: {
options: { batchSubscribeMode: 'snapshot' },
batchSubscribeMessage: (desired) => ({
action: 'subscribe',
pairs: desired.map((p) => `${p.base}/${p.quote}`),
}),
batchUnsubscribeMessage: (params) => ({
action: 'unsubscribe',
pairs: [`${params.base}/${params.quote}`],
}),
},
```

| Mode | `batchSubscribeMessage` receives | Sent when |
| ----------------- | -------------------------------- | ------------------------------ |
| `delta` (default) | `subscriptions.new` | `subscriptions.new.length > 0` |
| `snapshot` | `subscriptions.desired` | `subscriptions.new.length > 0` |

Unsubscribe messages are always sent for `subscriptions.stale` only. When both new and stale feeds exist in the same cycle, unsubscribes are sent before subscribes.

### Heartbeat messages

Expand Down
6 changes: 3 additions & 3 deletions src/transports/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ export class TransportRoutes<T extends TransportGenerics> {
private map: Record<string, Transport<T>> = {}

register<T2 extends T>(name: string, transport: Transport<T2>) {
// This is intentional, to keep names to one word only
if (name !== DEFAULT_TRANSPORT_NAME && !/^[a-z]+$/.test(name)) {
// This is intentional, to keep names to one word only + numbers
if (name !== DEFAULT_TRANSPORT_NAME && !/^[a-z0-9]+$/.test(name)) {
throw new Error(
`Transport name "${name}" is invalid. Names in the AdapterEndpoint transports map can only include lowercase letters.`,
`Transport name "${name}" is invalid. Names in the AdapterEndpoint transports map can only include lowercase letters and numbers.`,
)
}
if (this.map[name]) {
Expand Down
108 changes: 67 additions & 41 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,22 @@ type WebSocketIndividualBuilders<T extends WebsocketTransportGenerics> = {
) => unknown
}

/** Selector type for batch subscription mode */
export type BatchSubscribeMode = 'delta' | 'snapshot'

export type WebSocketBatchBuilderOptions = {
/**
* `delta` (default): `batchSubscribeMessage` receives `subscriptions.new`.
* `snapshot`: when `subscriptions.new` is non-empty, receives `subscriptions.desired`.
*/
batchSubscribeMode?: BatchSubscribeMode
}

type WebSocketBatchBuilders<T extends WebsocketTransportGenerics> = {
/**
* Builds a single WS message that will be sent to subscribe to multiple feeds at once
*
* @param params - the list of adapter requests to subscribe to in this batch
* @param params - the list of adapter requests to subscribe to in this batch, dependent on WebSocketBatchBuilderOptions.batchSubscribeMode
* @param context - the background context for the Adapter
* @returns the WS message (can be any type as long as the [[WebSocket]] doesn't complain)
*/
Expand All @@ -93,6 +104,11 @@ type WebSocketBatchBuilders<T extends WebsocketTransportGenerics> = {
params: TypeFromDefinition<T['Parameters']>[],
context: EndpointContext<T>,
) => unknown

/**
* Optional parameters used when building batch subscribe/unsubscribe messages
*/
options?: WebSocketBatchBuilderOptions
}

/**
Expand Down Expand Up @@ -420,11 +436,59 @@ export class WebSocketTransport<
return connection
}

private async sendSubscriptionMessages(
context: EndpointContext<T>,
subscriptions: SubscriptionDeltas<TypeFromDefinition<T['Parameters']>>,
): Promise<void> {
logger.debug('Connection is open, sending subs/unsubs if there are any')
const builders = this.config.builders
if (builders) {
if ('batchSubscribeMessage' in builders) {
const { batchSubscribeMessage, batchUnsubscribeMessage, options } = builders
const mode = options?.batchSubscribeMode ?? 'delta'
const batchSubscriptionInputs = mode === 'delta' ? subscriptions.new : subscriptions.desired
await this.sendMessages(
context,
// Avoid sending subscribe/unsubscribe messages if nothing new to send
batchSubscribeMessage && subscriptions.new.length > 0
? [batchSubscribeMessage(batchSubscriptionInputs, context)].filter(
(m) => m !== undefined,
)
: subscriptions.new,
batchUnsubscribeMessage && subscriptions.stale.length > 0
? [batchUnsubscribeMessage(subscriptions.stale, context)].filter((m) => m !== undefined)
: subscriptions.stale,
)
} else {
const { subscribeMessage, unsubscribeMessage } = builders as WebSocketIndividualBuilders<T>
await this.sendMessages(
context,
subscribeMessage
? subscriptions.new
.map((sub) => subscribeMessage(sub, context))
.filter((m) => m !== undefined)
: subscriptions.new,
unsubscribeMessage
? subscriptions.stale
.map((sub) => unsubscribeMessage(sub, context))
.filter((m) => m !== undefined)
: subscriptions.stale,
)
}
recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale)
} else {
logger.trace(
"This ws transport has no builders configured, so we're not sending any messages",
)
}
}

async sendMessages(context: EndpointContext<T>, subscribes: unknown[], unsubscribes: unknown[]) {
const serializedSubscribes = subscribes.map(this.serializeMessage)
const serializedUnsubscribes = unsubscribes.map(this.serializeMessage)

const messages = serializedSubscribes.concat(serializedUnsubscribes)
// Order unsubscribes before subscribes in case of overlap race condition
const messages = serializedUnsubscribes.concat(serializedSubscribes)

if (messages.length > 0) {
logger.debug(`There are ${messages.length} messages to send`)
Expand Down Expand Up @@ -557,45 +621,7 @@ export class WebSocketTransport<
// Otherwise we could encounter the case where we just closed the connection because there's no desired ones,
// but without this check we'd attempt to send out all the unsubscribe messages
if (!this.connectionClosed()) {
logger.debug('Connection is open, sending subs/unsubs if there are any')
const builders = this.config.builders
if (builders) {
if ('batchSubscribeMessage' in builders) {
const { batchSubscribeMessage, batchUnsubscribeMessage } = builders
await this.sendMessages(
context,
batchSubscribeMessage
? [batchSubscribeMessage(subscriptions.new, context)].filter((m) => m !== undefined)
: subscriptions.new,
batchUnsubscribeMessage
? [batchUnsubscribeMessage(subscriptions.stale, context)].filter(
(m) => m !== undefined,
)
: subscriptions.stale,
)
} else {
const { subscribeMessage, unsubscribeMessage } =
builders as WebSocketIndividualBuilders<T>
await this.sendMessages(
context,
subscribeMessage
? subscriptions.new
.map((sub) => subscribeMessage(sub, context))
.filter((m) => m !== undefined)
: subscriptions.new,
unsubscribeMessage
? subscriptions.stale
.map((sub) => unsubscribeMessage(sub, context))
.filter((m) => m !== undefined)
: subscriptions.stale,
)
}
recordWsMessageSentMetrics(context, subscriptions.new, subscriptions.stale)
} else {
logger.trace(
"This ws transport has no builders configured, so we're not sending any messages",
)
}
await this.sendSubscriptionMessages(context, subscriptions)
}

// Record WS subscription metrics
Expand Down
62 changes: 59 additions & 3 deletions test/transports/websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Adapter, AdapterEndpoint, EndpointContext } from '../../src/adapter'
import { AdapterConfig, EmptyCustomSettings } from '../../src/config'
import { metrics as eaMetrics } from '../../src/metrics'
import {
BatchSubscribeMode,
WebSocketClassProvider,
WebsocketReverseMappingTransport,
WebSocketTransport,
Expand Down Expand Up @@ -1946,7 +1947,7 @@ test.serial(
},
)

const createBatchAdapter = (): Adapter => {
const createBatchAdapter = (batchSubscribeMode?: BatchSubscribeMode): Adapter => {
const websocketTransport = new WebSocketTransport<WebSocketTypes>({
url: () => ENDPOINT_URL,
handlers: {
Expand All @@ -1972,6 +1973,7 @@ const createBatchAdapter = (): Adapter => {
},
},
builders: {
...(batchSubscribeMode ? { options: { batchSubscribeMode } } : {}),
batchSubscribeMessage: (params) => ({
request: 'subscribe',
pairs: params.map((p) => `${p.base}/${p.quote}`),
Expand Down Expand Up @@ -2007,7 +2009,7 @@ const createBatchAdapter = (): Adapter => {
})
}

test.serial('batch builders send one subscribe message for multiple feeds', async (t) => {
test.serial('batch builders (delta) send one subscribe message for multiple feeds', async (t) => {
mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
const subscribeMessages: Array<{ request: string; pairs: string[] }> = []
Expand Down Expand Up @@ -2056,7 +2058,7 @@ test.serial('batch builders send one subscribe message for multiple feeds', asyn
t.deepEqual(subscribeMessages[1].pairs, ['BTC/USD'])
})

test.serial('batch builders send one unsubscribe message when feeds go stale', async (t) => {
test.serial('batch builders (delta) send unsubscribe when feeds go stale', async (t) => {
mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })

Expand Down Expand Up @@ -2096,3 +2098,57 @@ test.serial('batch builders send one unsubscribe message when feeds go stale', a
mockWsServer.close()
await t.context.clock.runToLastAsync()
})

test.serial(
'batch builders (snapshot) pass subscriptions.desired on incremental add',
async (t) => {
mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
const subscribeMessages: Array<{ request: string; pairs: string[] }> = []

mockWsServer.on('connection', (socket) => {
socket.on('message', (rawMsg) => {
const parsed = JSON.parse(rawMsg.toString())
if (parsed.request === 'subscribe') {
subscribeMessages.push(parsed)
for (const pair of parsed.pairs) {
socket.send(JSON.stringify({ pair, value: price }))
}
}
})
})

const testAdapter = await TestAdapter.startWithMockedCache(
createBatchAdapter('snapshot'),
t.context,
)

await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base: 'ETH', quote: 'USD' },
expectedCacheSize: 1,
expectedResponse: {
data: { result: price },
result: price,
statusCode: 200,
},
})

await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base: 'BTC', quote: 'USD' },
expectedCacheSize: 2,
expectedResponse: {
data: { result: price },
result: price,
statusCode: 200,
},
})

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()

t.is(subscribeMessages.length, 2)
t.deepEqual(subscribeMessages[0].pairs, ['ETH/USD'])
t.deepEqual(subscribeMessages[1].pairs, ['ETH/USD', 'BTC/USD'])
},
)
Loading