diff --git a/src/util/requester.ts b/src/util/requester.ts index 89249c2d..4459161e 100644 --- a/src/util/requester.ts +++ b/src/util/requester.ts @@ -8,68 +8,16 @@ import { AdapterDataProviderError, AdapterRateLimitError, } from '../validation/error' +import { EvictedError, TurnQueue } from './turn-queue' const logger = makeLogger('Requester') -interface ListNode { - value: T - next: ListNode | undefined -} - -class UniqueLinkedList { - first: ListNode | undefined - last: ListNode | undefined - length = 0 - - constructor(private maxLength: number) {} - - add(value: T): T | undefined { - let overflow - if (this.length === this.maxLength) { - // If this new item would put us over max length, remove the first one (i.e. oldest one) - overflow = this.remove() - } - - const node: ListNode = { - value, - next: undefined, - } - - if (!this.first) { - this.first = node - } - if (this.last) { - this.last.next = node - } - - this.last = node - this.length++ - metrics.get('requesterQueueSize').inc() - return overflow - } - - remove() { - const node = this.first - - if (!node) { - return - } - - this.first = node.next - this.length-- - metrics.get('requesterQueueSize').dec() - return node.value - } -} - -interface QueuedRequest { +interface PendingRequest { key: string config: AxiosRequestConfig retries: number cost?: number promise: Promise> - reject: (err: unknown) => void - resolve: (req: RequesterResult) => void } interface RequesterResult { @@ -93,9 +41,8 @@ interface RequesterResult { * - Contemplate architectures with multiple writer EA instances */ export class Requester { - private processing = false - private queue: UniqueLinkedList - private map = {} as Record + private queue: TurnQueue + private pendingRequestMap = new Map() private maxRetries: number private timeout: number private sleepBeforeRequeueingMs: number @@ -107,54 +54,7 @@ export class Requester { this.maxRetries = adapterSettings.RETRY this.timeout = adapterSettings.API_TIMEOUT 
this.sleepBeforeRequeueingMs = adapterSettings.REQUESTER_SLEEP_BEFORE_REQUEUEING_MS - this.queue = new UniqueLinkedList(adapterSettings.MAX_HTTP_REQUEST_QUEUE_LENGTH) - } - - private queueRequest(queuedRequest: QueuedRequest): void { - // By the time we're here, we know that queuedRequest has both the unresolved promise, and the resolve and reject handlers within - // It's really, REALLY important for thread safety that from this point until this function returns, there are no async breaks. - // Node will stay within a "thread" until it gets a chance to switch context, like an "await" statement. This section of code depends - // on it executing from start to finish without any other actions to avoid race conditions, so if we had an "await" here we could run into problems. - // For example, two separate queue processing "threads" could be spawned by mistake, or the queue could overflow. - const overflowedRequest = this.queue.add(queuedRequest as QueuedRequest) - if (overflowedRequest) { - // If we have overflow, it means the oldest request needs to be rejected because the queue is at its limits - censorLogs(() => - logger.debug( - `Request (Key: ${overflowedRequest.key}, Retry #: ${overflowedRequest.retries}) was removed from the queue to make room for a newer one (Size: ${this.queue.length})`, - ), - ) - metrics.get('requesterQueueOverflow').inc() - overflowedRequest.reject( - new AdapterRateLimitError({ - message: - 'The EA was unable to execute the request to fetch the requested data from the DP because the request queue overflowed. 
This likely indicates that a higher API tier is needed.', - statusCode: 429, - msUntilNextExecution: this.rateLimiter.msUntilNextExecution(), - }), - ) - - // Remove the overflown request from our map - delete this.map[overflowedRequest.key] - } - - // The item was successfully added to the queue, so we can also add it to our map - // If the request is being re-added because it will be retried, this will have no practical effect - censorLogs(() => - logger.trace( - `Added request (Key: ${queuedRequest.key}, Retry #: ${queuedRequest.retries}) to the queue (Size: ${this.queue.length})`, - ), - ) - this.map[queuedRequest.key] = queuedRequest as QueuedRequest - - // Finally, we start the queue processing - if (!this.processing) { - this.processing = true - logger.debug(`Starting requester queue processing`) - // We don't want to wait for the queue to finish processing here; this will just spawn a "thread" - // and the promise we'll return from this method is the one for the request when it resolves - this.processNext() - } + this.queue = new TurnQueue(adapterSettings.MAX_HTTP_REQUEST_QUEUE_LENGTH) } /** @@ -171,7 +71,7 @@ export class Requester { cost?: number, ): Promise> { // If there's already a queued request, reuse it's existing promise - const existingQueuedRequest = this.map[key] + const existingQueuedRequest = this.pendingRequestMap.get(key) if (existingQueuedRequest) { censorLogs(() => logger.trace(`Request already exists, returning queued promise (Key: ${key})`), @@ -179,87 +79,111 @@ export class Requester { return existingQueuedRequest.promise as Promise> } - const queuedRequest = { + // Set configured timeout for all requests unless manually specified + req.timeout = req.timeout || this.timeout + + const pendingRequest = { key, config: req, retries: 0, cost, - } as QueuedRequest + } as PendingRequest - // This dual promise layer is built so the queuedRequest can hold both the resolve and reject handlers, - // and the promise itself so we can return it for 
request coalescing without creating new ones - await new Promise((unblock) => { - queuedRequest.promise = new Promise>((success, failure) => { - queuedRequest.resolve = success - queuedRequest.reject = failure - unblock(0) - }) - }) + this.pendingRequestMap.set(pendingRequest.key, pendingRequest as PendingRequest) - // Add the request to our queue - this.queueRequest(queuedRequest) + pendingRequest.promise = this.executeRequestWithRetries(pendingRequest) - return queuedRequest.promise + try { + return await pendingRequest.promise + } finally { + this.pendingRequestMap.delete(pendingRequest.key) + } } - // Will grab from queue sequentially, and sleep just before hitting rate limits - private async processNext(): Promise { - // This will remove from the list, but not the map; that way coalescing is still functional for in-flight reqs - const next = this.queue.remove() + private async executeRequestWithRetries( + pendingRequest: PendingRequest, + ): Promise> { + for (let retries = 0; ; retries++) { + pendingRequest.retries = retries + try { + await this.waitBeforeExecutingRequest(pendingRequest) + return await this.executeRequest(pendingRequest) + } catch (e) { + if (e instanceof AdapterRateLimitError) { + // Too many requests in the queue. Don't retry. 
+ throw e + } + if (retries === this.maxRetries) { + logger.trace('No more retries remaining, rejecting promise...') + throw e + } + } - if (!next) { - logger.debug( - `No more requests present in the queue, stopping processing until new one comes in`, + const timeToSleep = this.sleepBeforeRequeueingMs || (2 ** retries + Math.random()) * 1000 + logger.info( + `${this.maxRetries - retries} retries remaining, sleeping for ${timeToSleep}ms...`, ) - this.processing = false - return + await sleep(timeToSleep) } + } - censorLogs(() => - logger.trace( - `Popped next request (Key: ${next.key}, Retry #: ${next.retries}) from the queue (Size: ${this.queue.length})`, - ), - ) - - // Wait until the rate limiter allows the request to be executed - await this.rateLimiter.waitForRateLimit(next.cost) - - // Fire off to complete in the background. We don't wait here to be able to fire off multiple requests concurrently - this.executeRequest.bind(this)(next) + // Waits for the request to have its turn in the queue and for the rate + // limiter. Throws if the request was removed from the queue due to overflow + // while waiting for its turn. 
+ private async waitBeforeExecutingRequest(req: PendingRequest): Promise { + try { + metrics.get('requesterQueueSize').inc() + await this.queue.runInTurn(async () => { + metrics.get('requesterQueueSize').dec() + // Wait until the rate limiter allows the request to be executed + await this.rateLimiter.waitForRateLimit(req.cost) + }) + } catch (e) { + metrics.get('requesterQueueSize').dec() - return this.processNext() + if (e instanceof EvictedError) { + const overflowedRequest = req + censorLogs(() => + logger.debug( + `Request (Key: ${overflowedRequest.key}, Retry #: ${overflowedRequest.retries}) was removed from the queue to make room for a newer one (Size: ${this.queue.length})`, + ), + ) + metrics.get('requesterQueueOverflow').inc() + throw new AdapterRateLimitError({ + message: + 'The EA was unable to execute the request to fetch the requested data from the DP because the request queue overflowed. This likely indicates that a higher API tier is needed.', + statusCode: 429, + msUntilNextExecution: this.rateLimiter.msUntilNextExecution(), + }) + } + throw e + } } - // Handler for the requests that will be fired off, eventually resolving the promise associated with the queued request - private async executeRequest(req: QueuedRequest) { - const { key, config, resolve, reject, retries } = req + private async executeRequest(req: PendingRequest): Promise> { + const { key, config } = req + const providerDataRequested = Date.now() const responseTimer = metrics.get('dataProviderRequestDurationSeconds').startTimer() - - // Set configured timeout for all requests unless manually specified - config.timeout = config.timeout || this.timeout - try { censorLogs(() => logger.trace(`Sending request (Key: ${key}) to data provider`)) censorLogs(() => logger.trace(config)) const response = await axios.request(config) censorLogs(() => logger.trace(`Request (Key: ${key}) was successful `)) - resolve({ - response, - timestamps: { - providerDataRequestedUnixMs: providerDataRequested, - 
providerDataReceivedUnixMs: Date.now(), - }, - }) - - // Remove the request from our map - delete this.map[key] // Record count of successful data provider requests metrics .get('dataProviderRequests') .labels(dataProviderMetricsLabel(response.status, config.method)) .inc() + + return { + response, + timestamps: { + providerDataRequestedUnixMs: providerDataRequested, + providerDataReceivedUnixMs: Date.now(), + }, + } } catch (e) { const err = e as AxiosError censorLogs(() => @@ -279,43 +203,24 @@ export class Requester { .labels(dataProviderMetricsLabel(err.response?.status || 0, config.method)) .inc() - if (retries >= this.maxRetries) { - logger.trace('No more retries remaining, rejecting promise...') - const ErrorClass = err.response?.status ? AdapterDataProviderError : AdapterConnectionError - - reject( - new ErrorClass( - { - statusCode: 502, - name: 'Data Provider error', - providerStatusCode: err?.response?.status ?? 502, - message: err?.message, - cause: e, - errorResponse: err?.response?.data, - url: config.url, - }, - { - providerDataRequestedUnixMs: providerDataRequested, - providerDataReceivedUnixMs: Date.now(), - providerIndicatedTimeUnixMs: undefined, - }, - ), - ) - - // Remove the request from our map - delete this.map[key] - } else { - const timeToSleep = this.sleepBeforeRequeueingMs || (2 ** retries + Math.random()) * 1000 - logger.info( - `${this.maxRetries - retries} retries remaining, sleeping for ${timeToSleep}ms...`, - ) - await sleep(timeToSleep) - - req.retries++ - - // Re add the request to our queue - this.queueRequest(req) - } + const ErrorClass = err.response?.status ? AdapterDataProviderError : AdapterConnectionError + + throw new ErrorClass( + { + statusCode: 502, + name: 'Data Provider error', + providerStatusCode: err?.response?.status ?? 
502, + message: err?.message, + cause: e, + errorResponse: err?.response?.data, + url: config.url, + }, + { + providerDataRequestedUnixMs: providerDataRequested, + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + ) } finally { // Record time taken for data provider request for success or failure responseTimer() diff --git a/src/util/turn-queue.ts b/src/util/turn-queue.ts new file mode 100644 index 00000000..461047c9 --- /dev/null +++ b/src/util/turn-queue.ts @@ -0,0 +1,119 @@ +// TurnQueue is used by Requester to make sure requests execute in turn and to +// limit the number of pending requests. +// +// Before a request is made the requester waits for the rate limiter inside a +// callback passed to runInTurn() on the TurnQueue. This callback is only run +// when the previous request has finished waiting for its rate limit. +// +// If the maximum number of pending turns is exceeded, the oldest pending turn +// is dropped and its callback is never run. Instead the call to runInTurn() +// throws an EvictedError. 
+ +export type Turn = { + release: () => void +} + +export class EvictedError extends Error { + constructor(message: string) { + super(message) + this.name = 'EvictedError' + } +} + +class TurnNode { + isDone = false + _next: TurnNode | undefined = undefined + starting: Promise + resolveStarting!: (turn: Turn | undefined) => void + + constructor(private queue: TurnQueue) { + this.starting = new Promise((resolve) => { + this.resolveStarting = resolve + }) + } + + start() { + this.resolveStarting({ + release: () => this.end(), + }) + } + + drop() { + this.resolveStarting(undefined) + } + + get next(): TurnNode | undefined { + return this._next + } + + set next(value: TurnNode | undefined) { + this._next = value + this.maybeStartNext() + } + + private end() { + if (this.isDone) { + throw new Error('Turn already ended') + } + this.isDone = true + this.maybeStartNext() + } + + private maybeStartNext() { + if (this.isDone && this.next) { + this.queue.startNext() + } + } +} + +export class TurnQueue { + length = 0 + active: TurnNode + last: TurnNode + + constructor(private maxLength: number) { + const ready = new TurnNode(this) + ready.isDone = true + this.active = ready + this.last = ready + } + + async runInTurn(task: () => Promise): Promise { + const turn = await this.takeTurn() + if (!turn) { + throw new EvictedError('Too many pending turns') + } + + try { + await task() + } finally { + turn.release() + } + } + + private async takeTurn(): Promise { + const turn = new TurnNode(this) + this.last.next = turn + this.last = turn + this.length++ + + if (this.length > this.maxLength) { + this.length-- + + const first = this.active.next! + if (first === this.last) { + this.last = this.active + } + first.drop() + this.active.next = first.next + } + + return turn.starting + } + + startNext() { + this.length-- + this.active = this.active.next! 
+ this.active.start() + } +} diff --git a/test/util/requester.test.ts b/test/util/requester.test.ts index b69107c0..8f5a1a3e 100644 --- a/test/util/requester.test.ts +++ b/test/util/requester.test.ts @@ -213,7 +213,7 @@ test.serial( ) test.serial( - 'should not remove request from the queue while its still being rate limited', + 'should remove request from the queue while its still being rate limited', wrapTest(async (t) => { const params1 = { param: 'test1' } const params2 = { param: 'test2' } @@ -225,14 +225,15 @@ test.serial( const promise1 = makeRequest(t, params1) const promise2 = makeRequest(t, params2) - t.is(await getMetric(t, 'requester_queue_size'), 1) - // Sleeping shorter than the rate limit interval so the second request should - // still be in the queue. + // Sleeping shorter than the rate limit interval so the second request is + // still being rate limited. await sleep(rateLimitIntervalMs - 10) - // But the requester is too eager and removed it before it waiting for the - // reate limit. + // It's already removed from the queue because the rate limiting is + // specific to (the cost of) the request. So once we start rate limiting + // the request we are committed to it and don't want to evict it from the + // queue. t.is(await getMetric(t, 'requester_queue_size'), 0) const promise3 = makeRequest(t, params3) @@ -244,6 +245,7 @@ test.serial( // When adding the fifth request, the queue overflowed so the queue size is // still 2. + await sleep(0) t.is(await getMetric(t, 'requester_queue_size'), 2) t.is(await getMetric(t, 'requester_queue_overflow'), 1) @@ -674,13 +676,7 @@ test.serial( // Ignore } - t.is( - await getTotalRequestDuration(t), - (adapterSettings.RETRY + 1) * requestLatency + - // This is a bug. The request duration should not include the sleep time - // between retries. 
- adapterSettings.RETRY * adapterSettings.REQUESTER_SLEEP_BEFORE_REQUEUEING_MS, - ) + t.is(await getTotalRequestDuration(t), (adapterSettings.RETRY + 1) * requestLatency) }), ) @@ -737,9 +733,7 @@ test.serial( { params: { param: param2 }, timestamp: t0 + rateLimitIntervalMs, - // This should be 'async2' but because of a bug it's 'async1'. - // The test tests what's implemented so it can pass until the bug is fixed. - asyncLocal: 'async1', + asyncLocal: 'async2', }, ]) }), diff --git a/test/util/turn-queue.test.ts b/test/util/turn-queue.test.ts new file mode 100644 index 00000000..9eb57532 --- /dev/null +++ b/test/util/turn-queue.test.ts @@ -0,0 +1,279 @@ +import test from 'ava' +import { deferredPromise, sleep } from '../../src/util' +import { EvictedError, TurnQueue } from '../../src/util/turn-queue' + +// Flush pending microtasks so the queue's transitions settle before asserting. +const flush = () => sleep(0) + +type Run = { + started: boolean + finished: boolean + evicted: boolean + error: unknown + finish: () => void + promise: Promise +} + +// Starts a queue.runInTurn() whose task blocks until finish() is called, so a +// test can hold a turn and observe whether the caller is running, finished, or +// evicted. 
+const startRun = (queue: TurnQueue): Run => { + const [blockUntilFinished, unblock] = deferredPromise() + const run: Run = { + started: false, + finished: false, + evicted: false, + error: undefined, + finish: () => unblock(undefined), + promise: Promise.resolve(), + } + run.promise = queue + .runInTurn(async () => { + run.started = true + await blockUntilFinished + }) + .then(() => { + run.finished = true + }) + .catch((error) => { + run.error = error + run.evicted = error instanceof EvictedError + }) + return run +} + +test('runs the task immediately when the queue is free', async (t) => { + const queue = new TurnQueue(1) + + const a = startRun(queue) + await flush() + t.true(a.started) + t.false(a.finished) + + a.finish() + await flush() + t.true(a.finished) +}) + +test('runs tasks one at a time', async (t) => { + const queue = new TurnQueue(5) + + const a = startRun(queue) + const b = startRun(queue) + await flush() + + t.true(a.started) + t.false(b.started) // Only one task runs at a time + + a.finish() + await flush() + t.true(a.finished) + t.true(b.started) + t.false(b.finished) + + b.finish() + await flush() + t.true(b.finished) +}) + +test('runs waiting tasks in FIFO order', async (t) => { + const queue = new TurnQueue(5) + + const runs = [startRun(queue), startRun(queue), startRun(queue)] + await flush() + + t.true(runs[0].started) + t.false(runs[1].started) + t.false(runs[2].started) + + runs[0].finish() + await flush() + t.true(runs[1].started) + t.false(runs[2].started) + + runs[1].finish() + await flush() + t.true(runs[2].started) + + runs[2].finish() + await flush() + t.true(runs[2].finished) +}) + +test('releases the turn even if the task throws', async (t) => { + const queue = new TurnQueue(5) + + const failing = queue.runInTurn(async () => { + throw new Error('boom') + }) + const next = startRun(queue) + + // The error from the task propagates to the runInTurn() caller. 
+ await t.throwsAsync(failing, { message: 'boom' }) + await flush() + + // The next task still gets its turn because the failed one released. + t.true(next.started) + + next.finish() + await flush() + t.true(next.finished) +}) + +test('maxLength 0: evicts a task requested while one is running', async (t) => { + const queue = new TurnQueue(0) + + const a = startRun(queue) + await flush() + t.true(a.started) + + const b = startRun(queue) + await flush() + t.true(b.evicted) + t.false(b.started) + + // After the running task finishes, a fresh task can run again. + a.finish() + await flush() + + const c = startRun(queue) + await flush() + t.true(c.started) + + c.finish() + await flush() +}) + +test('maxLength 1: evicts the oldest waiter when a new one arrives', async (t) => { + const queue = new TurnQueue(1) + + const a = startRun(queue) // Runs + const b = startRun(queue) // Waits + await flush() + t.true(a.started) + t.false(b.started) + t.false(b.evicted) + + const c = startRun(queue) // Evicts b + await flush() + t.true(b.evicted) + t.false(c.started) // C is now the only waiter + + a.finish() + await flush() + t.true(c.started) // C runs, skipping the evicted b + + c.finish() + await flush() +}) + +test('maxLength 2: evicts the oldest waiter and keeps the newer ones', async (t) => { + const queue = new TurnQueue(2) + + const a = startRun(queue) // Runs + const b = startRun(queue) // Waits + const c = startRun(queue) // Waits + const d = startRun(queue) // Evicts b + await flush() + + t.true(a.started) + t.true(b.evicted) + t.false(c.started) + t.false(c.evicted) + t.false(d.started) + t.false(d.evicted) + + a.finish() + await flush() + t.true(c.started) + t.false(d.started) + + c.finish() + await flush() + t.true(d.started) + + d.finish() + await flush() +}) + +test('maxLength 1: repeated arrivals keep only the newest waiter', async (t) => { + const queue = new TurnQueue(1) + + const a = startRun(queue) + const b = startRun(queue) + const c = startRun(queue) // 
Evicts b + const d = startRun(queue) // Evicts c + await flush() + + t.true(a.started) + t.true(b.evicted) + t.true(c.evicted) + t.false(d.started) + t.false(d.evicted) + + a.finish() + await flush() + t.true(d.started) + + d.finish() + await flush() +}) + +test('throws EvictedError without running the task when evicted', async (t) => { + const queue = new TurnQueue(0) + + const a = startRun(queue) + await flush() + + const b = startRun(queue) + await flush() + + t.true(b.error instanceof EvictedError) + t.false(b.started) // The task of an evicted caller never runs + + a.finish() + await flush() +}) + +test('tracks the number of waiting tasks in length', async (t) => { + const queue = new TurnQueue(5) + + t.is(queue.length, 0) + + const a = startRun(queue) + await flush() + t.is(queue.length, 0) // The running task is not counted as waiting + + const b = startRun(queue) + const c = startRun(queue) + await flush() + t.is(queue.length, 2) + + a.finish() + await flush() + t.is(queue.length, 1) // B runs, c still waits + + b.finish() + await flush() + t.is(queue.length, 0) + + c.finish() + await flush() +}) + +test('reuses the queue after it drains', async (t) => { + const queue = new TurnQueue(2) + + const a = startRun(queue) + await flush() + a.finish() + await flush() + t.true(a.finished) + + const b = startRun(queue) + await flush() + t.true(b.started) + b.finish() + await flush() + t.true(b.finished) +})