Create RFS successor work items #1104

Merged

@mikaylathompson mikaylathompson commented Oct 25, 2024

Description

One component of sub-shard RFS is creating a mechanism to robustly create new work items from a given parent.
This PR implements a function in the work coordinator to take a parent work item id, and a list of one or more successor item ids.

createSuccessorWorkItemsAndMarkComplete is called with the current work item id and a list of successor item ids. It does the following:

  1. Format the list of successor item ids into a comma-separated list and update the current work item's successor_items list, subject to the following criteria:
    a. the client is using the correct script version
    b. the worker writing still holds the lease
    c. the work item's current successor_items list is either not set or the same as the one being pushed.
  2. Send a bulk request to create each of the successor items as a new work item. This uses a create call that will only create the new items if their ids don't exist (i.e. it won't overwrite an item with the same id).
  3. If the preceding step succeeds with only 201 (created) or 409 (already exists) status codes, mark the current item as completed.

The built-in checks make this function idempotent. If it fails at any point, it can be run again without creating any inconsistent or conflicting state. Indeed, one of the expected behaviors is that if another worker picks up a lease where step 1 and any portion of step 2 were completed, it will immediately re-run the function with the same parameters, which will again attempt to create the successors and complete the process.
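
The three steps and the retry behavior can be sketched as follows. This is only an illustration under stated assumptions: the in-memory map, the `SuccessorSketch` and `Item` classes, and the field names are hypothetical stand-ins for the coordination index, and the script version check is left out. It is not the code in OpenSearchWorkCoordinator.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the coordination index (illustration only).
class SuccessorSketch {
    static final class Item {
        String leaseHolder;     // worker currently holding the lease
        String successorItems;  // comma-separated successor ids, or null if not set
        boolean completed;
    }

    final Map<String, Item> index = new HashMap<>();

    void createSuccessorWorkItemsAndMarkComplete(String workerId, String itemId, List<String> successorIds) {
        Item parent = index.get(itemId);
        if (parent == null || !workerId.equals(parent.leaseHolder)) {
            throw new IllegalStateException("worker " + workerId + " does not hold the lease on " + itemId);
        }
        String joined = String.join(",", successorIds);
        // Step 1: record the successor list, unless a different list was already recorded.
        if (parent.successorItems != null && !parent.successorItems.equals(joined)) {
            throw new IllegalArgumentException("successor_items is already set to a different list");
        }
        parent.successorItems = joined;
        // Step 2: create-if-absent, so an existing item with the same id is never overwritten.
        for (String id : successorIds) {
            index.putIfAbsent(id, new Item());
        }
        // Step 3: only after every successor exists, mark the parent as completed.
        parent.completed = true;
    }
}
```

Calling the method a second time with the same arguments leaves the map unchanged, which is the property a worker relies on when it picks up a half-finished item and re-drives the process.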

If the first step (updating the item with the successor_items list) fails and the worker exits, the mechanism fails and the progress will be lost (TODO: I should have much more robust retries around this step).

It fits into the broader workflow as follows:

  1. A work item is claimed, as currently done.
  2. If it already has successor_items, jump to step 4.
  3. Write documents until the lease has nearly expired or a SIGTERM signal is received.
  4. Call createSuccessorWorkItemsAndMarkComplete with a single-item list containing a work item that covers the remainder of the current shard, starting where this worker stopped.
    a. This creates one new work item, and marks the current one as completed.

In the future, this workflow can be modified to split the remaining work into multiple shards instead of one, or to split before starting a given shard, instead of after (e.g. a set of workers first runs through and splits every shard into sub-shards, and then new workers actually tackle writing the documents).

Issues Resolved

https://opensearch.atlassian.net/browse/MIGRATIONS-2127

Testing

Tests are added that cover the basic behavior, the behavior in a contentious situation (40 simultaneous workers), and the error-recovery behavior (e.g. that the function can be run idempotently). There are also some tests of error-checking behavior (e.g. attempting to supply a new successor_items list).

I'd like to add a Toxiproxy test, a test (and check) to prevent adding oneself as a successor work item, and retries around setting the initial successor_items list.

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Mikayla Thompson <[email protected]>
…uccessor items, it should attempt to redrive the successor item process)

Signed-off-by: Mikayla Thompson <[email protected]>

codecov bot commented Oct 25, 2024

Codecov Report

Attention: Patch coverage is 59.25926% with 33 lines in your changes missing coverage. Please review.

Project coverage is 80.79%. Comparing base (43d8b0a) to head (9320855).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ad/workcoordination/OpenSearchWorkCoordinator.java | 58.75% | 29 Missing and 4 partials ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1104      +/-   ##
============================================
+ Coverage     80.75%   80.79%   +0.04%     
- Complexity     2925     2928       +3     
============================================
  Files           399      399              
  Lines         14845    14845              
  Branches       1007     1007              
============================================
+ Hits          11988    11994       +6     
  Misses         2248     2248              
+ Partials        609      603       -6     
| Flag | Coverage Δ |
|---|---|
| gradle-test | 78.82% <59.25%> (+0.04%) ⬆️ |
| python-test | 89.93% <ø> (ø) |


@@ -76,6 +77,13 @@ interface ICompleteWorkItemContext extends IRetryableActivityContext {
IRefreshContext getRefreshContext();
}

interface ICreateSuccessorWorkItemsContext extends IRetryableActivityContext {
String ACTIVITY_NAME = ActivityNames.CREATE_SUCCESSOR_WORK_ITEMS;
IRefreshContext getRefreshContext();
Collaborator

sigh - it took me a few minutes to remember what we're refreshing... and it's the server.
nit: Maybe rename this whole term from refresh to something like refreshIndexOnCluster (that could be a separate PR, but it could be a 10 minute refactoring change now)

public ICompleteWorkItemContext getCompleteWorkItemContext() { return new CompleteWorkItemContext(this.rootInstrumentationScope, this); }

@Override
public ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext() { return new CreateUnassignedWorkItemContext(this.rootInstrumentationScope, this); }
Collaborator

nit: style thing, can you put the definition on a separate line if it's > 120 chars?

+ // already done
" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && "
+ " if (params.expirationWindow > 0 && ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {"
+ // already done
Collaborator

is this line true - what was already done and for what, if we make it into the body of the if on line 243? That's what it sounds like, but I know that's not the case at all.

I think that this is accurate for the checks - maybe /**/ comments would be better; or stating here that it was NOT yet done, or no comment at all. Sorry, I know that this was my comment.

Collaborator Author

I cannot figure out what this comment meant. I agree that it sure looks to me like this is a not-yet-done work item if we're at this point. I wrote out a somewhat more complete version of that.

@@ -312,6 +322,13 @@ public boolean createUnassignedWorkItem(
}
}

private ArrayList<String> getSuccessorItemsIfPresent(JsonNode responseDoc) {
if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) {
return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(",")));
Collaborator

make "," a constant string w/ a name like SUCCESSOR_ITEMS_DELIMITER

var body = updateSuccessorWorkItemsTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc")
.replace(WORKER_ID_TEMPLATE, workerId)
.replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000))
.replace(SUCCESSOR_WORK_ITEM_IDS_TEMPLATE, String.join(",", successorWorkItemIds));
Collaborator

see the comment about "," as a constant.
More importantly, when the list of items comes in, throw an IllegalArgumentException if any of them have a comma already.
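
A minimal sketch of what this suggestion might look like, assuming a small helper that owns both the delimiter and the validation. The class name `SuccessorIds` and the method name `join` are hypothetical; only `SUCCESSOR_ITEMS_DELIMITER` comes from the comment above.

```java
import java.util.List;

// Hypothetical helper (illustration only): one constant for the delimiter,
// and a check that no id already contains it before the ids are joined.
class SuccessorIds {
    static final String SUCCESSOR_ITEMS_DELIMITER = ",";

    static String join(List<String> successorWorkItemIds) {
        for (String id : successorWorkItemIds) {
            if (id.contains(SUCCESSOR_ITEMS_DELIMITER)) {
                throw new IllegalArgumentException(
                    "successor work item id contains the delimiter: " + id);
            }
        }
        return String.join(SUCCESSOR_ITEMS_DELIMITER, successorWorkItemIds);
    }
}
```

Rejecting such ids up front matters because the stored string is later split on the same delimiter, so an id containing a comma would be read back as two different ids.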

// Now, we should be able to claim the other successor items but the _next_ call should fail because there are no available items
for (int i = 0; i < (N_SUCCESSOR_ITEMS - 1); i++) {
workItemId = getWorkItemAndVerify(testContext, "claimItem_" + i, new ConcurrentHashMap<>(), originalWorkItemExpiration, false, true);
Assertions.assertTrue(successorItems.contains(workItemId));
Collaborator

you need a slightly stronger set of tests here to confirm that you're not getting duplicates (reusing a ConcurrentHashMap above and taking the count will do that).
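
One way to read this suggestion, as a sketch only: keep a single shared map across all claims and fail as soon as an id shows up twice. The `DuplicateClaimCheck` class and `recordClaim` method are hypothetical and are not part of the existing test helpers.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (illustration only): records each claimed id in one shared map.
class DuplicateClaimCheck {
    // Returns true if this id was not seen before, false if it is a duplicate claim.
    static boolean recordClaim(ConcurrentHashMap<String, String> seenWorkIds, String workItemId, String label) {
        return seenWorkIds.putIfAbsent(workItemId, label) == null;
    }
}
```

In the loop, each claimed id would be passed through `recordClaim` with the same map, and after the loop the map's size would be compared against `N_SUCCESSOR_ITEMS - 1`.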

Assertions.assertTrue(successorItems.contains(workItemId));
}

Assertions.assertThrows(IllegalStateException.class, () -> {
Collaborator

Can you extend IllegalStateException and throw subtypes. I don't know why the code within will throw.
You can also store the value from assertThrow & do some further checks (instanceOf, getMessage().contains(...)).

Collaborator

to discuss - why does this throw an illegal state exception? That's a scary sounding exception. The calls don't look that scary.

Assertions.assertEquals(200, response.getStatusCode());

// Now attempt to go through with the correct successor item list
Assertions.assertThrows(IllegalArgumentException.class,
Collaborator

I know why this is throwing - but it would be nice to see some extra check that we know why...
I think that at its root this is from the server/painless script - so maybe just scan the getMessage() contents to confirm that it's helpful.

getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), Duration.ofSeconds(5), false, false);

// Now attempt to go through with the correct successor item list
Assertions.assertThrows(IllegalArgumentException.class,
Collaborator

This is completely the right behavior - but I can't recall from looking at the code... why will this fail?

Comment on lines 288 to 289
// We need to make sure that if we pick it up and call `createSuccessorWorkItemsAndMarkComplete`, it will complete successfully and create the two missing items.
var originalWorkItemId = getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false);
Collaborator

As in another comment, I think that these lines should be transparently handled. This test still has a lot of value, you'll just shed a few lines and make sure that you eventually fill up the "seenWorkIds" map.

} catch (Exception e) {
attempt++;
if (attempt > maxRetries) {
log.atError().setCause(e)
Collaborator

do you need this or the others in this method? Will you log these at error up in the call stack w/ the same info? If so, these aren't needed. I see this code as library code, unaware of how it's being used, so for something to be called an error, it would need to make a number of assumptions.

Collaborator Author

I'm keeping the one with the retry times because that is opaque to the caller. But otherwise agreed.

@@ -138,6 +207,10 @@ public void setup(Supplier<IWorkCoordinationContexts.IInitializeCoordinatorState
+ " \"status\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n"
+ " \"type\": \"keyword\",\n"
Collaborator

curious - did you need to define this, or is this just for completeness?

Collaborator Author

I think adding successor items would be okay, due to dynamic type mapping, but that's a setting that could be changed, and it's helpful for pulling this value out that I know that the field will always be there.

" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && "
+ " if (params.expirationWindow > 0 && ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {"
+ // work item is not completed, but may be assigned to this or a different worker (or unassigned)
" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && "
+ " ctx._source." + EXPIRATION_FIELD_NAME + " > serverTimeSeconds) {"
+ // count as an update to force the caller to lookup the expiration time, but no need to modify it
Collaborator

nit, spacing could align w/ the next line

return new WorkItemAndDuration(
workItemId,
Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue())
final var workItem = new WorkItemWithPotentialSuccessors(
Collaborator

It doesn't look like there's any value to making this object. Using component variables will make the code easier to follow.

if (workItem.successorWorkItemIds != null) {
// continue the previous work of creating the successors and marking this item as completed
createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds, ctx::getCreateSuccessorWorkItemsContext);
return new AlreadyCompleted();
Collaborator

Is this the right thing to do for a public method?

A caller was trying to get the next bit of work. We grabbed something, did some extremely small, internal work, that the caller is unaware of. Signaling to the caller that work was "already done" is a bit weird.

I had this thought before, but didn't push so that we could keep symmetries, but the confusion factor is high... Should we support adding successorItems from within this call? If a user creates a workitem, there isn't a successor list passed in. The update part in the original contract here is (I hope) an atavistic artifact whose value was invalidated by the assign functions and changes to how RFS was getting work (& is even further invalidated by this change itself - since we can now chain work atomically).

Comment on lines 784 to 786
System.out.println("resultTree = " + resultTree);
System.out.println(resultTree.get("error"));
System.out.println(resultTree.get("error").get("type"));
Collaborator

please remove these

if (workItem.successorWorkItemIds != null) {
// continue the previous work of creating the successors and marking this item as completed
createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds, ctx::getCreateSuccessorWorkItemsContext);
return new AlreadyCompleted();
Collaborator

What's the downside to continuing? AlreadyCompleted is a weird response for acquireNextWorkItem(). Why would it have been acquired if it was already done?

Signed-off-by: Mikayla Thompson <[email protected]>
@AndreKurait AndreKurait reopened this Nov 13, 2024
@mikaylathompson mikaylathompson merged commit b62778e into opensearch-project:main Nov 13, 2024
14 of 16 checks passed
@mikaylathompson mikaylathompson deleted the successor-work-items branch November 13, 2024 18:55