Added console backfill pause, console backfill stop archives working state #1140
@@ -1,6 +1,7 @@
import json
from pprint import pprint
import sys
import time
from typing import Dict
import click
import console_link.middleware.clusters as clusters_

@@ -13,6 +14,7 @@
import console_link.middleware.tuples as tuples_

from console_link.models.cluster import HttpMethod
from console_link.models.backfill_rfs import RfsWorkersInProgress
from console_link.models.utils import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic

@@ -291,6 +293,14 @@ def start_backfill_cmd(ctx, pipeline_name):
        raise click.ClickException(message)
    click.echo(message)


@backfill_group.command(name="pause")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
@click.pass_obj
def pause_backfill_cmd(ctx, pipeline_name):
    exitcode, message = backfill_.pause(ctx.env.backfill, pipeline_name=pipeline_name)
    if exitcode != ExitCode.SUCCESS:
        raise click.ClickException(message)
    click.echo(message)


@backfill_group.command(name="stop")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')

@@ -301,6 +311,17 @@ def stop_backfill_cmd(ctx, pipeline_name):
        raise click.ClickException(message)
    click.echo(message)

    click.echo("Archiving the working state of the backfill operation...")
    exitcode, message = backfill_.archive(ctx.env.backfill)
Review comment: Consider renaming `exitcode`. It seemed like it was supposed to be the integer exit code for the process, but it looks like it's a boolean.
Reply: It makes enough sense that I think I'm personally fine with it.
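A minimal sketch of the rename the first comment suggests, assuming the first element of the middleware's return tuple really is a boolean success flag rather than an `ExitCode` (the name `success` is illustrative, not the PR's final code):

```python
# Hypothetical rename: read the first tuple element as a success flag.
# Assumes backfill_.pause returns (success: bool, message: str).
success, message = backfill_.pause(ctx.env.backfill, pipeline_name=pipeline_name)
if not success:
    raise click.ClickException(message)
click.echo(message)
```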
    while isinstance(message, RfsWorkersInProgress):
        click.echo(f"RFS Workers are still running, waiting for them to complete...")
        time.sleep(5)
        exitcode, message = backfill_.archive(ctx.env.backfill)

    if exitcode != ExitCode.SUCCESS:
        raise click.ClickException(message)
    click.echo(f"Backfill working state archived to: {message}")


@backfill_group.command(name="scale")
@click.argument("units", type=int, required=True)
@@ -44,9 +44,14 @@ def start(self, *args, **kwargs) -> CommandResult[str]:
        or failures--their data will begin moving to the target cluster."""
        pass

    @abstractmethod
    def pause(self, *args, **kwargs) -> CommandResult[str]:
        """Pause the backfill. This backfill should be resumable afterwards by invoking `start`."""
        pass

    @abstractmethod
    def stop(self, *args, **kwargs) -> CommandResult[str]:
-        """Stop or pause the backfill. This does not make guarantees about resumeability."""
+        """Stop the backfill. This does not make guarantees about resumeability."""
Review comment: There is a guarantee if it completes successfully, right? That would be a fair thing to say and give the user a better understanding of what to expect.
Reply: The intuition is that you use …
Review comment: Why can you not guarantee whether `start` can pick up or not if these two commands ended successfully?
        pass

    @abstractmethod

@@ -58,5 +63,11 @@ def get_status(self, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str
    def scale(self, units: int, *args, **kwargs) -> CommandResult[str]:
        pass

    @abstractmethod
    def archive(self, *args, **kwargs) -> CommandResult[str]:
        """Archive the backfill operation. Should return the information required to resume the backfill operations.
        Should fail if there are currently running operations."""
Review comment: "currently running backfill operations."
Reply: This text is not surfaced to the users at all, AFAIK.
        pass

    def describe(self) -> Dict:
        return self.config
@@ -1,12 +1,14 @@
from datetime import datetime
-from typing import Dict, Optional
+import json
+import os
+from typing import Any, Dict, Optional

import requests

from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.client_options import ClientOptions
-from console_link.models.cluster import Cluster
+from console_link.models.cluster import Cluster, HttpMethod
from console_link.models.schema_tools import contains_one_of
from console_link.models.command_result import CommandResult
from console_link.models.ecs_service import ECSService

@@ -17,6 +19,8 @@
logger = logging.getLogger(__name__)

WORKING_STATE_INDEX = ".migrations_working_state"
Review comment: Eventually, this should be configurable, but I don't think that needs to be today. Let's do that at the same time we make it configurable across the board.
DOCKER_RFS_SCHEMA = {
    "type": "dict",
    "nullable": True,

@@ -73,19 +77,28 @@ def get_status(self, *args, **kwargs) -> CommandResult:
    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        raise NotImplementedError()


class DockerRFSBackfill(RFSBackfill):
    def __init__(self, config: Dict, target_cluster: Cluster) -> None:
        super().__init__(config)
        self.target_cluster = target_cluster
        self.docker_config = self.config["reindex_from_snapshot"]["docker"]

    def pause(self, pipeline_name=None) -> CommandResult:
        raise NotImplementedError()

    def get_status(self, *args, **kwargs) -> CommandResult:
        return CommandResult(True, (BackfillStatus.RUNNING, "This is my running state message"))

    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        raise NotImplementedError()

    def archive(self, *args, **kwargs) -> CommandResult:
        raise NotImplementedError()


class RfsWorkersInProgress(Exception):
    def __init__(self):
        super().__init__("RFS Workers are still in progress")


class ECSRFSBackfill(RFSBackfill):
    def __init__(self, config: Dict, target_cluster: Cluster, client_options: Optional[ClientOptions] = None) -> None:

@@ -103,6 +116,10 @@ def __init__(self, config: Dict, target_cluster: Cluster, client_options: Option
    def start(self, *args, **kwargs) -> CommandResult:
        logger.info(f"Starting RFS backfill by setting desired count to {self.default_scale} instances")
        return self.ecs_client.set_desired_count(self.default_scale)

    def pause(self, *args, **kwargs) -> CommandResult:
        logger.info(f"Pausing RFS backfill by setting desired count to 0 instances")
        return self.ecs_client.set_desired_count(0)

    def stop(self, *args, **kwargs) -> CommandResult:
        logger.info("Stopping RFS backfill by setting desired count to 0 instances")

@@ -111,6 +128,23 @@ def stop(self, *args, **kwargs) -> CommandResult:
    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
        return self.ecs_client.set_desired_count(units)

    def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult:
Review comment: What does this function do if the index doesn't exist?
Review comment: line 145 seems like it would be ok, but line 140 would be a problem.
Reply: I updated the code to gracefully skip the archive process if the index doesn't exist.
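A rough sketch of the "gracefully skip" behavior the reply describes, assuming `call_api` accepts `raise_error=False` and returns the underlying `requests.Response`, and that `HttpMethod.GET` exists; the merged follow-up may differ:

```python
    def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult:
        # Hypothetical guard: if the working state index is absent, skip the
        # backup and cleanup steps instead of failing on the document fetch.
        response = self.target_cluster.call_api(
            f"/{WORKING_STATE_INDEX}", method=HttpMethod.GET, raise_error=False
        )
        if response.status_code == 404:
            return CommandResult(True, "No working state index found; nothing to archive")
        # ... otherwise continue with the backup and cleanup shown below ...
```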
        logger.info("Confirming there are no currently in-progress workers")
        status = self.ecs_client.get_instance_statuses()
        if status.running > 0 or status.pending > 0 or status.desired > 0:
            return CommandResult(False, RfsWorkersInProgress())

        backup_path = get_working_state_index_backup_path(archive_dir_path)
        logger.info(f"Backing up working state index to {backup_path}")
        documents = self.target_cluster.fetch_all_documents(WORKING_STATE_INDEX)
        backup_working_state_index(documents, backup_path)
        logger.info(f"Working state index backed up successful")

        logger.info("Cleaning up working state index on target cluster")
        self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"})
        logger.info("Working state index cleaned up successful")
        return CommandResult(True, backup_path)

    def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult:
        logger.info(f"Getting status of RFS backfill, with {deep_check=}")

@@ -185,6 +219,26 @@ def _get_detailed_status(self) -> Optional[str]:

        return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None])

def get_working_state_index_backup_path(archive_dir_path: str = None) -> str:
    shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None)
    if archive_dir_path:
        backup_dir = archive_dir_path
    elif shared_logs_dir is None:
        backup_dir = "./working_state"
Review comment: Make this more specific, like `backfill_working_state`.
Reply: Done.
    else:
        backup_dir = os.path.join(shared_logs_dir, "working_state")

    file_name = "working_state_backup.json"
    return os.path.join(backup_dir, file_name)

def backup_working_state_index(working_state: Dict[str, Any], backup_path: str):
    # Ensure the backup directory exists
    backup_dir = os.path.dirname(backup_path)
    os.makedirs(backup_dir, exist_ok=True)

    # Write the backup
    with open(backup_path, "w") as f:
Review comment: This overwrites an existing file. It looks like we always write to the same file. I'd recommend opening with "a" and putting in a header that can clearly show the beginning, so that the segments can be demarcated. The other option is to open the file exclusively for create ("x") with a timestamp (if there is a collision, the user could see it and just try again a second or ms later, though there really shouldn't be one in 99.9999% of real-world use cases). If we think the files could be very big, this is probably a much better option than the previous one.
Reply: Made individual, timestamped files.
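A sketch of the timestamped-file approach mentioned in the reply, combined with the more specific directory name suggested above; the exact filename format used in the follow-up commit is an assumption:

```python
import os
from datetime import datetime

def get_working_state_index_backup_path(archive_dir_path: str = None) -> str:
    # Simplified for the sketch: fall back to a backfill-specific directory.
    backup_dir = archive_dir_path or "./backfill_working_state"
    # One file per archive invocation, so earlier backups are never overwritten.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return os.path.join(backup_dir, f"working_state_backup_{timestamp}.json")
```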
        json.dump(working_state, f, indent=4)

def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]:
    try:
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional
from enum import Enum
import json
import logging
import subprocess

@@ -198,3 +199,63 @@ def execute_benchmark_workload(self, workload: str,
        display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********")
        logger.info(f"Executing command: {display_command}")
        subprocess.run(command, shell=True)

    def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[str, Any]:
Review comment: If the results here are big enough to warrant a scroll (and good catch, I think they are), does it also make sense to write the contents to a file rather than accumulate them in memory? If a customer has 20K shards and we divide those into 20 work units each, and each document is ~200 bytes of data plus pointers for strings, you're looking at 80 MB. A visitor pattern would look nice here.
Reply: Per discussion, turned this into a generator.
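A sketch of the generator-based shape the reply refers to, yielding one scroll batch at a time so a caller can stream each batch to disk instead of holding the whole index in memory. It assumes the same `call_api` signature and imports used in the code below; the method name and details are illustrative rather than the merged implementation:

```python
    def stream_all_documents(self, index_name: str, batch_size: int = 100):
        # Hypothetical generator variant of fetch_all_documents.
        session = requests.Session()
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"size": batch_size, "query": {"match_all": {}}})
        response = self.call_api(path=f"/{index_name}/_search?scroll=1m", method=HttpMethod.POST,
                                 data=body, headers=headers, session=session)
        response_json = response.json()
        scroll_id = response_json.get('_scroll_id')
        hits = response_json.get('hits', {}).get('hits', [])
        try:
            while hits:
                # Yield each batch as soon as it arrives instead of accumulating it.
                yield {hit['_id']: hit['_source'] for hit in hits}
                body = json.dumps({"scroll": "1m", "scroll_id": scroll_id})
                response = self.call_api(path="/_search/scroll", method=HttpMethod.POST,
                                         data=body, headers=headers, session=session)
                response_json = response.json()
                scroll_id = response_json.get('_scroll_id')
                hits = response_json.get('hits', {}).get('hits', [])
        finally:
            if scroll_id:
                # Always release the server-side scroll context.
                self.call_api(path="/_search/scroll", method=HttpMethod.DELETE,
                              data=json.dumps({"scroll_id": scroll_id}),
                              headers=headers, session=session, raise_error=False)
```

A caller could then write each yielded batch straight to the backup file rather than materializing one large dictionary.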
        documents = {}
        scroll_id = None
        session = requests.Session()

        # Step 1: Initiate the scroll
        path = f"/{index_name}/_search?scroll=1m"
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"size": batch_size, "query": {"match_all": {}}})
        response = self.call_api(
            path=path,
            method=HttpMethod.POST,
            data=body,
            headers=headers,
            session=session
        )

        response_json = response.json()
        scroll_id = response_json.get('_scroll_id')
        hits = response_json.get('hits', {}).get('hits', [])

        # Add documents to result dictionary
        for hit in hits:
            documents[hit['_id']] = hit['_source']
Review comment: You could put this into the beginning of the while loop; that makes the code easier to maintain whether you're accumulating in memory or streaming.
        # Step 2: Continue scrolling until no more documents
        while scroll_id and hits:
            path = f"/_search/scroll"
            body = json.dumps({"scroll": "1m", "scroll_id": scroll_id})
            response = self.call_api(
                path=path,
                method=HttpMethod.POST,
                data=body,
                headers=headers,
                session=session
            )

            response_json = response.json()
            scroll_id = response_json.get('_scroll_id')
            hits = response_json.get('hits', {}).get('hits', [])

            # Add documents to result dictionary
            for hit in hits:
                documents[hit['_id']] = hit['_source']

        # Step 3: Cleanup the scroll if necessary
        if scroll_id:
            path = f"/_search/scroll"
            body = json.dumps({"scroll_id": scroll_id})
            self.call_api(
                path=path,
                method=HttpMethod.DELETE,
                data=body,
                headers=headers,
                session=session,
                raise_error=False
            )

        return documents
Review comment: Pipeline names could be confusing. If I knew nothing about OSI, I might think that I have named RFS pipelines that I could do something with. Other than removing OSI, I'm not sure it's worth making any other changes. For that, I'd recommend opening a Jira and we'll discuss in the coming weeks.
Reply: https://opensearch.atlassian.net/browse/MIGRATIONS-2222