-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added console backfill pause
, console backfill stop
archives working state
#1140
Changes from all commits
ae451a0
65eca1f
b4b9d15
7676ccd
7003634
168b424
ec091a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import json | ||
from pprint import pprint | ||
import sys | ||
import time | ||
from typing import Dict | ||
import click | ||
import console_link.middleware.clusters as clusters_ | ||
|
@@ -13,6 +14,7 @@ | |
import console_link.middleware.tuples as tuples_ | ||
|
||
from console_link.models.cluster import HttpMethod | ||
from console_link.models.backfill_rfs import RfsWorkersInProgress, WorkingIndexDoesntExist | ||
from console_link.models.utils import ExitCode | ||
from console_link.environment import Environment | ||
from console_link.models.metrics_source import Component, MetricStatistic | ||
|
@@ -292,6 +294,16 @@ def start_backfill_cmd(ctx, pipeline_name): | |
click.echo(message) | ||
|
||
|
||
@backfill_group.command(name="pause") | ||
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name') | ||
@click.pass_obj | ||
def pause_backfill_cmd(ctx, pipeline_name): | ||
exitcode, message = backfill_.pause(ctx.env.backfill, pipeline_name=pipeline_name) | ||
if exitcode != ExitCode.SUCCESS: | ||
raise click.ClickException(message) | ||
click.echo(message) | ||
|
||
|
||
@backfill_group.command(name="stop") | ||
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name') | ||
@click.pass_obj | ||
|
@@ -301,6 +313,22 @@ def stop_backfill_cmd(ctx, pipeline_name): | |
raise click.ClickException(message) | ||
click.echo(message) | ||
|
||
click.echo("Archiving the working state of the backfill operation...") | ||
exitcode, message = backfill_.archive(ctx.env.backfill) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider renaming exitcode. It seemed like it was supposed to be the integer exit code for the process, but it looks like it's a boolean. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes enough sense that I think I'm personally fine w/ it. |
||
|
||
if isinstance(message, WorkingIndexDoesntExist): | ||
click.echo("Working state index doesn't exist, skipping archive operation.") | ||
return | ||
|
||
while isinstance(message, RfsWorkersInProgress): | ||
click.echo("RFS Workers are still running, waiting for them to complete...") | ||
time.sleep(5) | ||
exitcode, message = backfill_.archive(ctx.env.backfill) | ||
|
||
if exitcode != ExitCode.SUCCESS: | ||
raise click.ClickException(message) | ||
click.echo(f"Backfill working state archived to: {message}") | ||
|
||
|
||
@backfill_group.command(name="scale") | ||
@click.argument("units", type=int, required=True) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,9 +44,14 @@ def start(self, *args, **kwargs) -> CommandResult[str]: | |
or failures--their data will begin moving to the target cluster.""" | ||
pass | ||
|
||
@abstractmethod | ||
def pause(self, *args, **kwargs) -> CommandResult[str]: | ||
"""Pause the backfill. This backfill should be resumable afterwards by invoking `start`.""" | ||
pass | ||
|
||
@abstractmethod | ||
def stop(self, *args, **kwargs) -> CommandResult[str]: | ||
"""Stop or pause the backfill. This does not make guarantees about resumeability.""" | ||
"""Stop the backfill. This does not make guarantees about resumeability.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a guarantee if it completes successfully, right? That would be a fair thing to say and give the user a better understanding of what to expect. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intuition is that you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why can you not guarantee if start can pickup or not if these two commands ended successfully? |
||
pass | ||
|
||
@abstractmethod | ||
|
@@ -58,5 +63,11 @@ def get_status(self, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str | |
def scale(self, units: int, *args, **kwargs) -> CommandResult[str]: | ||
pass | ||
|
||
@abstractmethod | ||
def archive(self, *args, **kwargs) -> CommandResult[str]: | ||
"""Archive the backfill operation. Should return the information required to resume the backfill operations. | ||
Should fail if there are currently running operations.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. currently running backfill operations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This text is not surfaced to the users at all, AFAIK |
||
pass | ||
|
||
def describe(self) -> Dict: | ||
return self.config |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,14 @@ | ||
from datetime import datetime | ||
from typing import Dict, Optional | ||
import json | ||
import os | ||
from typing import Dict, Optional | ||
|
||
|
||
import requests | ||
|
||
from console_link.models.backfill_base import Backfill, BackfillStatus | ||
from console_link.models.client_options import ClientOptions | ||
from console_link.models.cluster import Cluster | ||
from console_link.models.cluster import Cluster, HttpMethod | ||
from console_link.models.schema_tools import contains_one_of | ||
from console_link.models.command_result import CommandResult | ||
from console_link.models.ecs_service import ECSService | ||
|
@@ -17,6 +19,8 @@ | |
|
||
logger = logging.getLogger(__name__) | ||
|
||
WORKING_STATE_INDEX = ".migrations_working_state" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. eventually, this should be configurable, but I don't think that needs to be today. Let's do that at the same time we make it configurable across the board. |
||
|
||
DOCKER_RFS_SCHEMA = { | ||
"type": "dict", | ||
"nullable": True, | ||
|
@@ -80,11 +84,27 @@ def __init__(self, config: Dict, target_cluster: Cluster) -> None: | |
self.target_cluster = target_cluster | ||
self.docker_config = self.config["reindex_from_snapshot"]["docker"] | ||
|
||
def pause(self, pipeline_name=None) -> CommandResult: | ||
raise NotImplementedError() | ||
|
||
def get_status(self, *args, **kwargs) -> CommandResult: | ||
return CommandResult(True, (BackfillStatus.RUNNING, "This is my running state message")) | ||
|
||
def scale(self, units: int, *args, **kwargs) -> CommandResult: | ||
raise NotImplementedError() | ||
|
||
def archive(self, *args, **kwargs) -> CommandResult: | ||
raise NotImplementedError() | ||
|
||
|
||
class RfsWorkersInProgress(Exception): | ||
def __init__(self): | ||
super().__init__("RFS Workers are still in progress") | ||
|
||
|
||
class WorkingIndexDoesntExist(Exception): | ||
def __init__(self, index_name: str): | ||
super().__init__(f"The working state index '{index_name}' does not exist") | ||
|
||
|
||
class ECSRFSBackfill(RFSBackfill): | ||
|
@@ -103,6 +123,10 @@ def __init__(self, config: Dict, target_cluster: Cluster, client_options: Option | |
def start(self, *args, **kwargs) -> CommandResult: | ||
logger.info(f"Starting RFS backfill by setting desired count to {self.default_scale} instances") | ||
return self.ecs_client.set_desired_count(self.default_scale) | ||
|
||
def pause(self, *args, **kwargs) -> CommandResult: | ||
logger.info("Pausing RFS backfill by setting desired count to 0 instances") | ||
return self.ecs_client.set_desired_count(0) | ||
|
||
def stop(self, *args, **kwargs) -> CommandResult: | ||
logger.info("Stopping RFS backfill by setting desired count to 0 instances") | ||
|
@@ -111,6 +135,31 @@ def stop(self, *args, **kwargs) -> CommandResult: | |
def scale(self, units: int, *args, **kwargs) -> CommandResult: | ||
logger.info(f"Scaling RFS backfill by setting desired count to {units} instances") | ||
return self.ecs_client.set_desired_count(units) | ||
|
||
def archive(self, *args, archive_dir_path: str = None, archive_file_name: str = None, **kwargs) -> CommandResult: | ||
logger.info("Confirming there are no currently in-progress workers") | ||
status = self.ecs_client.get_instance_statuses() | ||
if status.running > 0 or status.pending > 0 or status.desired > 0: | ||
return CommandResult(False, RfsWorkersInProgress()) | ||
|
||
try: | ||
backup_path = get_working_state_index_backup_path(archive_dir_path, archive_file_name) | ||
logger.info(f"Backing up working state index to {backup_path}") | ||
backup_working_state_index(self.target_cluster, WORKING_STATE_INDEX, backup_path) | ||
logger.info("Working state index backed up successful") | ||
|
||
logger.info("Cleaning up working state index on target cluster") | ||
self.target_cluster.call_api( | ||
f"/{WORKING_STATE_INDEX}", | ||
method=HttpMethod.DELETE, | ||
params={"ignore_unavailable": "true"} | ||
) | ||
logger.info("Working state index cleaned up successful") | ||
return CommandResult(True, backup_path) | ||
except requests.HTTPError as e: | ||
if e.response.status_code == 404: | ||
return CommandResult(False, WorkingIndexDoesntExist(WORKING_STATE_INDEX)) | ||
return CommandResult(False, e) | ||
|
||
def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult: | ||
logger.info(f"Getting status of RFS backfill, with {deep_check=}") | ||
|
@@ -186,6 +235,45 @@ def _get_detailed_status(self) -> Optional[str]: | |
return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None]) | ||
|
||
|
||
def get_working_state_index_backup_path(archive_dir_path: str = None, archive_file_name: str = None) -> str: | ||
shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None) | ||
if archive_dir_path: | ||
backup_dir = archive_dir_path | ||
elif shared_logs_dir is None: | ||
backup_dir = "./backfill_working_state" | ||
else: | ||
backup_dir = os.path.join(shared_logs_dir, "backfill_working_state") | ||
|
||
if archive_file_name: | ||
file_name = archive_file_name | ||
else: | ||
file_name = f"working_state_backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.json" | ||
return os.path.join(backup_dir, file_name) | ||
|
||
|
||
def backup_working_state_index(cluster: Cluster, index_name: str, backup_path: str): | ||
# Ensure the backup directory exists | ||
backup_dir = os.path.dirname(backup_path) | ||
os.makedirs(backup_dir, exist_ok=True) | ||
|
||
# Backup the docs in the working state index as a JSON array containing batches of documents | ||
with open(backup_path, 'w') as outfile: | ||
outfile.write("[\n") # Start the JSON array | ||
first_batch = True | ||
|
||
for batch in cluster.fetch_all_documents(index_name=index_name): | ||
if not first_batch: | ||
outfile.write(",\n") | ||
else: | ||
first_batch = False | ||
|
||
# Dump the batch of documents as an entry in the array | ||
batch_json = json.dumps(batch, indent=4) | ||
outfile.write(batch_json) | ||
|
||
outfile.write("\n]") # Close the JSON array | ||
|
||
|
||
def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]: | ||
try: | ||
response = cluster.call_api("/.migrations_working_state/_search", data=json.dumps(query), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipeline names could be confusing. If I knew nothing about OSI, I might think that I have named RFS pipelines that I could do something with. Other than removing OSI, I'm not sure it's worth making any other changes. For that, I'd recommend opening a jira and we'll discuss in the coming weeks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://opensearch.atlassian.net/browse/MIGRATIONS-2222