diff --git a/perfTest/graphile.config.js b/perfTest/graphile.config.js index 841ba7d6..62ee0b4c 100644 --- a/perfTest/graphile.config.js +++ b/perfTest/graphile.config.js @@ -15,7 +15,9 @@ const preset = { fileExtensions: [".js", ".cjs", ".mjs"], // fileExtensions: [".js", ".cjs", ".mjs", ".ts", ".cts", ".mts"], gracefulShutdownAbortTimeout: 2500, - localQueueSize: -1, + localQueue: { + size: -1, + }, completeJobBatchDelay: -1, failJobBatchDelay: -1, }, diff --git a/src/index.ts b/src/index.ts index 96ac478c..cd33c952 100644 --- a/src/index.ts +++ b/src/index.ts @@ -155,31 +155,75 @@ declare global { events?: WorkerEvents; - /** - * To enable processing jobs in batches, set this to an integer larger - * than 1. This will result in jobs being fetched by the pool rather than - * the worker, the pool will fetch (and lock!) `localQueueSize` jobs up - * front, and each time a worker requests a job it will be served from - * this list until the list is exhausted, at which point a new set of - * jobs will be fetched (and locked). - * - * This setting can help reduce the load on your database from looking - * for jobs, but is only really effective when there are often many jobs - * queued and ready to go, and can increase the latency of job execution - * because a single worker may lock jobs into its queue leaving other - * workers idle. - * - * @default `-1` - */ - localQueueSize?: number; + localQueue?: { + /** + * To enable processing jobs in batches, set this to an integer larger + * than 1. This will result in jobs being fetched by the pool rather than + * the worker, the pool will fetch (and lock!) `localQueue.size` jobs up + * front, and each time a worker requests a job it will be served from + * this list until the list is exhausted, at which point a new set of + * jobs will be fetched (and locked). + * + * This setting can help reduce the load on your database from looking + * for jobs, but is only really effective when there are often many jobs + * queued and ready to go, and can increase the latency of job execution + * because a single worker may lock jobs into its queue leaving other + * workers idle. + * + * @default `-1` + */ + size: number; - /** - * How long should jobs sit in the local queue before they are returned - * to the database? Defaults to 5 minutes. - * - * @default `300000` - */ - localQueueTtl?: number; + /** + * How long should jobs sit in the local queue before they are returned + * to the database? Defaults to 5 minutes. + * + * @default `300000` + */ + ttl?: number; + + /** + * When running at very high scale (multiple worker instances, each + * with some level of concurrency), Worker's polling can cause + * significant load on the database when there are too few jobs in the + * database to keep all worker pools busy - each time a new job comes + * in, each pool may request it, multiplying up the load. To reduce + * this impact, when a pool receives no (or few) results to its query + * for new jobs, we can instigate a "refetch delay" to cause the pool + * to wait before issuing its next poll for jobs, even when new job + * notifications come in. + */ + refetchDelay?: { + /** + * How long in milliseconds to wait, on average, before asking for + * more jobs when a previous fetch results in insufficient jobs to + * fill the local queue. (Causes the local queue to (mostly) ignore + * "new job" notifications.) + * + * When new jobs are coming in but the workers are mostly idle, you + * can expect on average `(1000/durationMs) * INSTANCE_COUNT` "get jobs" + * queries per second to be issued to your database. Increasing this + * decreases database load at the cost of increased latency when there + * are insufficient jobs in the database to keep the local queue full. + */ + durationMs: number; + /** + * How many jobs should a fetch return to trigger the refetchDelay? + * Must be less than the local queue size + * + * @default {0} + */ + threshold?: number; + /** + * How many new jobs, on average, can the pool that's in idle fetch + * delay be notified of before it aborts the refetch delay and fetches + * anyway + * + * @default {5 * localQueue.size} + */ + abortThreshold?: number; + }; + }; /** * The time in milliseconds to wait after a `completeJob` call to see if diff --git a/src/localQueue.ts b/src/localQueue.ts index 8299b895..a284a294 100644 --- a/src/localQueue.ts +++ b/src/localQueue.ts @@ -11,6 +11,7 @@ import { GetJobFunction, Job, TaskList, WorkerPool } from "./interfaces"; import { getJob as baseGetJob } from "./sql/getJob"; import { returnJob } from "./sql/returnJob"; +const STARTING = "STARTING"; const POLLING = "POLLING"; const WAITING = "WAITING"; const TTL_EXPIRED = "TTL_EXPIRED"; @@ -23,34 +24,48 @@ const RELEASED = "RELEASED"; * relieving the workers of this responsibility. * * The local queue trades latency for throughput: jobs may sit in the local - * queue for a longer time (maximum `localQueueSize` jobs waiting maximum - * `localQueueTTL` milliseconds), but fewer requests to the database are made + * queue for a longer time (maximum `localQueue.size` jobs waiting maximum + * `localQueue.ttl` milliseconds), but fewer requests to the database are made * for jobs since more jobs are fetched at once, enabling the worker to reach * higher levels of performance (and reducing read stress on the DB). * * The local queue is always in one of these modes: * + * - STARTING mode * - POLLING mode * - WAITING mode * - TTL_EXPIRED mode * - RELEASED mode * + * ## STARTING mode + * + * STARTING mode is the initial state of the local queue. + * + * Immediately move to POLLING mode. + * * ## POLLING mode * - * POLLING mode is the initial state of the local queue. The queue will only be - * in POLLING mode when it contains no cached jobs. + * The queue will only be in POLLING mode when it contains no cached jobs. + * + * When the queue enters POLLING mode: * - * When the queue enters POLLING mode (and when it starts) it will trigger a - * fetch of jobs from the database. + * - if any refetch delay has expired it will trigger a fetch of jobs from the + * database, + * - otherwise it will trigger a refetch to happen once the refetch delay has + * completed. * - * If no jobs were returned then it will wait `pollInterval` ms and then fetch - * again. + * When jobs are fetched: * - * If a "new job" notification is received during the polling interval then the - * timer will be cancelled, and a fetch will be fired immediately. + * - if no jobs were returned then it will wait `pollInterval` ms and then + * fetch again. + * - if fewer than `Math.ceil(Math.min(localQueueRefetchDelay.threshold, localQueueSize))` + * jobs were returned then a refetch delay will be set (if configured). + * - if jobs are returned from a POLLING mode fetch then the queue immediately + * enters WAITING mode. * - * If jobs are returned from a POLLING mode fetch then the queue immediately - * enters WAITING mode. + * When a "new job" notification is received, once any required refetch delay + * has expired (or immediately if it has already expired) the timer will be + * cancelled, and a fetch will be fired immediately. * * ## WAITING mode * @@ -94,10 +109,22 @@ export class LocalQueue { // Set true to fetch immediately after a fetch completes; typically only used // when the queue is pulsed during a fetch. fetchAgain = false; - mode: typeof POLLING | typeof WAITING | typeof TTL_EXPIRED | typeof RELEASED; + mode: + | typeof STARTING + | typeof POLLING + | typeof WAITING + | typeof TTL_EXPIRED + | typeof RELEASED = STARTING; private promise = defer(); private backgroundCount = 0; + /** If `localQueueRefetchDelay` is configured; set this true if the fetch resulted in a queue size lower than the threshold. */ + private refetchDelayActive = false; + private refetchDelayFetchOnComplete = false; + private refetchDelayTimer: NodeJS.Timeout | null = null; + private refetchDelayCounter: number = 0; + private refetchDelayAbortThreshold: number = Infinity; + constructor( private readonly compiledSharedOptions: CompiledSharedOptions, private readonly tasks: TaskList, @@ -107,9 +134,20 @@ export class LocalQueue { private readonly continuous: boolean, ) { this.ttl = - compiledSharedOptions.resolvedPreset.worker.localQueueTtl ?? 5 * MINUTE; + compiledSharedOptions.resolvedPreset.worker.localQueue?.ttl ?? 5 * MINUTE; this.pollInterval = compiledSharedOptions.resolvedPreset.worker.pollInterval ?? 2 * SECOND; + const localQueueRefetchDelayDuration = + compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay + ?.durationMs; + if ( + localQueueRefetchDelayDuration != null && + localQueueRefetchDelayDuration > this.pollInterval + ) { + throw new Error( + `Invalid configuration; 'preset.worker.localQueue.refetchDelay.durationMs' (${localQueueRefetchDelayDuration}) must not be larger than 'preset.worker.pollInterval' (${this.pollInterval})`, + ); + } this.setModePolling(); } @@ -232,12 +270,20 @@ export class LocalQueue { `Failed to return jobs from local queue to database queue`, { error: e }, ); - } + }, ), ); } private fetch = (): void => { + if (this.fetchTimer) { + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + } + if (this.refetchDelayActive) { + this.refetchDelayFetchOnComplete = true; + return; + } this.background( this._fetch().catch((e) => { // This should not happen @@ -249,7 +295,21 @@ export class LocalQueue { }; private async _fetch() { + /** + * Did we fetch the maximum number of records that we could? (If so, we + * should consider fetching again straight away so there's always jobs to + * be done.) + */ let fetchedMax = false; + /** + * Did we fetch more jobs than the refetch delay threshold? (Greater than, + * not equal to.) If false, we should start a refetch delay. + * + * Initialized to `true` so on error we don't enable refetch delay. + */ + let refetchDelayThresholdSurpassed = true; + const refetchDelayOptions = + this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay; try { assert.equal(this.mode, POLLING, "Can only fetch when in polling mode"); assert.equal( @@ -257,12 +317,22 @@ export class LocalQueue { false, "Cannot fetch when a fetch is already in progress", ); - if (this.fetchTimer) { - clearTimeout(this.fetchTimer); - this.fetchTimer = null; - } + assert.equal( + this.refetchDelayActive, + false, + "Can not fetch when fetches are meant to be delayed", + ); + assert.equal( + this.jobQueue.length, + 0, + "Should not fetch when job queue isn't empty", + ); this.fetchAgain = false; this.fetchInProgress = true; + // NOTE: this.refetchDelayCounter is set here allow for pulse() during + // fetch(). If the refetch delay threshold is surpassed then this value + // is harmlessly ignored. + this.refetchDelayCounter = 0; // The ONLY await in this function. const jobs = await baseGetJob( @@ -277,10 +347,21 @@ export class LocalQueue { assert.equal( this.jobQueue.length, 0, - "Should not fetch when job queue isn't empty", + "Should not fetch when job queue isn't empty (recheck)", ); const jobCount = jobs.length; fetchedMax = jobCount >= this.getJobBatchSize; + refetchDelayThresholdSurpassed = + // If we've fetched the maximum, we've met the requirement + fetchedMax || + // If refetch delay is disabled, we've met the requirement + !refetchDelayOptions || + // If we fetched more than (**not** equal to) `threshold` jobs, we've met the requirement + jobCount > Math.floor(refetchDelayOptions.threshold ?? 0); + + // NOTE: we don't need to handle `this.mode === RELEASED` here because + // being in that mode guarantees the workerQueue is empty. + const workerCount = Math.min(jobCount, this.workerQueue.length); const workers = this.workerQueue.splice(0, workerCount); for (let i = 0; i < jobCount; i++) { @@ -304,28 +385,100 @@ export class LocalQueue { // Finally, now that there is no fetch in progress, choose what to do next if (this.mode === "RELEASED") { this.returnJobs(); - } else if (this.jobQueue.length > 0) { + return; + } + + if (!refetchDelayThresholdSurpassed) { + /** How long to avoid any refetches for */ + const refetchDelayMs = + (0.5 + Math.random()) * (refetchDelayOptions?.durationMs ?? 100); + /** How many notifications do we need to receive before we abort the "no refetches" behavior? */ + const abortThreshold = + (0.5 + Math.random()) * + Math.min( + refetchDelayOptions?.abortThreshold ?? Infinity, + 5 * this.getJobBatchSize, + ); + + this.fetchAgain = false; + this.refetchDelayActive = true; + this.refetchDelayFetchOnComplete = false; + this.refetchDelayAbortThreshold = abortThreshold; + // NOTE: this.refetchDelayCounter is set at the beginning of fetch() + // (i.e. above) to allow for pulse() during fetch() + this.refetchDelayTimer = setTimeout( + this.refetchDelayCompleteOrAbort, + refetchDelayMs, + ); + } + + if (this.jobQueue.length > 0) { this.setModeWaiting(); } else { if (fetchedMax || this.fetchAgain) { - // Maximal fetch; trigger immediate refetch + // Maximal fetch and all jobs instantly consumed; trigger immediate refetch + // OR: new jobs came in during fetch(); trigger immediate refetch + assert.equal( + this.refetchDelayActive, + false, + "refetchDelayActive should imply didn't fetch max and fetchAgain is false", + ); this.fetch(); } else if (this.continuous) { // Set up the timer this.fetchTimer = setTimeout(this.fetch, this.pollInterval); } else { this.setModeReleased(); + return; } } + + // In case the counter was incremented sufficiently during fetch() + this.handleCheckRefetchDelayAbortThreshold(); + } + + private refetchDelayCompleteOrAbort = (): void => { + if (this.refetchDelayTimer != null) { + clearTimeout(this.refetchDelayTimer); + this.refetchDelayTimer = null; + } + this.refetchDelayActive = false; + if (this.mode === POLLING && this.refetchDelayFetchOnComplete) { + // Cancel poll, do now + if (this.fetchTimer) { + clearTimeout(this.fetchTimer); + this.fetchTimer = null; + } + this.fetch(); + } + }; + + /** + * If no refetch delay is active, returns false; otherwise returns true and + * checks to see if we need to abort the delay and trigger a fetch. + */ + private handleCheckRefetchDelayAbortThreshold(): boolean { + if (!this.refetchDelayActive || this.mode === "RELEASED") { + return false; + } + if (this.refetchDelayCounter >= this.refetchDelayAbortThreshold) { + this.refetchDelayFetchOnComplete = true; + this.refetchDelayCompleteOrAbort(); + } + return true; } /** Called when a new job becomes available in the DB */ - public pulse() { - // The only situation when this affects anything is if we're running in polling mode. - if (this.mode === POLLING) { + public pulse(count: number) { + this.refetchDelayCounter += count; + + if (this.handleCheckRefetchDelayAbortThreshold()) { + // Refetch delay was enabled; we've incremented the counter and taken + // action if necessary. No further action necessary. + } else if (this.mode === POLLING) { if (this.fetchInProgress) { this.fetchAgain = true; - } else if (this.fetchTimer) { + } else if (this.fetchTimer != null) { clearTimeout(this.fetchTimer); this.fetchTimer = null; this.fetch(); @@ -387,6 +540,12 @@ export class LocalQueue { const oldMode = this.mode; this.mode = RELEASED; + if (this.refetchDelayTimer != null) { + clearTimeout(this.refetchDelayTimer); + this.refetchDelayTimer = null; + } + this.refetchDelayActive = false; + if (oldMode === POLLING) { // Release pending workers const workers = this.workerQueue.splice(0, this.workerQueue.length); diff --git a/src/main.ts b/src/main.ts index 5f86221d..c588b543 100644 --- a/src/main.ts +++ b/src/main.ts @@ -536,7 +536,7 @@ export function _runTaskList( worker: { concurrentJobs: baseConcurrency, gracefulShutdownAbortTimeout, - localQueueSize = -1, + localQueue: { size: localQueueSize = -1 } = {}, completeJobBatchDelay = -1, failJobBatchDelay = -1, }, @@ -660,7 +660,7 @@ export function _runTaskList( }, nudge(this: WorkerPool, count: number) { if (localQueue) { - localQueue.pulse(); + localQueue.pulse(count); } else { let n = count; // Nudge up to `n` workers