
Added console backfill pause, console backfill stop archives working state #1140

Merged: 7 commits into opensearch-project:main from MIGRATIONS-2189 on Nov 15, 2024

Conversation

@chelma (Member) commented Nov 14, 2024

Description

  • Updated console backfill stop to scale the RFS workers down to 0, back up the working state to disk, and delete the working state index. To support this, we created a new operation, backfill archive, at the middleware layer; it is not surfaced in the CLI. (A rough sketch of the flow appears below.)
  • Added console backfill pause as an alias for console backfill scale 0, so that users can "stop" a migration without the cleanup that stop now performs
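A minimal sketch of the combined stop-and-archive flow; the client and helper names here (ecs_client, fetch_all_documents, delete_index) are illustrative stand-ins, not necessarily the exact ones in console_link:

import json
import os
from datetime import datetime

WORKING_STATE_INDEX = ".migrations_working_state"

def stop_and_archive(ecs_client, cluster, backup_dir="./backfill_working_state"):
    # Scale the RFS workers down to 0 (this step alone is what `pause` does).
    ecs_client.set_desired_count(0)
    # Snapshot the working state index into a timestamped file on disk.
    os.makedirs(backup_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    backup_path = os.path.join(backup_dir, f"working_state_backup_{timestamp}.json")
    with open(backup_path, "w") as f:
        json.dump(list(cluster.fetch_all_documents(WORKING_STATE_INDEX)), f, indent=4)
    # Delete the working state index so a later backfill starts fresh.
    cluster.delete_index(WORKING_STATE_INDEX)  # hypothetical helper
    return backup_path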

Issues Resolved

Testing

  • Added unit tests
  • Ran the commands against a real source/target:
(.venv) bash-5.2# console backfill start
Backfill started successfully.
Service migration-aws-integ-reindex-from-snapshot set to 5 desired count. Currently 0 running and 0 pending.

(.venv) bash-5.2# console backfill status
BackfillStatus.STARTING
Running=0
Pending=5
Desired=5

(.venv) bash-5.2# console backfill stop
Backfill stopped successfully.
Service migration-aws-integ-reindex-from-snapshot set to 0 desired count. Currently 0 running and 5 pending.
Archiving the working state of the backfill operation...
RFS Workers are still running, waiting for them to complete...
Backfill working state archived to: /shared-logs-output/migration-console-default/backfill_working_state/working_state_backup_20241115174822.json

(.venv) bash-5.2# cat /shared-logs-output/migration-console-default/backfill_working_state/working_state_backup_20241115174822.json
[
{
    "shard_setup": {
        "numAttempts": 1,
        "scriptVersion": "poc",
        "leaseHolderId": "ip-12-0-2-40.us-east-2.compute.internal",
        "creatorId": "ip-12-0-2-40.us-east-2.compute.internal",
        "expiration": 1731693108,
        "completedAt": 1731692808
    },
    "bwc_index_1__0": {
        "numAttempts": 1,
        "scriptVersion": "poc",
        "leaseHolderId": "ip-12-0-2-40.us-east-2.compute.internal",
        "creatorId": "ip-12-0-2-40.us-east-2.compute.internal",
        "expiration": 1731693408,
        "completedAt": 1731692809
    }
}
]

(.venv) bash-5.2# console clusters cat-indices

WARNING: Cluster information may be stale. Use --refresh to update.

SOURCE CLUSTER
health status index       uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   bwc_index_1 TUs7jjaWSvya-yWztVZQPg   1   0          1            0      3.8kb          3.8kb

TARGET CLUSTER
health status index                     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .opensearch-observability 8HOComzdSlSWCwqWIOGRbQ   1   1          0            0       416b           208b
green  open   .plugins-ml-config        9tld-PCJToSUsMiyDhlyhQ   5   1          1            0      9.5kb          4.7kb
green  open   bwc_index_1               rPkA0eYNRma70xpJMTStdw   5   1          1            0     11.4kb          5.6kb
green  open   .kibana_1                 JICtu5p6ReiNUg8nU152tQ   1   1          1            0     10.4kb          5.2kb

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

codecov bot commented Nov 14, 2024

Codecov Report

Attention: Patch coverage is 89.09091% with 6 lines in your changes missing coverage. Please review.

Project coverage is 80.72%. Comparing base (66ae561) to head (65eca1f).
Report is 15 commits behind head on main.

Files with missing lines                               Patch %   Lines
...rationConsole/lib/console_link/console_link/cli.py  88.88%    2 Missing ⚠️
...b/console_link/console_link/models/backfill_osi.py  33.33%    2 Missing ⚠️
.../console_link/console_link/models/backfill_base.py  66.66%    1 Missing ⚠️
...b/console_link/console_link/models/backfill_rfs.py  95.83%    1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1140      +/-   ##
============================================
+ Coverage     80.63%   80.72%   +0.08%     
- Complexity     2910     2947      +37     
============================================
  Files           399      399              
  Lines         14829    14965     +136     
  Branches       1007     1017      +10     
============================================
+ Hits          11958    12080     +122     
- Misses         2260     2274      +14     
  Partials        611      611              
Flag          Coverage Δ
gradle-test   78.74% <ø> (+0.07%) ⬆️
python-test   89.95% <89.09%> (+0.19%) ⬆️

Flags with carried forward coverage won't be shown.


@@ -111,6 +128,23 @@ def stop(self, *args, **kwargs) -> CommandResult:
    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
        return self.ecs_client.set_desired_count(units)

    def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult:
Collaborator:
what does this function do if the index doesn't exist?

Collaborator:
line 145 seems like it would be ok, but line 140 would be a problem.
The user-facing version of this question is: what happens if a user runs it twice? If it were idempotent, it would just print the path again, so maybe the outer message can be tweaked slightly and another check added in here.

Member Author (@chelma):
I updated the code to gracefully skip the archive process if the index doesn't exist.
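A minimal sketch of what that guard might look like (index_exists and target_cluster are hypothetical names; the actual check in the merged code may differ):

    def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult:
        # Skip gracefully when the working state index is already gone,
        # e.g. because archive (or stop) has already been run once.
        if not self.target_cluster.index_exists(WORKING_STATE_INDEX):  # hypothetical helper
            return CommandResult(True, "Working state index does not exist; nothing to archive.")
        ...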

@@ -17,6 +19,8 @@

logger = logging.getLogger(__name__)

WORKING_STATE_INDEX = ".migrations_working_state"
Collaborator:
eventually, this should be configurable, but I don't think that needs to be today. Let's do that at the same time we make it configurable across the board.

@gregschohn (Collaborator) left a comment:
Thanks for picking this up. I have a couple questions/suggestions for the handling of the archives and some other minor stuff.

@@ -291,6 +293,14 @@ def start_backfill_cmd(ctx, pipeline_name):
        raise click.ClickException(message)
    click.echo(message)

@backfill_group.command(name="pause")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
Collaborator:
pipeline names could be confusing. If I knew nothing about OSI, I might think that I have named RFS pipelines that I could do something with. Other than removing OSI, I'm not sure it's worth making any other changes. For that, I'd recommend opening a jira and we'll discuss in the coming weeks.
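For reference, a plausible shape for the new command, mirroring the patterns visible in the surrounding diffs (the body is hypothetical; the real command delegates to the middleware layer):

@backfill_group.command(name="pause")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
@click.pass_obj
def pause_backfill_cmd(ctx, pipeline_name):
    """Pause the backfill by scaling the RFS workers to zero, without archiving anything."""
    exitcode, message = backfill_.scale(ctx.env.backfill, units=0)
    if not exitcode:
        raise click.ClickException(message)
    click.echo(message)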

@@ -301,6 +311,17 @@ def stop_backfill_cmd(ctx, pipeline_name):
        raise click.ClickException(message)
    click.echo(message)

    click.echo("Archiving the working state of the backfill operation...")
    exitcode, message = backfill_.archive(ctx.env.backfill)
Collaborator:
Consider renaming exitcode. It seemed like it was supposed to be the integer exit code for the process, but it looks like it's a boolean.

Member Author (@chelma):
It makes enough sense that I think I'm personally fine with it.

    @abstractmethod
    def stop(self, *args, **kwargs) -> CommandResult[str]:
-        """Stop or pause the backfill. This does not make guarantees about resumeability."""
+        """Stop the backfill. This does not make guarantees about resumeability."""
Collaborator:
There is a guarantee if it completes successfully, right? That would be a fair thing to say and give the user a better understanding of what to expect.

Member Author (@chelma):
The intuition is that you use pause when you want the ability to resume, and stop if you want to clean things up.
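In practice (these are the console commands from this PR; resuming via start is the intended workflow rather than a hard guarantee):

console backfill pause    # scale workers to 0; working state stays in the cluster
console backfill start    # scale back up; workers pick up the remaining work items
console backfill stop     # scale to 0, archive the working state to disk, delete the index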

Collaborator:
why can we not guarantee that start can pick up where it left off if these two commands completed successfully?

    @abstractmethod
    def archive(self, *args, **kwargs) -> CommandResult[str]:
        """Archive the backfill operation. Should return the information required to resume the backfill operations.
        Should fail if there are currently running operations."""
Collaborator:
Suggest "currently running backfill operations." As written, the user could think that some independent activity (or, who knows, ECS tasks?!) could cause a failure here.

Member Author (@chelma):
This text is not surfaced to the users at all, AFAIK


        if archive_dir_path:
            backup_dir = archive_dir_path
        elif shared_logs_dir is None:
            backup_dir = "./working_state"
Collaborator:
make this more specific, like backfill_working_state

Member Author (@chelma):
Done

        os.makedirs(backup_dir, exist_ok=True)

        # Write the backup
        with open(backup_path, "w") as f:
Collaborator:
This overwrites an existing file, and it looks like we always write to the same file. I'd recommend opening with "a" and writing a header that clearly marks the beginning of each segment so the segments can be demarcated.

The other option is to open the file for exclusive creation ("x") with a timestamp in the name (if there is a collision, the user could see it and just try again a second or ms later, though there really shouldn't be one in 99.9999% of real-world use cases). If we think the files could be very big, this is probably a much better option than the previous one.

Member Author (@chelma):
Made individual, timestamped files
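A sketch of that approach, combining the "x" suggestion above with the timestamped naming visible in the test output (assumes backup_dir already exists; the merged code may differ in details):

from datetime import datetime

timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
backup_path = os.path.join(backup_dir, f"working_state_backup_{timestamp}.json")
# "x" creates the file exclusively, failing loudly on the (unlikely) name
# collision instead of silently overwriting an earlier archive.
with open(backup_path, "x") as f:
    json.dump(documents, f, indent=4)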

Comment on lines 225 to 226
            for hit in hits:
                documents[hit['_id']] = hit['_source']
Collaborator:
you could move this to the beginning of the while loop; that makes the code easier to maintain whether you're putting this stuff into memory or streaming it

@@ -198,3 +199,63 @@ def execute_benchmark_workload(self, workload: str,
        display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********")
        logger.info(f"Executing command: {display_command}")
        subprocess.run(command, shell=True)

    def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[str, Any]:
Collaborator:
If the results here are big enough to warrant a scroll (and good catch, I think they are), does it also make sense to write the contents to a file rather than accumulate them in memory? If a customer has 20K shards and we divide those into 20 work units each, and each document is ~200 bytes of data plus pointers for strings, you're looking at 80MB. A visitor pattern would look nice here.

Member Author (@chelma):
Per discussion, turned this into a generator.
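A standalone sketch of the generator shape using the scroll API (shown with plain requests against an unauthenticated endpoint for illustration; the merged version lives on the cluster model and handles auth):

import requests

def fetch_all_documents(endpoint: str, index_name: str, batch_size: int = 100):
    # Yield one {doc_id: source} batch at a time via the scroll API
    # instead of accumulating every document in memory.
    session = requests.Session()
    body = session.post(
        f"{endpoint}/{index_name}/_search?scroll=1m",
        json={"size": batch_size, "query": {"match_all": {}}},
    ).json()
    scroll_id = body.get("_scroll_id")
    hits = body["hits"]["hits"]
    try:
        while hits:
            yield {hit["_id"]: hit["_source"] for hit in hits}
            body = session.post(
                f"{endpoint}/_search/scroll",
                json={"scroll": "1m", "scroll_id": scroll_id},
            ).json()
            scroll_id = body.get("_scroll_id")
            hits = body["hits"]["hits"]
    finally:
        # Release the scroll context server-side, even if the consumer stops early.
        if scroll_id:
            session.delete(f"{endpoint}/_search/scroll", json={"scroll_id": scroll_id})
        session.close()

A consumer can then write each batch to disk as it arrives, which is what keeps the 80MB worst case out of memory.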

@chelma merged commit 05cf84b into opensearch-project:main on Nov 15, 2024
15 checks passed
@chelma deleted the MIGRATIONS-2189 branch on Nov 15, 2024 at 19:16