Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create RFS successor work items #1104

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
31ae47d
Fix bug in numWorkItemsArePending
mikaylathompson Oct 23, 2024
e48005b
Just switch to _count
mikaylathompson Oct 23, 2024
ab9b653
_count -> _search
mikaylathompson Oct 24, 2024
1a926d1
Rename for clarity (pending->notYetCompleted)
mikaylathompson Oct 24, 2024
f573157
Add extra check around the uncertain case if hits==0
mikaylathompson Oct 24, 2024
89198da
Main logic and tests for successor work items
mikaylathompson Oct 24, 2024
9af9db4
Add another check to never change the successor items
mikaylathompson Oct 24, 2024
10b0ede
Integrate into workflow (anytime a worker picks up a work item with s…
mikaylathompson Oct 25, 2024
2c2c79c
Merge branch 'main' into successor-work-items
mikaylathompson Oct 25, 2024
dfb8183
Merge branch 'main' into successor-work-items
mikaylathompson Nov 5, 2024
ddb9c33
Handle retries and an edge case
mikaylathompson Nov 5, 2024
0f6d6ee
spotless fixes
mikaylathompson Nov 5, 2024
bd44697
Addressing review comments
mikaylathompson Nov 11, 2024
ae2d1dd
Additional review updates
mikaylathompson Nov 12, 2024
4c0acb6
Clean up scopedworkcoordinator modifications
mikaylathompson Nov 12, 2024
ab751db
Next round of review
mikaylathompson Nov 12, 2024
2a29476
Fix tests for new acquireNextWorkItem behavior
mikaylathompson Nov 12, 2024
cdfc72c
Merge branch 'main' into successor-work-items
AndreKurait Nov 13, 2024
0cbe935
Merge branch 'main' into successor-work-items
AndreKurait Nov 13, 2024
9320855
Merge branch 'main' into successor-work-items
AndreKurait Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ActivityNames {
public static final String ACQUIRE_SPECIFIC_WORK = "acquireSpecificWorkItem";
public static final String COMPLETE_WORK = "completeWork";
public static final String ACQUIRE_NEXT_WORK = "acquireNextWorkItem";
public static final String CREATE_SUCCESSOR_WORK_ITEMS = "createSuccessorWorkItems";

private ActivityNames() {}
}
Expand Down Expand Up @@ -54,6 +55,8 @@ interface IBaseAcquireWorkContext extends IRetryableActivityContext {}

/**
 * Context for the activity named by {@link ActivityNames#ACQUIRE_SPECIFIC_WORK}.
 * In addition to what {@link IBaseAcquireWorkContext} provides, it can hand out a context for
 * the create-successor-work-items activity.
 */
interface IAcquireSpecificWorkContext extends IBaseAcquireWorkContext {
/** Activity name associated with this context type. */
String ACTIVITY_NAME = ActivityNames.ACQUIRE_SPECIFIC_WORK;

/**
 * @return a context for the create-successor-work-items activity
 *         (NOTE(review): presumably scoped under this context; confirm against the implementation)
 */
ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext();
}

interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext {
Expand All @@ -68,6 +71,9 @@ interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext {
void recordRecoverableClockError();

void recordFailure(OpenSearchWorkCoordinator.PotentialClockDriftDetectedException e);

ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext();

}

interface ICompleteWorkItemContext extends IRetryableActivityContext {
Expand All @@ -76,6 +82,13 @@ interface ICompleteWorkItemContext extends IRetryableActivityContext {
IRefreshContext getRefreshContext();
}

/**
 * Retryable-activity context for the activity named by
 * {@link ActivityNames#CREATE_SUCCESSOR_WORK_ITEMS}. It exposes contexts for the refresh,
 * complete-work-item and create-unassigned-work-item operations.
 */
interface ICreateSuccessorWorkItemsContext extends IRetryableActivityContext {
/** Activity name associated with this context type. */
String ACTIVITY_NAME = ActivityNames.CREATE_SUCCESSOR_WORK_ITEMS;
/** @return a context for a refresh operation */
IRefreshContext getRefreshContext();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh - it took me a few minutes to remember what we're refreshing... and it's the server.
nit: Maybe rename this whole term from refresh to something like refreshIndexOnCluster (that could be a separate PR, but it could be a 10 minute refactoring change now)

/** @return a context for completing a work item */
ICompleteWorkItemContext getCompleteWorkItemContext();
/** @return a context for creating an unassigned work item */
ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext();
}

interface IScopedWorkContext<C extends IBaseAcquireWorkContext> extends IScopedInstrumentationAttributes {
C createOpeningContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class RootWorkCoordinationContext extends RootOtelContext {
public final WorkCoordinationContexts.AcquireSpecificWorkContext.MetricInstruments acquireSpecificWorkMetrics;
public final WorkCoordinationContexts.CompleteWorkItemContext.MetricInstruments completeWorkMetrics;
public final WorkCoordinationContexts.AcquireNextWorkItemContext.MetricInstruments acquireNextWorkMetrics;
public final WorkCoordinationContexts.CreateSuccessorWorkItemsContext.MetricInstruments createSuccessorWorkItemsMetrics;

public RootWorkCoordinationContext(OpenTelemetry sdk, IContextTracker contextTracker) {
this(sdk, contextTracker, null);
Expand All @@ -38,6 +39,7 @@ public RootWorkCoordinationContext(OpenTelemetry sdk,
acquireSpecificWorkMetrics = WorkCoordinationContexts.AcquireSpecificWorkContext.makeMetrics(meter);
completeWorkMetrics = WorkCoordinationContexts.CompleteWorkItemContext.makeMetrics(meter);
acquireNextWorkMetrics = WorkCoordinationContexts.AcquireNextWorkItemContext.makeMetrics(meter);
createSuccessorWorkItemsMetrics = WorkCoordinationContexts.CreateSuccessorWorkItemsContext.makeMetrics(meter);
}

public IWorkCoordinationContexts.IInitializeCoordinatorStateContext createCoordinationInitializationStateContext() {
Expand Down Expand Up @@ -87,4 +89,13 @@ public IWorkCoordinationContexts.ICompleteWorkItemContext createCompleteWorkCont
) {
return new WorkCoordinationContexts.CompleteWorkItemContext(this, enclosingScope);
}
}

/**
 * Creates a context for the create-successor-work-items activity without an enclosing scope.
 *
 * @return the context produced by the one-argument overload when it is given {@code null}
 */
public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() {
    // null here means "no enclosing scope"; the one-argument overload does the construction.
    return this.createSuccessorWorkItemsContext(null);
}

/**
 * Creates a context for the create-successor-work-items activity, rooted at this object.
 *
 * @param enclosingScope scope handed to the new context as its enclosing scope; the no-argument
 *                       overload passes {@code null} here
 * @return a newly constructed {@code WorkCoordinationContexts.CreateSuccessorWorkItemsContext}
 */
public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext(
IScopedInstrumentationAttributes enclosingScope
) {
return new WorkCoordinationContexts.CreateSuccessorWorkItemsContext(this, enclosingScope);
}}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ private MetricInstruments(Meter meter, String activityName) {
public MetricInstruments getRetryMetrics() {
return getRootInstrumentationScope().acquireSpecificWorkMetrics;
}

/**
 * Builds a context for the create-successor-work-items activity.
 *
 * @return a new {@code CreateSuccessorWorkItemsContext} that shares this context's root
 *         instrumentation scope and receives this context as its enclosing scope
 */
public ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext() {
    final ICreateSuccessorWorkItemsContext successorContext =
        new CreateSuccessorWorkItemsContext(this.rootInstrumentationScope, this);
    return successorContext;
}
}

@Getter
Expand Down Expand Up @@ -276,6 +280,10 @@ public IRefreshContext getRefreshContext() {
return new Refresh(this.rootInstrumentationScope, this);
}

/**
 * Builds a context for the create-successor-work-items activity.
 *
 * @return a new {@code CreateSuccessorWorkItemsContext} that shares this context's root
 *         instrumentation scope and receives this context as its enclosing scope
 */
public ICreateSuccessorWorkItemsContext getCreateSuccessorWorkItemsContext() {
    final ICreateSuccessorWorkItemsContext successorContext =
        new CreateSuccessorWorkItemsContext(this.rootInstrumentationScope, this);
    return successorContext;
}

public static class MetricInstruments extends RetryMetricInstruments {
public final LongCounter assignedCounter;
public final LongCounter nothingAvailableCounter;
Expand Down Expand Up @@ -363,4 +371,51 @@ public MetricInstruments getRetryMetrics() {
return getRootInstrumentationScope().completeWorkMetrics;
}
}

/**
 * Span context for the create-successor-work-items activity. It is rooted at a
 * {@link RootWorkCoordinationContext}, remembers the scope it was opened under, and hands out
 * child contexts for the refresh and complete-work-item operations.
 *
 * <p>NOTE(review): {@code ICreateSuccessorWorkItemsContext} also declares
 * {@code getCreateUnassignedWorkItemContext()}, and no implementation of it is visible in this
 * class. Confirm that one is provided, otherwise this class cannot satisfy the interface.
 */
@Getter
class CreateSuccessorWorkItemsContext extends BaseSpanContext<RootWorkCoordinationContext>
    implements
        ICreateSuccessorWorkItemsContext,
        RetryableActivityContextMetricMixin<CreateSuccessorWorkItemsContext.MetricInstruments> {

    /** Retry metric instruments for this activity, labelled via {@code autoLabels(activityName)}. */
    public static class MetricInstruments extends RetryMetricInstruments {
        private MetricInstruments(Meter meter, String activityName) {
            super(meter, autoLabels(activityName));
        }
    }

    /** Scope under which this context was created. */
    final IScopedInstrumentationAttributes enclosingScope;

    /**
     * @param rootScope      root context passed to the superclass and used to initialize the span
     * @param enclosingScope scope this context is nested under
     */
    CreateSuccessorWorkItemsContext(
        RootWorkCoordinationContext rootScope,
        IScopedInstrumentationAttributes enclosingScope
    ) {
        super(rootScope);
        // The enclosing scope is assigned before the span is initialized; keep this order.
        this.enclosingScope = enclosingScope;
        initializeSpan(rootScope);
    }

    /**
     * Creates the metric instruments for this activity.
     *
     * @param meter meter handed to the instruments' constructor
     * @return instruments labelled with {@code ACTIVITY_NAME}
     */
    public static @NonNull MetricInstruments makeMetrics(Meter meter) {
        return new MetricInstruments(meter, ACTIVITY_NAME);
    }

    /** @return the {@code ACTIVITY_NAME} constant */
    @Override
    public String getActivityName() {
        return ACTIVITY_NAME;
    }

    /** @return the {@code createSuccessorWorkItemsMetrics} held by the root instrumentation scope */
    @Override
    public MetricInstruments getRetryMetrics() {
        return getRootInstrumentationScope().createSuccessorWorkItemsMetrics;
    }

    /** @return a new {@code Refresh} context with this context as its enclosing scope */
    @Override
    public IRefreshContext getRefreshContext() {
        final IRefreshContext refreshContext = new Refresh(this.rootInstrumentationScope, this);
        return refreshContext;
    }

    /** @return a new {@code CompleteWorkItemContext} with this context as its enclosing scope */
    @Override
    public ICompleteWorkItemContext getCompleteWorkItemContext() {
        final ICompleteWorkItemContext completeContext =
            new CompleteWorkItemContext(this.rootInstrumentationScope, this);
        return completeContext;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.function.Supplier;

import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts;
Expand Down Expand Up @@ -67,8 +68,8 @@ WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
String workItemId,
Duration leaseDuration,
Supplier<IWorkCoordinationContexts.IAcquireSpecificWorkContext> contextSupplier
) throws IOException;

) throws IOException, InterruptedException;
/**
* Scan the created work items that have not yet had leases acquired and have not yet finished.
* One of those work items will be returned along with a lease for how long this process may continue
Expand Down Expand Up @@ -98,6 +99,20 @@ void completeWorkItem(
Supplier<IWorkCoordinationContexts.ICompleteWorkItemContext> contextSupplier
) throws IOException, InterruptedException;

/**
 * Add the list of successor items to the work item, create new work items for each of the successors, and mark the
 * original work item as completed.
 *
 * @param workItemId the work item that is being completed
 * @param successorWorkItemIds the list of successor work items that will be created
 * @param contextSupplier supplies the {@link IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext}
 *                        used for this operation
 * @throws IOException declared by this contract; presumably raised when communication with the
 *                     coordination store fails (confirm against implementations)
 * @throws InterruptedException declared by this contract; presumably raised when the calling thread is
 *                              interrupted while the operation is in progress (confirm against implementations)
 */
void createSuccessorWorkItemsAndMarkComplete(
String workItemId,
ArrayList<String> successorWorkItemIds,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
) throws IOException, InterruptedException;

/**
* @return the number of items that are not yet complete. This will include items with and without claimed leases.
* @throws IOException
Expand Down
Loading
Loading