Skip to content

Commit

Permalink
fix: move code back
Browse files Browse the repository at this point in the history
  • Loading branch information
drobnikj committed Oct 14, 2024
1 parent 2232078 commit f997cff
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 104 deletions.
25 changes: 25 additions & 0 deletions packages/core/src/storages/request_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,30 @@ export abstract class RequestProvider implements IStorage {
* the function might occasionally return a false negative,
* but it will never return a false positive.
*/
/**
* TODO Kuba:
* I would move this to request_queue_v2.ts file as it is related to the new API.
* Let's clean it up. We would probably need some new API endpoints to make this work.
* But for now, we can use the listHead and getRequest as hacky solution.
* Because the listHead list all requests that are not handled, event the locked ones. And we can use getRequest to check if the request is locked.
*
* There are basically two paths here:
* 1. There are no other clients touching the queue(hadMultipleClient === false), so we can trust the to queueHeadIds cache.
* - If the queueHeadIds is not empty -> happy path we have some requests to process. -> false
* - If the queueHeadIds is empty
* -> we need to ensure that the queue is really empty(ensureHeadIsNonEmpty).
* -> Check again if the queueHeadIds is not empty -> false
*
* 2. There are other clients touching the queue(hadMultipleClient === true), so we need to count that other client can change the head.
* - If the queueHeadIds is not empty -> happy path we have some requests to process. -> false
* - If the queueHeadIds is empty
* -> we need to ensure that the queue is really empty(ensureHeadIsNonEmpty).
* -> we need to check if other clients still processing the requests (using listHead and getRequest on the first request)
* -> If there are some requests that are not handled(maybe locked), we can return false - some other client is still processing the requests. (let's notify about this in log using info log)
* -> If there are no requests -> we cannot still be sure that the queue is empty, because the other client can still adding requests to the queue.
* -> We need to use some timeout here and compare it with queue modifiedAt(it is in response of listHead), if there is no modification of queue for some time,
* we can be sure that the queue is empty. The timeout should be in the order of seconds.
*/
async isFinished(): Promise<boolean> {
// TODO: once/if we figure out why sometimes request queues get stuck (if it's even request queues), remove this once and for all :)
if (Date.now() - this.lastActivity.getTime() > this.internalTimeoutMillis) {
Expand All @@ -586,6 +610,7 @@ export abstract class RequestProvider implements IStorage {

this.queueHeadIds.clear();
// This cache may be the bane of our existence, but it's still required for v1...
// TODO Kuba: Let's remove this cache, it's not needed anymore, the head and lock is atomic operation.
this.recentlyHandledRequestsCache.clear();
}

Expand Down
104 changes: 0 additions & 104 deletions packages/core/src/storages/request_queue_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,110 +374,6 @@ export class RequestQueue extends RequestProvider {
}
}

/**
* Resolves to `true` if all requests were already handled and there are no more left.
* Due to the nature of distributed storage used by the queue,
* the function might occasionally return a false negative,
* but it will never return a false positive.
*/
/**
* TODO Kuba:
* This function is a bit of a mess. Let's clean it up. We would probably need some new API endpoints to make this work.
* But for now, we can use the listHead and getRequest as hacky solution.
* Because the listHead list all requests that are not handled, event the locked ones. And we can use getRequest to check if the request is locked.
*
* There are basically two paths here:
* 1. There are no other clients touching the queue(hadMultipleClient === false), so we can trust the to queueHeadIds cache.
* - If the queueHeadIds is not empty -> happy path we have some requests to process. -> false
* - If the queueHeadIds is empty
* -> we need to ensure that the queue is really empty(ensureHeadIsNonEmpty).
* -> Check again if the queueHeadIds is not empty -> false
*
* 2. There are other clients touching the queue(hadMultipleClient === true), so we need to count that other client can change the head.
* - If the queueHeadIds is not empty -> happy path we have some requests to process. -> false
* - If the queueHeadIds is empty
* -> we need to ensure that the queue is really empty(ensureHeadIsNonEmpty).
* -> we need to check if other clients still processing the requests (using listHead and getRequest on the first request)
* -> If there are some requests that are not handled(maybe locked), we can return false - some other client is still processing the requests. (let's notify about this in log using info log)
* -> If there are no requests -> we cannot still be sure that the queue is empty, because the other client can still adding requests to the queue.
* -> We need to use some timeout here and compare it with queue modifiedAt(it is in response of listHead), if there is no modification of queue for some time,
* we can be sure that the queue is empty. The timeout should be in the order of seconds.
*/
override async isFinished(): Promise<boolean> {
// TODO: once/if we figure out why sometimes request queues get stuck (if it's even request queues), remove this once and for all :)
if (Date.now() - this.lastActivity.getTime() > this.internalTimeoutMillis) {
const maybeHead = await this.client.listHead({ limit: 1 });

const request = maybeHead.items[0] ? await this.client.getRequest(maybeHead.items[0].id) : null;

const message = `The request queue hasn't had activity for ${
this.internalTimeoutMillis / 1000
}s, resetting internal state.`;

this.log.warning(message, {
queueHeadIdsPending: this.queueHeadIds.length(),
hasPendingOrLockedRequests: maybeHead.items.length > 0,
pendingRequestIsLocked: request?.lockExpiresAt,
});

this.queueHeadIds.clear();
// This cache may be the bane of our existence, but it's still required for v1...
// TODO Kuba: Let's remove this cache, it's not needed anymore, the head and lock is atomic operation.
this.recentlyHandledRequestsCache.clear();
}

if (this.queueHeadIds.length() > 0) {
this.log.debug('There are still ids in the queue head that are pending processing', {
queueHeadIdsPending: this.queueHeadIds.length(),
});

return false;
}

const currentHead = await this.client.listHead({ limit: 2 });

if (currentHead.items.length !== 0) {
// Give users some more concrete info as to why their crawlers seem to be "hanging" doing nothing while we're waiting because the queue is technically
// not empty. We decided that a queue with elements in its head but that are also locked shouldn't return true in this function.
// If that ever changes, this function might need a rewrite
// The `% 25` was absolutely arbitrarily picked. It's just to not spam the logs too much. This is also a very specific path that most crawlers shouldn't hit
if (++this.isFinishedCalledWhileHeadWasNotEmpty % 25 === 0) {
const requests = await Promise.all(
currentHead.items.map(async (item) => this.client.getRequest(item.id)),
);

this.log.info(
`Queue head still returned requests that need to be processed (or that are locked by other clients)`,
{
requests: requests
.map((r) => {
if (!r) {
return null;
}

return {
id: r.id,
lockExpiresAt: r.lockExpiresAt,
lockedBy: r.lockByClient,
};
})
.filter(Boolean),
clientKey: this.clientKey,
},
);
} else {
this.log.debug(
'Queue head still returned requests that need to be processed (or that are locked by other clients)',
{
requestIds: currentHead.items.map((item) => item.id),
},
);
}
}

return currentHead.items.length === 0;
}

protected override _reset() {
super._reset();
this._listHeadAndLockPromise = null;
Expand Down

0 comments on commit f997cff

Please sign in to comment.