-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create RFS successor work items #1104
Create RFS successor work items #1104
Conversation
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
…uccessor items, it should attempt to redrive the successor item process) Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1104 +/- ##
============================================
+ Coverage 80.75% 80.79% +0.04%
- Complexity 2925 2928 +3
============================================
Files 399 399
Lines 14845 14845
Branches 1007 1007
============================================
+ Hits 11988 11994 +6
Misses 2248 2248
+ Partials 609 603 -6
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
@@ -76,6 +77,13 @@ interface ICompleteWorkItemContext extends IRetryableActivityContext { | |||
IRefreshContext getRefreshContext(); | |||
} | |||
|
|||
interface ICreateSuccessorWorkItemsContext extends IRetryableActivityContext { | |||
String ACTIVITY_NAME = ActivityNames.CREATE_SUCCESSOR_WORK_ITEMS; | |||
IRefreshContext getRefreshContext(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sigh - it took me a few minutes to remember what we're refreshing... and it's the server.
nit: Maybe rename this whole term from refresh to something like refreshIndexOnCluster (that could be a separate PR, but it could be a 10 minute refactoring change now)
public ICompleteWorkItemContext getCompleteWorkItemContext() { return new CompleteWorkItemContext(this.rootInstrumentationScope, this); } | ||
|
||
@Override | ||
public ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext() { return new CreateUnassignedWorkItemContext(this.rootInstrumentationScope, this); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: style thing, can you put the definition on a separate line if its > 120 chars?
+ // already done | ||
" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " | ||
+ " if (params.expirationWindow > 0 && ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" | ||
+ // already done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this line true - what was already done and for what, if we make it into the body of the if on line 243? That's what it sounds like, but I know that's not the case at all.
I think that this is accurate for the checks - maybe /**/ comments would be better; or stating here that it was NOT yet done, or no comment at all. Sorry, I know that this was my comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot figure out what this comment meant. I agree that it sure looks to me like this is a not-yet-done work item if we're at this point. I wrote out a somewhat more complete version of that.
@@ -312,6 +322,13 @@ public boolean createUnassignedWorkItem( | |||
} | |||
} | |||
|
|||
private ArrayList<String> getSuccessorItemsIfPresent(JsonNode responseDoc) { | |||
if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) { | |||
return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(","))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make "," a constant string w/ a name like SUCCESSOR_ITEMS_DELIMITER
var body = updateSuccessorWorkItemsTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc") | ||
.replace(WORKER_ID_TEMPLATE, workerId) | ||
.replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000)) | ||
.replace(SUCCESSOR_WORK_ITEM_IDS_TEMPLATE, String.join(",", successorWorkItemIds)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see the comment about "," as a constant.
More importantly, when the list of items comes in, throw an IllegalArgumentException if any of them have a comma already.
// Now, we should be able to claim the other successor items but the _next_ call should fail because there are no available items | ||
for (int i = 0; i < (N_SUCCESSOR_ITEMS - 1); i++) { | ||
workItemId = getWorkItemAndVerify(testContext, "claimItem_" + i, new ConcurrentHashMap<>(), originalWorkItemExpiration, false, true); | ||
Assertions.assertTrue(successorItems.contains(workItemId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need a slightly stronger set of tests here to confirm that you're not getting duplicates (reusing a ConcurrentHashMap above and taking the count will do that).
Assertions.assertTrue(successorItems.contains(workItemId)); | ||
} | ||
|
||
Assertions.assertThrows(IllegalStateException.class, () -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you extend IllegalStateException and throw subtypes. I don't know why the code within will throw.
You can also store the value from assertThrow & do some further checks (instanceOf, getMessage().contains(...)).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to discuss - why does this throw an illegal state exception? That's a scary sounding exception. The calls don't look that scary.
Assertions.assertEquals(200, response.getStatusCode()); | ||
|
||
// Now attempt to go through with the correct successor item list | ||
Assertions.assertThrows(IllegalArgumentException.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know why this is throwing - but it would be nice to see some extra check that we know why...
I think that at its root this is from the server/painless script - so maybe just scan the getMessage() contents to confirm that it's helpful.
getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), Duration.ofSeconds(5), false, false); | ||
|
||
// Now attempt to go through with the correct successor item list | ||
Assertions.assertThrows(IllegalArgumentException.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is completely the right behavior - but I can't recall from looking at the code... why will this fail?
// We need to make sure that if we pick it up and call `createSuccessorWorkItemsAndMarkComplete`, it will complete successfully and create the two missing items. | ||
var originalWorkItemId = getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As in another comment, I think that these lines should be transparently handled. This test still has a lot of value, you'll just shed a few lines and make sure that you eventually fill up the "seenWorkIds" map.
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
97cbcb6
to
4c0acb6
Compare
} catch (Exception e) { | ||
attempt++; | ||
if (attempt > maxRetries) { | ||
log.atError().setCause(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need this or the others in this method? Will you log these at error up in the call stack w/ the same info? If so, these aren't needed. I see this code as library code, unaware of how it's being used, so for something to be called an error, it would need to make a number of assumptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm keeping the one with the retry times because that is opaque to the caller. But otherwise agreed.
@@ -138,6 +207,10 @@ public void setup(Supplier<IWorkCoordinationContexts.IInitializeCoordinatorState | |||
+ " \"status\": {\n" | |||
+ " \"type\": \"keyword\",\n" | |||
+ " \"norms\": false\n" | |||
+ " },\n" | |||
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n" | |||
+ " \"type\": \"keyword\",\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious - did you need to define this, or is this just for completeness?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think adding successor items would be okay, due to dynamic type mapping, but that's a setting that could be changed, and it's helpful for pulling this value out that I know that the field will always be there.
" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " | ||
+ " if (params.expirationWindow > 0 && ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" | ||
+ // work item is not completed, but may be assigned to this or a different worker (or unassigned) | ||
" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " | ||
+ " ctx._source." + EXPIRATION_FIELD_NAME + " > serverTimeSeconds) {" | ||
+ // count as an update to force the caller to lookup the expiration time, but no need to modify it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, spacing could align w/ the next line
return new WorkItemAndDuration( | ||
workItemId, | ||
Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()) | ||
final var workItem = new WorkItemWithPotentialSuccessors( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't look like there's any value to making this object. Using component variables will make the code easier to follow.
if (workItem.successorWorkItemIds != null) { | ||
// continue the previous work of creating the successors and marking this item as completed | ||
createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds, ctx::getCreateSuccessorWorkItemsContext); | ||
return new AlreadyCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the right thing to do for a public method?
A caller was trying to get the next bit of work. We grabbed something, did some extremely small, internal work, that the caller is unaware of. Signaling to the caller that work was "already done" is a bit weird.
I had this thought before, but didn't push so that we could keep symmetries, but the confusion factor is high... Should we support adding successorItems from within this call? If a user creates a workitem, there isn't a successor list passed in. The update part in the original contract here is (I hope) an atavistic artifact whose value was invalidated by the assign functions and changes to how RFS was getting work (& is even further invalidated by this change itself - since we can now chain work atomically).
System.out.println("resultTree = " + resultTree); | ||
System.out.println(resultTree.get("error")); | ||
System.out.println(resultTree.get("error").get("type")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove these
if (workItem.successorWorkItemIds != null) { | ||
// continue the previous work of creating the successors and marking this item as completed | ||
createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds, ctx::getCreateSuccessorWorkItemsContext); | ||
return new AlreadyCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the downside to continuing? AlreadyCompleted is a weird response for acquireNextWorkItem(). Why would it have been acquired if it was already done?
Assertions.assertTrue(successorItems.contains(workItemId)); | ||
} | ||
|
||
Assertions.assertThrows(IllegalStateException.class, () -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to discuss - why does this throw an illegal state exception? That's a scary sounding exception. The calls don't look that scary.
Signed-off-by: Mikayla Thompson <[email protected]>
Signed-off-by: Mikayla Thompson <[email protected]>
b62778e
into
opensearch-project:main
Description
One component of sub-shard RFS is creating a mechanism to robustly create new work items from a given parent.
This PR implements a function in the work coordinator to take a parent work item id, and a list of one or more successor item ids.
createSuccessorItemsAndMarkComplete
is called with the current work item id, and a list of successor item ids. It does the following:successor_items
list, subject to the following criteria:a. client is using the correct script version
b. the worker writing still holds the lease
c. the work item's current
successor_items
list is either not set or the same as the one being pushed.create
call that will only create the new items if their ids don't exist (e.g. it won't overwrite an item with the same id).The checks built in make this function idempotent. If it fails at any point, it can be run again without creating any inconsistent or conflicting state. Indeed, one of the expected behaviors is that if another work picks up a lease where 1 and any portion of 2 were completed, it will immediately re-run the function with the same parameters and this will attempt again to create the successors and complete the process.
If the first step (updating the item with the
successor_items
list) fails and the worker exits, the mechanism fails and the progress will be lost (TODO: I should have much more robust retries around this step).It fits into the broader workflow as follows:
successor_items
, jump to step 4.createSuccessorItemsAndMarkComplete
with a single item list containing a work item that defines the current shard starting where this worker is finishing.a. This creates one new work item, and marks the current one as completed.
In the future, this workflow can be modified to split the remaining work into multiple shards instead of one, or to split before starting a given shard, instead of after (e.g. a set of workers first runs through and splits every shard into sub-shards, and then new workers actually tackle writing the documents).
Issues Resolved
https://opensearch.atlassian.net/browse/MIGRATIONS-2127
Testing
Tests are added that test the basic behavior, the behavior in a contentious situation (40 simultaneous workers), and the error-recovery behavior (e.g. that the function can be run idempotently). There are also some tests of error-checking behavior (e.g. attempting to supply a new
successor_items
list.I'd like to add a toxi-proxy test, a test (and check) to prevent adding one's self as a successor work item, and retries around setting the initial
successor_items
list.Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.