Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 100 additions & 195 deletions src/util/requester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,68 +8,16 @@ import {
AdapterDataProviderError,
AdapterRateLimitError,
} from '../validation/error'
import { EvictedError, TurnQueue } from './turn-queue'

const logger = makeLogger('Requester')

interface ListNode<T> {
value: T
next: ListNode<T> | undefined
}

class UniqueLinkedList<T> {
first: ListNode<T> | undefined
last: ListNode<T> | undefined
length = 0

constructor(private maxLength: number) {}

add(value: T): T | undefined {
let overflow
if (this.length === this.maxLength) {
// If this new item would put us over max length, remove the first one (i.e. oldest one)
overflow = this.remove()
}

const node: ListNode<T> = {
value,
next: undefined,
}

if (!this.first) {
this.first = node
}
if (this.last) {
this.last.next = node
}

this.last = node
this.length++
metrics.get('requesterQueueSize').inc()
return overflow
}

remove() {
const node = this.first

if (!node) {
return
}

this.first = node.next
this.length--
metrics.get('requesterQueueSize').dec()
return node.value
}
}

interface QueuedRequest<T = unknown> {
interface PendingRequest<T = unknown> {
key: string
config: AxiosRequestConfig
retries: number
cost?: number
promise: Promise<RequesterResult<T>>
reject: (err: unknown) => void
resolve: (req: RequesterResult<T>) => void
}

interface RequesterResult<T> {
Expand All @@ -93,9 +41,8 @@ interface RequesterResult<T> {
* - Contemplate architectures with multiple writer EA instances
*/
export class Requester {
private processing = false
private queue: UniqueLinkedList<QueuedRequest>
private map = {} as Record<string, QueuedRequest>
private queue: TurnQueue
private pendingRequestMap = new Map<string, PendingRequest>()
private maxRetries: number
private timeout: number
private sleepBeforeRequeueingMs: number
Expand All @@ -107,54 +54,7 @@ export class Requester {
this.maxRetries = adapterSettings.RETRY
this.timeout = adapterSettings.API_TIMEOUT
this.sleepBeforeRequeueingMs = adapterSettings.REQUESTER_SLEEP_BEFORE_REQUEUEING_MS
this.queue = new UniqueLinkedList<QueuedRequest>(adapterSettings.MAX_HTTP_REQUEST_QUEUE_LENGTH)
}

private queueRequest<T>(queuedRequest: QueuedRequest<T>): void {
// By the time we're here, we know that queuedRequest has both the unresolved promise, and the resolve and reject handlers within
// It's really, REALLY important for thread safety that from this point until this function returns, there are no async breaks.
// Node will stay within a "thread" until it gets a chance to switch context, like an "await" statement. This section of code depends
// on it executing from start to finish without any other actions to avoid race conditions, so if we had an "await" here we could run into problems.
// For example, two separate queue processing "threads" could be spawned by mistake, or the queue could overflow.
const overflowedRequest = this.queue.add(queuedRequest as QueuedRequest<unknown>)
if (overflowedRequest) {
// If we have overflow, it means the oldest request needs to be rejected because the queue is at its limits
censorLogs(() =>
logger.debug(
`Request (Key: ${overflowedRequest.key}, Retry #: ${overflowedRequest.retries}) was removed from the queue to make room for a newer one (Size: ${this.queue.length})`,
),
)
metrics.get('requesterQueueOverflow').inc()
overflowedRequest.reject(
new AdapterRateLimitError({
message:
'The EA was unable to execute the request to fetch the requested data from the DP because the request queue overflowed. This likely indicates that a higher API tier is needed.',
statusCode: 429,
msUntilNextExecution: this.rateLimiter.msUntilNextExecution(),
}),
)

// Remove the overflown request from our map
delete this.map[overflowedRequest.key]
}

// The item was successfully added to the queue, so we can also add it to our map
// If the request is being re-added because it will be retried, this will have no practical effect
censorLogs(() =>
logger.trace(
`Added request (Key: ${queuedRequest.key}, Retry #: ${queuedRequest.retries}) to the queue (Size: ${this.queue.length})`,
),
)
this.map[queuedRequest.key] = queuedRequest as QueuedRequest

// Finally, we start the queue processing
if (!this.processing) {
this.processing = true
logger.debug(`Starting requester queue processing`)
// We don't want to wait for the queue to finish processing here; this will just spawn a "thread"
// and the promise we'll return from this method is the one for the request when it resolves
this.processNext()
}
this.queue = new TurnQueue(adapterSettings.MAX_HTTP_REQUEST_QUEUE_LENGTH)
}

/**
Expand All @@ -171,95 +71,119 @@ export class Requester {
cost?: number,
): Promise<RequesterResult<T>> {
// If there's already a queued request, reuse it's existing promise
const existingQueuedRequest = this.map[key]
const existingQueuedRequest = this.pendingRequestMap.get(key)
if (existingQueuedRequest) {
censorLogs(() =>
logger.trace(`Request already exists, returning queued promise (Key: ${key})`),
)
return existingQueuedRequest.promise as Promise<RequesterResult<T>>
}

const queuedRequest = {
// Set configured timeout for all requests unless manually specified
req.timeout = req.timeout || this.timeout
Comment thread
mmcallister-cll marked this conversation as resolved.

const pendingRequest = {
key,
config: req,
retries: 0,
cost,
} as QueuedRequest<T>
} as PendingRequest<T>

// This dual promise layer is built so the queuedRequest can hold both the resolve and reject handlers,
// and the promise itself so we can return it for request coalescing without creating new ones
await new Promise((unblock) => {
queuedRequest.promise = new Promise<RequesterResult<T>>((success, failure) => {
queuedRequest.resolve = success
queuedRequest.reject = failure
unblock(0)
})
})
this.pendingRequestMap.set(pendingRequest.key, pendingRequest as PendingRequest)

// Add the request to our queue
this.queueRequest(queuedRequest)
pendingRequest.promise = this.executeRequestWithRetries(pendingRequest)

return queuedRequest.promise
try {
return await pendingRequest.promise
} finally {
this.pendingRequestMap.delete(pendingRequest.key)
}
}

// Will grab from queue sequentially, and sleep just before hitting rate limits
private async processNext(): Promise<void> {
// This will remove from the list, but not the map; that way coalescing is still functional for in-flight reqs
const next = this.queue.remove()
private async executeRequestWithRetries<T>(
pendingRequest: PendingRequest<T>,
): Promise<RequesterResult<T>> {
for (let retries = 0; ; retries++) {
pendingRequest.retries = retries
try {
await this.waitBeforeExecutingRequest(pendingRequest)
return await this.executeRequest(pendingRequest)
} catch (e) {
if (e instanceof AdapterRateLimitError) {
// Too many requests in the queue. Don't retry.
throw e
}
if (retries === this.maxRetries) {
logger.trace('No more retries remaining, rejecting promise...')
throw e
}
}

if (!next) {
logger.debug(
`No more requests present in the queue, stopping processing until new one comes in`,
const timeToSleep = this.sleepBeforeRequeueingMs || (2 ** retries + Math.random()) * 1000
logger.info(
`${this.maxRetries - retries} retries remaining, sleeping for ${timeToSleep}ms...`,
)
this.processing = false
return
await sleep(timeToSleep)
}
}

censorLogs(() =>
logger.trace(
`Popped next request (Key: ${next.key}, Retry #: ${next.retries}) from the queue (Size: ${this.queue.length})`,
),
)

// Wait until the rate limiter allows the request to be executed
await this.rateLimiter.waitForRateLimit(next.cost)

// Fire off to complete in the background. We don't wait here to be able to fire off multiple requests concurrently
this.executeRequest.bind(this)(next)
// Waits for the request to have its turn in the queue and for the rate
// limiter. Throws if the request was removed from the queue due to overflow
// while waiting for its turn.
private async waitBeforeExecutingRequest<T>(req: PendingRequest<T>): Promise<void> {
try {
metrics.get('requesterQueueSize').inc()
await this.queue.runInTurn(async () => {
metrics.get('requesterQueueSize').dec()
// Wait until the rate limiter allows the request to be executed
await this.rateLimiter.waitForRateLimit(req.cost)
})
} catch (e) {
metrics.get('requesterQueueSize').dec()

return this.processNext()
if (e instanceof EvictedError) {
const overflowedRequest = req
censorLogs(() =>
logger.debug(
`Request (Key: ${overflowedRequest.key}, Retry #: ${overflowedRequest.retries}) was removed from the queue to make room for a newer one (Size: ${this.queue.length})`,
),
)
metrics.get('requesterQueueOverflow').inc()
throw new AdapterRateLimitError({
Comment thread
mmcallister-cll marked this conversation as resolved.
message:
'The EA was unable to execute the request to fetch the requested data from the DP because the request queue overflowed. This likely indicates that a higher API tier is needed.',
statusCode: 429,
msUntilNextExecution: this.rateLimiter.msUntilNextExecution(),
})
}
throw e
}
}

// Handler for the requests that will be fired off, eventually resolving the promise associated with the queued request
private async executeRequest(req: QueuedRequest) {
const { key, config, resolve, reject, retries } = req
private async executeRequest<T>(req: PendingRequest<T>): Promise<RequesterResult<T>> {
const { key, config } = req

const providerDataRequested = Date.now()
const responseTimer = metrics.get('dataProviderRequestDurationSeconds').startTimer()

// Set configured timeout for all requests unless manually specified
config.timeout = config.timeout || this.timeout

try {
censorLogs(() => logger.trace(`Sending request (Key: ${key}) to data provider`))
censorLogs(() => logger.trace(config))
const response = await axios.request(config)
censorLogs(() => logger.trace(`Request (Key: ${key}) was successful `))
resolve({
response,
timestamps: {
providerDataRequestedUnixMs: providerDataRequested,
providerDataReceivedUnixMs: Date.now(),
},
})

// Remove the request from our map
delete this.map[key]

// Record count of successful data provider requests
metrics
.get('dataProviderRequests')
.labels(dataProviderMetricsLabel(response.status, config.method))
.inc()

return {
response,
timestamps: {
providerDataRequestedUnixMs: providerDataRequested,
providerDataReceivedUnixMs: Date.now(),
},
}
} catch (e) {
const err = e as AxiosError
censorLogs(() =>
Expand All @@ -279,43 +203,24 @@ export class Requester {
.labels(dataProviderMetricsLabel(err.response?.status || 0, config.method))
.inc()

if (retries >= this.maxRetries) {
logger.trace('No more retries remaining, rejecting promise...')
const ErrorClass = err.response?.status ? AdapterDataProviderError : AdapterConnectionError

reject(
new ErrorClass(
{
statusCode: 502,
name: 'Data Provider error',
providerStatusCode: err?.response?.status ?? 502,
message: err?.message,
cause: e,
errorResponse: err?.response?.data,
url: config.url,
},
{
providerDataRequestedUnixMs: providerDataRequested,
providerDataReceivedUnixMs: Date.now(),
providerIndicatedTimeUnixMs: undefined,
},
),
)

// Remove the request from our map
delete this.map[key]
} else {
const timeToSleep = this.sleepBeforeRequeueingMs || (2 ** retries + Math.random()) * 1000
logger.info(
`${this.maxRetries - retries} retries remaining, sleeping for ${timeToSleep}ms...`,
)
await sleep(timeToSleep)

req.retries++

// Re add the request to our queue
this.queueRequest(req)
}
const ErrorClass = err.response?.status ? AdapterDataProviderError : AdapterConnectionError

throw new ErrorClass(
{
statusCode: 502,
name: 'Data Provider error',
providerStatusCode: err?.response?.status ?? 502,
message: err?.message,
cause: e,
errorResponse: err?.response?.data,
url: config.url,
},
{
providerDataRequestedUnixMs: providerDataRequested,
providerDataReceivedUnixMs: Date.now(),
providerIndicatedTimeUnixMs: undefined,
},
)
} finally {
// Record time taken for data provider request for success or failure
responseTimer()
Expand Down
Loading
Loading