misc(RequestQueueV2): adding notes for request queue v2 implementation #2700
base: master
@@ -567,6 +567,30 @@ export abstract class RequestProvider implements IStorage {
* the function might occasionally return a false negative,
* but it will never return a false positive.
*/
/**
* TODO Kuba:
* I would move this to the request_queue_v2.ts file, as it is related to the new API.
* Let's clean it up. We would probably need some new API endpoints to make this work.
I can imagine consuming the queue would be easier if we had the following endpoints: … Living without 1. is pretty easy, but if we had it, it would mean less bookkeeping on Crawlee's side - we could just spam it to make sure we have stuff to work on. 2., on the other hand, would be a tremendous help.

ad 1. This is a little bit of a new concept; I think if we introduce this one, listAndLockHead will not make sense. Maybe we can add some parameter instead. But can you iterate on how this will help?

ad 2. This makes sense to offload to the API, I just do not want to offload the whole logic, as the scraper needs to decide whether it wants to finish or not. Plus it has a better idea of whether it needs to enqueue new URLs or not. Maybe just an isHeadEmpty API would be OK to have.

I would love to make it trivial to use the API from Crawlee. If I could just call this endpoint repeatedly and it would a) return my current locked requests, b) lock more requests if possible and if I have less than … On top of that, if adding requests to the queue also returned the new "queue head", we could get rid of the whole special treatment for forefronted requests.

Which part of the logic would you like to keep in the crawler? Just because …
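A rough sketch of what the response of such a combined "list my locks / lock more" endpoint could look like - purely hypothetical, none of these names exist in the Apify API today:

```ts
// Hypothetical response shape for the endpoint discussed above: calling it repeatedly
// would return the client's currently locked requests and lock more when available.
interface ListAndExtendLocksResponse {
    // Requests this client currently holds locks on, including freshly locked ones.
    lockedRequests: { id: string; uniqueKey: string; lockExpiresAt: string }[];
    // Whether the queue head still contains pending (unlocked, unhandled) requests.
    hasPendingRequests: boolean;
    // Same meaning as in listHead: more than one client has touched the queue.
    hadMultipleClients: boolean;
}
```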
* But for now, we can use listHead and getRequest as a hacky solution,
* because listHead lists all requests that are not handled, even the locked ones, and we can use getRequest to check whether a request is locked.
*
* There are basically two paths here:
* 1. There are no other clients touching the queue (hadMultipleClients === false), so we can trust the queueHeadIds cache.
*    - If queueHeadIds is not empty -> happy path, we have some requests to process -> return false.
*    - If queueHeadIds is empty
*      -> we need to ensure that the queue is really empty (ensureHeadIsNonEmpty).
*      -> check again whether queueHeadIds is non-empty -> return false.
*
* 2. There are other clients touching the queue (hadMultipleClients === true), so we need to account for the fact that another client can change the head.
*    - If queueHeadIds is not empty -> happy path, we have some requests to process -> return false.
*    - If queueHeadIds is empty
*      -> we need to ensure that the queue is really empty (ensureHeadIsNonEmpty).
*      -> we need to check whether other clients are still processing requests (using listHead and getRequest on the first request).
What does the …

If you check the line below, the …

Pardon me, I'm stuck at "type of request" - what should I be looking for? If we're just interested in the lock status, isn't it already certain that the request is locked by another client if …

Yeah, listHead returns even the locked requests, plus the pending ones. This way you can list all pending and locked requests for the queue and then check whether other clients have locked them.

But... if I got nothing from …
*      -> If there are some requests that are not handled (maybe locked), we can return false - some other client is still processing requests (let's note this in an info log).
*      -> If there are no requests -> we still cannot be sure that the queue is empty, because the other client can still be adding requests to it.
*         -> We need to use some timeout here and compare it with the queue's modifiedAt (it is in the response of listHead); if the queue has not been modified for some time,
*            we can be sure that it is empty. The timeout should be in the order of seconds.
*/
async isFinished(): Promise<boolean> {
// TODO: once/if we figure out why sometimes request queues get stuck (if it's even request queues), remove this once and for all :)
if (Date.now() - this.lastActivity.getTime() > this.internalTimeoutMillis) {
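To make the two paths above concrete, here is a minimal sketch written against the apify-client listHead API. The `cachedHeadIds` and `refreshHead` parameters stand in for the real queueHeadIds cache and ensureHeadIsNonEmpty, and the timeout value is an assumption, not the actual implementation:

```ts
import type { RequestQueueClient } from 'apify-client';

// Minimal sketch of the two-path isFinished() check described in the note above.
async function isQueueFinished(
    client: RequestQueueClient,
    cachedHeadIds: Set<string>,
    refreshHead: () => Promise<void>,
    noModificationTimeoutMillis = 10_000, // "order of seconds", per the note above
): Promise<boolean> {
    // Happy path for both cases: if the local head cache has requests, we are not finished.
    if (cachedHeadIds.size > 0) return false;

    // The cache is empty, so refresh it (ensureHeadIsNonEmpty) and check again.
    await refreshHead();
    if (cachedHeadIds.size > 0) return false;

    // listHead returns unhandled requests *including* locked ones, plus queue metadata.
    const head = await client.listHead({ limit: 10 });

    // Path 1: we are the only client, so an empty head really means the queue is done.
    if (!head.hadMultipleClients) return head.items.length === 0;

    // Path 2: another client may still be processing (its locked requests show up here)...
    if (head.items.length > 0) return false;

    // ...or may still be adding new requests. Only report finished after the queue
    // has not been modified for a while.
    const sinceLastModification = Date.now() - new Date(head.queueModifiedAt).getTime();
    return sinceLastModification > noModificationTimeoutMillis;
}
```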
@@ -586,6 +610,7 @@ export abstract class RequestProvider implements IStorage {

this.queueHeadIds.clear();
// This cache may be the bane of our existence, but it's still required for v1...
// TODO Kuba: Let's remove this cache, it's not needed anymore; listing and locking the head is an atomic operation.
this.recentlyHandledRequestsCache.clear();
}
@@ -109,6 +109,11 @@ export class RequestQueue extends RequestProvider {
* @inheritDoc
*/
override async fetchNextRequest<T extends Dictionary = Dictionary>(): Promise<Request<T> | null> {

// TODO Kuba: We are missing a check here for queuePausedForMigration and for aborting.
// If we did not pause here, we would keep fetching and locking requests.
// The race condition is that we call _clearPossibleLocks(), but after that we can still fetch and lock requests, so those requests will be locked and never unlocked.
Comment on lines +113 to +115
Soo... we should just return …

I didn't go deep into the implementation of each scraper, but overall, once a migrating or aborting event happens, we should stop working with the queue and persist.
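A sketch of what the missing guard could look like. Crawlee does emit EventType.MIGRATING and EventType.ABORTING events, but the queuePausedForAbort flag and the wiring below are assumptions, not existing code:

```ts
import { EventType } from 'crawlee';
import type { EventManager } from 'crawlee';

// Sketch only: mirrors the existing queuePausedForMigration flag with a hypothetical
// queuePausedForAbort one, so fetchNextRequest() can stop handing out (and locking)
// new requests once either event fires.
class QueuePauseFlags {
    queuePausedForMigration = false;
    queuePausedForAbort = false;

    constructor(events: EventManager) {
        events.on(EventType.MIGRATING, () => { this.queuePausedForMigration = true; });
        events.on(EventType.ABORTING, () => { this.queuePausedForAbort = true; });
    }

    // If this returns true, fetchNextRequest() should return null immediately,
    // so _clearPossibleLocks() cannot race with freshly taken locks.
    get shouldStopFetching(): boolean {
        return this.queuePausedForMigration || this.queuePausedForAbort;
    }
}
```

The same condition would also extend the early return in ensureHeadIsNonEmpty shown a few lines below.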
checkStorageAccess();

this.lastActivity = new Date();
@@ -187,6 +192,7 @@ export class RequestQueue extends RequestProvider {
checkStorageAccess();

// Stop fetching if we are paused for migration
// TODO Kuba: We should stop fetching in case of aborting as well.
if (this.queuePausedForMigration) {
return;
}
@@ -204,21 +210,28 @@
}

private async _listHeadAndLock(): Promise<void> {
// TODO Kuba: This is a little bit confusing, let's rename it to something descriptive.
const forefront = await this.hasPendingForefrontRequests();

// TODO Kuba: listAndLockHead is an ATOMIC operation, so we can be sure that we are the only ones reading these requests for requestLockSecs; nobody else can read them via listAndLockHead in the meantime.
const headData = await this.client.listAndLockHead({
limit: Math.min(forefront ? this.assumedForefrontCount : 25, 25),
lockSecs: this.requestLockSecs,
});

const headIdBuffer = [];

// TODO Kuba: This is a bit of a hack; this should not happen. We need to investigate why.
I suppose you mean unlocking the forefront requests? I guess @barjin could provide some context here?

Anyway, this is the PR that added it: https://github.com/apify/crawlee/pull/2689/files
for (const { id, uniqueKey } of headData.items) {
// Queue head index might be behind the main table, so ensure we don't recycle requests
if (
// TODO Kuba: A missing id or uniqueKey means that the request returned from the queue is malformed. This should never happen; we need to investigate why.
I believe this is just defensive programming in the wrong place. The types support your claim that the ID and unique key should always be present. But since there is no runtime validation of the API responses (on the client side), we have no guarantee that the types actually match.

Maybe we should log a warning or something and not ignore this inconsistency completely.

If the API can return objects that do not conform to the docs, maybe that should be checked on the server side? Adding code downstream for "what if the docs lie...?" is not an efficient thing to do.
!id ||
!uniqueKey ||

// TODO Kuba: This should not happen.
this.recentlyHandledRequestsCache.get(id) ||

// If we tried to read new forefront requests, but another client appeared in the meantime, we can't be sure we'll only read our requests.
// To retain the correct queue ordering, we rollback this head read.
(forefront && headData.hadMultipleClients)
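Following up on the thread above: if we do not want to silently drop malformed items, a runtime check with a warning log could look roughly like this (a sketch; the helper name and the minimal HeadItem shape are made up for illustration):

```ts
import { log } from 'crawlee';

// Sketch: validate what the API returned instead of silently skipping it.
// HeadItem is a minimal stand-in for the queue head item shape.
interface HeadItem { id?: string; uniqueKey?: string }

function filterMalformedHeadItems<T extends HeadItem>(items: T[]): T[] {
    return items.filter((item) => {
        if (!item.id || !item.uniqueKey) {
            // The types say this cannot happen, but there is no runtime validation of
            // API responses on the client side, so warn instead of silently ignoring it.
            log.warning('Request queue head returned a malformed request, skipping it.', { item });
            return false;
        }
        return true;
    });
}
```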
@@ -235,6 +248,8 @@
try {
await this.client.deleteRequestLock(id);
} catch {
// TODO Kuba: There is a race condition here - if you lock a request and do not unlock it, it stays locked and the run gets stuck.
// TODO Kuba: We should at least log this.
Comment on lines +251 to +252
This only happens under very specific circumstances, and even if we don't manage to unlock the request, the lock will time out in ~60 seconds, right? So we shouldn't be risking a deadlock.

60 seconds only in the case where there is no pageTimeout; it can be set in hours, which happens in the google maps scraper.

Well, maybe it's a bad idea to lock requests for this long then. The …
// Ignore
}
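For context on the locking discussion above, this is roughly how the lock lifecycle works against the apify-client request queue API. listAndLockHead, prolongRequestLock, and deleteRequestLock are real client methods; the lockSecs values and the logging are illustrative, not the crawler's actual behavior:

```ts
import type { RequestQueueClient } from 'apify-client';
import { log } from 'crawlee';

// Illustrative lock lifecycle: listAndLockHead atomically reads and locks head requests,
// prolongRequestLock keeps a lock alive while a request is being processed, and
// deleteRequestLock releases it. If unlocking fails, the lock still expires after
// lockSecs, but only after that delay - so log the failure rather than swallowing it.
async function processHeadOnce(client: RequestQueueClient): Promise<void> {
    const { items } = await client.listAndLockHead({ limit: 25, lockSecs: 60 });

    for (const { id } of items) {
        try {
            // ... process the request; for long-running handlers, extend the lock periodically:
            await client.prolongRequestLock(id, { lockSecs: 60 });
        } finally {
            try {
                await client.deleteRequestLock(id);
            } catch (err) {
                // Don't just ignore this: the request stays locked until the lock expires.
                log.warning(`Failed to unlock request ${id}, it stays locked until the lock expires.`, { err });
            }
        }
    }
}
```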
I agree - I will just copy the relevant bits from RequestProvider to the Request queue v2 class and remove the inheritance.