Added console backfill pause, console backfill stop archives working state #1140

Merged (7 commits) on Nov 15, 2024
Changes from 3 commits
@@ -1,6 +1,7 @@
import json
from pprint import pprint
import sys
import time
from typing import Dict
import click
import console_link.middleware.clusters as clusters_
@@ -13,6 +14,7 @@
import console_link.middleware.tuples as tuples_

from console_link.models.cluster import HttpMethod
from console_link.models.backfill_rfs import RfsWorkersInProgress
from console_link.models.utils import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic
@@ -291,6 +293,14 @@ def start_backfill_cmd(ctx, pipeline_name):
raise click.ClickException(message)
click.echo(message)

@backfill_group.command(name="pause")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
Collaborator:
Pipeline names could be confusing. If I knew nothing about OSI, I might think that I have named RFS pipelines that I could do something with. Other than removing OSI, I'm not sure it's worth making any other changes; for that, I'd recommend opening a Jira, and we'll discuss in the coming weeks.

@click.pass_obj
def pause_backfill_cmd(ctx, pipeline_name):
exitcode, message = backfill_.pause(ctx.env.backfill, pipeline_name=pipeline_name)
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

@backfill_group.command(name="stop")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
Expand All @@ -301,6 +311,17 @@ def stop_backfill_cmd(ctx, pipeline_name):
raise click.ClickException(message)
click.echo(message)

click.echo("Archiving the working state of the backfill operation...")
exitcode, message = backfill_.archive(ctx.env.backfill)
Collaborator:
Consider renaming exitcode. It seemed like it was supposed to be the integer exit code for the process, but it looks like it's a boolean.

Member Author:
It makes enough sense that I think I'm personally fine with it.


while isinstance(message, RfsWorkersInProgress):
click.echo(f"RFS Workers are still running, waiting for them to complete...")
time.sleep(5)
exitcode, message = backfill_.archive(ctx.env.backfill)

if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(f"Backfill working state archived to: {message}")

@backfill_group.command(name="scale")
@click.argument("units", type=int, required=True)
[next changed file]
@@ -34,14 +34,19 @@ def start(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Starting backfill")
return backfill.start(*args, **kwargs)

@handle_errors("backfill",
on_success=lambda result: (ExitCode.SUCCESS, "Backfill paused successfully." + "\n" + result))
def pause(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Pausing backfill")
return backfill.pause(*args, **kwargs)


@handle_errors("backfill",
on_success=lambda result: (ExitCode.SUCCESS, "Backfill stopped successfully." + "\n" + result))
def stop(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Stopping backfill")
return backfill.stop(*args, **kwargs)


@handle_errors("backfill",
on_success=lambda status: (ExitCode.SUCCESS, f"{status[0]}\n{status[1]}"))
def status(backfill: Backfill, deep_check: bool, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str]]:
@@ -54,3 +59,8 @@ def status(backfill: Backfill, deep_check: bool, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str]]:
def scale(backfill: Backfill, units: int, *args, **kwargs) -> CommandResult[str]:
logger.info(f"Scaling backfill to {units} units")
return backfill.scale(units, *args, **kwargs)

@handle_errors("backfill")
def archive(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Archiving backfill operation")
return backfill.archive(*args, **kwargs)
[next changed file]
@@ -16,7 +16,8 @@


def handle_errors(service_type: str,
on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status)
on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status),
on_failure: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.FAILURE, status)
) -> Callable[[Any], Tuple[ExitCode, str]]:
def decorator(func: Callable[[Any], Tuple[ExitCode, str]]) -> Callable[[Any], Tuple[ExitCode, str]]:
def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]:
@@ -29,6 +30,8 @@ def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]:
except Exception as e:
logger.error(f"Failed to {func.__name__} {service_type}: {e}")
return ExitCode.FAILURE, f"Failure on {func.__name__} for {service_type}: {type(e).__name__} {e}"
return on_success(result.value)
if result.success:
return on_success(result.value)
return on_failure(result.value)
return wrapper
return decorator
[next changed file]
@@ -44,9 +44,14 @@ def start(self, *args, **kwargs) -> CommandResult[str]:
or failures--their data will begin moving to the target cluster."""
pass

@abstractmethod
def pause(self, *args, **kwargs) -> CommandResult[str]:
"""Pause the backfill. This backfill should be resumable afterwards by invoking `start`."""
pass

@abstractmethod
def stop(self, *args, **kwargs) -> CommandResult[str]:
"""Stop or pause the backfill. This does not make guarantees about resumeability."""
"""Stop the backfill. This does not make guarantees about resumeability."""
Collaborator:
There is a guarantee if it completes successfully, right? That would be a fair thing to say, and it would give the user a better understanding of what to expect.

Member Author:
The intuition is that you use pause when you want the ability to resume, and stop when you want to clean things up.

Collaborator:
Why can't you guarantee that start can pick up again if these two commands completed successfully?

pass

@abstractmethod
@@ -58,5 +63,11 @@ def get_status(self, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str]]:
def scale(self, units: int, *args, **kwargs) -> CommandResult[str]:
pass

@abstractmethod
def archive(self, *args, **kwargs) -> CommandResult[str]:
"""Archive the backfill operation. Should return the information required to resume the backfill operations.
Should fail if there are currently running operations."""
Collaborator:
Suggest "currently running backfill operations." Otherwise the user could think that some independent activity (or, who knows, ECS tasks?!) could cause a failure here.

Member Author:
This text is not surfaced to users at all, AFAIK.

pass

def describe(self) -> Dict:
return self.config
[next changed file]
@@ -90,6 +90,9 @@ def start(self, pipeline_name=None):
pipeline_name = self.osi_props.pipeline_name
start_pipeline(osi_client=self.osi_client, pipeline_name=pipeline_name)

def pause(self, pipeline_name=None) -> CommandResult:
raise NotImplementedError()

def stop(self, pipeline_name=None):
if pipeline_name is None:
pipeline_name = self.osi_props.pipeline_name
@@ -100,3 +103,6 @@ def get_status(self, *args, **kwargs) -> CommandResult:

def scale(self, units: int, *args, **kwargs) -> CommandResult:
raise NotImplementedError()

def archive(self, pipeline_name=None) -> CommandResult:
raise NotImplementedError()
[next changed file]
@@ -1,12 +1,14 @@
from datetime import datetime
from typing import Dict, Optional
import json
import os
from typing import Any, Dict, Optional


import requests

from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.client_options import ClientOptions
from console_link.models.cluster import Cluster
from console_link.models.cluster import Cluster, HttpMethod
from console_link.models.schema_tools import contains_one_of
from console_link.models.command_result import CommandResult
from console_link.models.ecs_service import ECSService
Expand All @@ -17,6 +19,8 @@

logger = logging.getLogger(__name__)

WORKING_STATE_INDEX = ".migrations_working_state"
Collaborator:
Eventually this should be configurable, but I don't think that needs to happen today. Let's do that at the same time we make it configurable across the board.


DOCKER_RFS_SCHEMA = {
"type": "dict",
"nullable": True,
@@ -73,19 +77,28 @@ def get_status(self, *args, **kwargs) -> CommandResult:
def scale(self, units: int, *args, **kwargs) -> CommandResult:
raise NotImplementedError()


class DockerRFSBackfill(RFSBackfill):
def __init__(self, config: Dict, target_cluster: Cluster) -> None:
super().__init__(config)
self.target_cluster = target_cluster
self.docker_config = self.config["reindex_from_snapshot"]["docker"]

def pause(self, pipeline_name=None) -> CommandResult:
raise NotImplementedError()

def get_status(self, *args, **kwargs) -> CommandResult:
return CommandResult(True, (BackfillStatus.RUNNING, "This is my running state message"))

def scale(self, units: int, *args, **kwargs) -> CommandResult:
raise NotImplementedError()

def archive(self, *args, **kwargs) -> CommandResult:
raise NotImplementedError()


class RfsWorkersInProgress(Exception):
def __init__(self):
super().__init__("RFS Workers are still in progress")

class ECSRFSBackfill(RFSBackfill):
def __init__(self, config: Dict, target_cluster: Cluster, client_options: Optional[ClientOptions] = None) -> None:
@@ -103,6 +116,10 @@ def __init__(self, config: Dict, target_cluster: Cluster, client_options: Optional[ClientOptions] = None) -> None:
def start(self, *args, **kwargs) -> CommandResult:
logger.info(f"Starting RFS backfill by setting desired count to {self.default_scale} instances")
return self.ecs_client.set_desired_count(self.default_scale)

def pause(self, *args, **kwargs) -> CommandResult:
logger.info(f"Pausing RFS backfill by setting desired count to 0 instances")
return self.ecs_client.set_desired_count(0)

def stop(self, *args, **kwargs) -> CommandResult:
logger.info("Stopping RFS backfill by setting desired count to 0 instances")
@@ -111,6 +128,23 @@ def stop(self, *args, **kwargs) -> CommandResult:
def scale(self, units: int, *args, **kwargs) -> CommandResult:
logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
return self.ecs_client.set_desired_count(units)

def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult:
Collaborator:
What does this function do if the index doesn't exist?

Collaborator:
Line 145 seems like it would be OK, but line 140 would be a problem. The user-facing version of this question is: what happens if a user runs it twice? If it were idempotent, it would just print out the path again, so maybe the outer message can be tweaked slightly and another check can be added in here.

Member Author:
I updated the code to gracefully skip the archive process if the index doesn't exist.
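
A minimal sketch of what such a graceful skip could look like in ECSRFSBackfill.archive(); the GET existence probe and the returned message are assumptions for illustration, not necessarily what was merged:

```python
# Sketch only: reuses names from this diff (WORKING_STATE_INDEX, HttpMethod,
# CommandResult, RfsWorkersInProgress, get_working_state_index_backup_path,
# backup_working_state_index). The GET probe and message text are assumptions.
def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult:
    logger.info("Confirming there are no currently in-progress workers")
    status = self.ecs_client.get_instance_statuses()
    if status.running > 0 or status.pending > 0 or status.desired > 0:
        return CommandResult(False, RfsWorkersInProgress())

    # Graceful skip: if the working state index is already gone, there is
    # nothing to back up, so a second invocation becomes a no-op.
    response = self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}",
                                            method=HttpMethod.GET,
                                            raise_error=False)
    if response.status_code == 404:
        return CommandResult(True, "No working state index found; nothing to archive.")

    backup_path = get_working_state_index_backup_path(archive_dir_path)
    documents = self.target_cluster.fetch_all_documents(WORKING_STATE_INDEX)
    backup_working_state_index(documents, backup_path)
    self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}", method=HttpMethod.DELETE,
                                 params={"ignore_unavailable": "true"})
    return CommandResult(True, backup_path)
```

With this shape, the caller in the CLI may want to special-case the "nothing to archive" result rather than printing it as an archive path.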

logger.info("Confirming there are no currently in-progress workers")
status = self.ecs_client.get_instance_statuses()
if status.running > 0 or status.pending > 0 or status.desired > 0:
return CommandResult(False, RfsWorkersInProgress())

backup_path = get_working_state_index_backup_path(archive_dir_path)
logger.info(f"Backing up working state index to {backup_path}")
documents = self.target_cluster.fetch_all_documents(WORKING_STATE_INDEX)
backup_working_state_index(documents, backup_path)
logger.info(f"Working state index backed up successful")

logger.info("Cleaning up working state index on target cluster")
self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"})
logger.info("Working state index cleaned up successful")
return CommandResult(True, backup_path)

def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult:
logger.info(f"Getting status of RFS backfill, with {deep_check=}")
@@ -185,6 +219,26 @@ def _get_detailed_status(self) -> Optional[str]:

return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None])

def get_working_state_index_backup_path(archive_dir_path: str = None) -> str:
shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None)
if archive_dir_path:
backup_dir = archive_dir_path
elif shared_logs_dir is None:
backup_dir = "./working_state"
Collaborator:
Make this more specific, like backfill_working_state.

Member Author:
Done.

else:
backup_dir = os.path.join(shared_logs_dir, "working_state")

file_name = "working_state_backup.json"
return os.path.join(backup_dir, file_name)

def backup_working_state_index(working_state: Dict[str, Any], backup_path: str):
# Ensure the backup directory exists
backup_dir = os.path.dirname(backup_path)
os.makedirs(backup_dir, exist_ok=True)

# Write the backup
with open(backup_path, "w") as f:
Collaborator:
This overwrites an existing file, and it looks like we always write to the same file. I'd recommend opening with "a" and writing a header that clearly marks the beginning of each segment so the segments can be demarcated.

The other option is to open the file exclusively for create ("x") with a timestamp in the name (if there is a collision, the user could see it and just try again a second or a millisecond later, though there really shouldn't be one in 99.9999% of real-world use cases). If we think the files could be very big, this is probably a much better option than the previous one.

Member Author:
Made individual, timestamped files.
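
A minimal sketch of per-run timestamped backups, building on get_working_state_index_backup_path from the diff; the directory and file naming here are assumptions, not necessarily the merged change:

```python
import os
from datetime import datetime

def get_working_state_index_backup_path(archive_dir_path: str = None) -> str:
    shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None)
    if archive_dir_path:
        backup_dir = archive_dir_path
    elif shared_logs_dir is None:
        backup_dir = "./backfill_working_state"  # more specific directory name, per review
    else:
        backup_dir = os.path.join(shared_logs_dir, "backfill_working_state")

    # Timestamp each backup file so repeated archive runs never clobber each other.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return os.path.join(backup_dir, f"working_state_backup_{timestamp}.json")
```

backup_working_state_index can then keep opening with "w" (or "x"), since each invocation gets its own file.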

json.dump(working_state, f, indent=4)

def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]:
try:
[next changed file]
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional
from enum import Enum
import json
import logging
import subprocess

@@ -198,3 +199,63 @@ def execute_benchmark_workload(self, workload: str,
display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********")
logger.info(f"Executing command: {display_command}")
subprocess.run(command, shell=True)

def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[str, Any]:
Collaborator:
If the results here are big enough to warrant a scroll (and good catch, I think they are), does it also make sense to write the contents to a file rather than accumulate them in memory? If a customer has 20K shards and we divide those into 20 work units each, and each document is ~200 bytes of data plus pointers for strings, you're looking at about 80 MB. A visitor pattern would look nice here.

Member Author:
Per discussion, turned this into a generator.

documents = {}
scroll_id = None
session = requests.Session()

# Step 1: Initiate the scroll
path = f"/{index_name}/_search?scroll=1m"
headers = {'Content-Type': 'application/json'}
body = json.dumps({"size": batch_size, "query": {"match_all": {}}})
response = self.call_api(
path=path,
method=HttpMethod.POST,
data=body,
headers=headers,
session=session
)

response_json = response.json()
scroll_id = response_json.get('_scroll_id')
hits = response_json.get('hits', {}).get('hits', [])

# Add documents to result dictionary
for hit in hits:
documents[hit['_id']] = hit['_source']
Collaborator:
You could move this to the beginning of the while loop; that makes the code easier to maintain whether you're accumulating in memory or streaming.
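
For illustration (a sketch, not part of the review thread), folding the per-hit accumulation into the loop so it lives in exactly one place might look like this, reusing the names from the diff:

```python
# Process the current batch at the top of the loop, then fetch the next one;
# the initial search results flow through the same code path as every
# subsequent scroll page.
while scroll_id and hits:
    for hit in hits:
        documents[hit['_id']] = hit['_source']

    response = self.call_api(
        path="/_search/scroll",
        method=HttpMethod.POST,
        data=json.dumps({"scroll": "1m", "scroll_id": scroll_id}),
        headers=headers,
        session=session
    )
    response_json = response.json()
    scroll_id = response_json.get('_scroll_id')
    hits = response_json.get('hits', {}).get('hits', [])
```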


# Step 2: Continue scrolling until no more documents
while scroll_id and hits:
path = f"/_search/scroll"
body = json.dumps({"scroll": "1m", "scroll_id": scroll_id})
response = self.call_api(
path=path,
method=HttpMethod.POST,
data=body,
headers=headers,
session=session
)

response_json = response.json()
scroll_id = response_json.get('_scroll_id')
hits = response_json.get('hits', {}).get('hits', [])

# Add documents to result dictionary
for hit in hits:
documents[hit['_id']] = hit['_source']

# Step 3: Cleanup the scroll if necessary
if scroll_id:
path = f"/_search/scroll"
body = json.dumps({"scroll_id": scroll_id})
self.call_api(
path=path,
method=HttpMethod.DELETE,
data=body,
headers=headers,
session=session,
raise_error=False
)

return documents
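
The author's follow-up turned this accumulation into a generator. A minimal sketch of that shape, reusing call_api and HttpMethod from the diff (the exact merged signature and per-page format are assumptions):

```python
import json
from typing import Any, Dict, Generator

import requests


def fetch_all_documents(self, index_name: str,
                        batch_size: int = 100) -> Generator[Dict[str, Any], None, None]:
    """Yield documents one scroll page at a time instead of holding them all in memory."""
    session = requests.Session()
    headers = {'Content-Type': 'application/json'}

    # Open the scroll.
    response = self.call_api(
        path=f"/{index_name}/_search?scroll=1m",
        method=HttpMethod.POST,
        data=json.dumps({"size": batch_size, "query": {"match_all": {}}}),
        headers=headers,
        session=session
    )
    response_json = response.json()
    scroll_id = response_json.get('_scroll_id')
    hits = response_json.get('hits', {}).get('hits', [])

    try:
        while scroll_id and hits:
            # Hand one page to the caller, then fetch the next.
            yield {hit['_id']: hit['_source'] for hit in hits}
            response = self.call_api(
                path="/_search/scroll",
                method=HttpMethod.POST,
                data=json.dumps({"scroll": "1m", "scroll_id": scroll_id}),
                headers=headers,
                session=session
            )
            response_json = response.json()
            scroll_id = response_json.get('_scroll_id')
            hits = response_json.get('hits', {}).get('hits', [])
    finally:
        # Clean up the scroll context even if the caller stops iterating early.
        if scroll_id:
            self.call_api(
                path="/_search/scroll",
                method=HttpMethod.DELETE,
                data=json.dumps({"scroll_id": scroll_id}),
                headers=headers,
                session=session,
                raise_error=False
            )
```

A caller such as backup_working_state_index could then stream each page to disk as it arrives rather than building one large dictionary.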