
Added console backfill pause, console backfill stop archives working state #1140

Merged: 7 commits, Nov 15, 2024
File 1 of 7 (console CLI commands)
@@ -1,6 +1,7 @@
import json
from pprint import pprint
import sys
import time
from typing import Dict
import click
import console_link.middleware.clusters as clusters_
@@ -13,6 +14,7 @@
import console_link.middleware.tuples as tuples_

from console_link.models.cluster import HttpMethod
from console_link.models.backfill_rfs import RfsWorkersInProgress, WorkingIndexDoesntExist
from console_link.models.utils import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic
@@ -292,6 +294,16 @@ def start_backfill_cmd(ctx, pipeline_name):
    click.echo(message)


@backfill_group.command(name="pause")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
Review comment (Collaborator):

Pipeline names could be confusing. If I knew nothing about OSI, I might think I have named RFS pipelines that I could do something with. Other than removing OSI, I'm not sure it's worth making any other changes; for that, I'd recommend opening a Jira and we'll discuss it in the coming weeks.

@click.pass_obj
def pause_backfill_cmd(ctx, pipeline_name):
    exitcode, message = backfill_.pause(ctx.env.backfill, pipeline_name=pipeline_name)
    if exitcode != ExitCode.SUCCESS:
        raise click.ClickException(message)
    click.echo(message)


@backfill_group.command(name="stop")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
@click.pass_obj
@@ -301,6 +313,22 @@ def stop_backfill_cmd(ctx, pipeline_name):
        raise click.ClickException(message)
    click.echo(message)

click.echo("Archiving the working state of the backfill operation...")
exitcode, message = backfill_.archive(ctx.env.backfill)
Review comment (Collaborator):

Consider renaming exitcode. It seemed like it was supposed to be the integer exit code for the process, but it looks like it's a boolean.

Reply (Member Author):

It makes enough sense that I think I'm personally fine w/ it.


    if isinstance(message, WorkingIndexDoesntExist):
        click.echo("Working state index doesn't exist, skipping archive operation.")
        return

    while isinstance(message, RfsWorkersInProgress):
        click.echo("RFS Workers are still running, waiting for them to complete...")
        time.sleep(5)
        exitcode, message = backfill_.archive(ctx.env.backfill)

    if exitcode != ExitCode.SUCCESS:
        raise click.ClickException(message)
    click.echo(f"Backfill working state archived to: {message}")


@backfill_group.command(name="scale")
@click.argument("units", type=int, required=True)
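
For reference, here is a self-contained sketch of the stop-then-archive polling behavior the stop command gains above. The helper name archive_with_retry and the stub classes are illustrative only (not part of the PR); they stand in for console_link's ExitCode and the two new exception types so the control flow can be read without the click plumbing.

import time
from enum import Enum


class ExitCode(Enum):  # stand-in for console_link.models.utils.ExitCode
    SUCCESS = 0
    FAILURE = 1


class RfsWorkersInProgress(Exception):  # stand-in for the PR's exception type
    pass


class WorkingIndexDoesntExist(Exception):  # stand-in for the PR's exception type
    pass


def archive_with_retry(archive_fn, poll_seconds=5):
    """Mirror the CLI loop: retry the archive while RFS workers are still draining."""
    exitcode, message = archive_fn()
    if isinstance(message, WorkingIndexDoesntExist):
        return "skipped: working state index does not exist"
    while isinstance(message, RfsWorkersInProgress):
        time.sleep(poll_seconds)
        exitcode, message = archive_fn()
    if exitcode != ExitCode.SUCCESS:
        raise RuntimeError(str(message))
    return f"archived to {message}"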
File 2 of 7 (backfill middleware)
@@ -35,6 +35,13 @@ def start(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
    return backfill.start(*args, **kwargs)


@handle_errors("backfill",
on_success=lambda result: (ExitCode.SUCCESS, "Backfill paused successfully." + "\n" + result))
def pause(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Pausing backfill")
return backfill.pause(*args, **kwargs)


@handle_errors("backfill",
on_success=lambda result: (ExitCode.SUCCESS, "Backfill stopped successfully." + "\n" + result))
def stop(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
@@ -54,3 +61,9 @@ def status(backfill: Backfill, deep_check: bool, *args, **kwargs) -> CommandResu
def scale(backfill: Backfill, units: int, *args, **kwargs) -> CommandResult[str]:
logger.info(f"Scaling backfill to {units} units")
return backfill.scale(units, *args, **kwargs)


@handle_errors("backfill")
def archive(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Archiving backfill operation")
return backfill.archive(*args, **kwargs)
File 3 of 7 (error-handling decorator)
@@ -16,7 +16,8 @@


def handle_errors(service_type: str,
                  on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status)
                  on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status),
                  on_failure: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.FAILURE, status)
                  ) -> Callable[[Any], Tuple[ExitCode, str]]:
    def decorator(func: Callable[[Any], Tuple[ExitCode, str]]) -> Callable[[Any], Tuple[ExitCode, str]]:
        def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]:
@@ -29,6 +30,8 @@ def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]:
            except Exception as e:
                logger.error(f"Failed to {func.__name__} {service_type}: {e}")
                return ExitCode.FAILURE, f"Failure on {func.__name__} for {service_type}: {type(e).__name__} {e}"
            return on_success(result.value)
            if result.success:
                return on_success(result.value)
            return on_failure(result.value)
        return wrapper
    return decorator
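
A minimal sketch of what the new on_failure branch buys: a command can now return CommandResult(False, &lt;exception instance&gt;) and the caller gets back (ExitCode.FAILURE, &lt;exception instance&gt;) for isinstance() checks, instead of on_success always running. The wrap helper below is a simplified, illustrative stand-in for the real decorator (it omits the service check and exception handling).

from enum import Enum
from typing import Any, Callable, Tuple


class ExitCode(Enum):  # stand-in for console_link.models.utils.ExitCode
    SUCCESS = 0
    FAILURE = 1


class CommandResult:  # stand-in for console_link.models.command_result.CommandResult
    def __init__(self, success: bool, value: Any) -> None:
        self.success = success
        self.value = value


def wrap(func: Callable[[], CommandResult],
         on_success: Callable[[Any], Tuple[ExitCode, Any]] = lambda v: (ExitCode.SUCCESS, v),
         on_failure: Callable[[Any], Tuple[ExitCode, Any]] = lambda v: (ExitCode.FAILURE, v)
         ) -> Tuple[ExitCode, Any]:
    # Route the result through the success or failure handler, as the decorator now does
    result = func()
    if result.success:
        return on_success(result.value)
    return on_failure(result.value)


# A failed archive surfaces its exception value rather than being treated as success:
exitcode, message = wrap(lambda: CommandResult(False, RuntimeError("workers busy")))
assert exitcode is ExitCode.FAILURE and isinstance(message, RuntimeError)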
File 4 of 7 (Backfill base class)
@@ -44,9 +44,14 @@ def start(self, *args, **kwargs) -> CommandResult[str]:
        or failures--their data will begin moving to the target cluster."""
        pass

    @abstractmethod
    def pause(self, *args, **kwargs) -> CommandResult[str]:
        """Pause the backfill. This backfill should be resumable afterwards by invoking `start`."""
        pass

    @abstractmethod
    def stop(self, *args, **kwargs) -> CommandResult[str]:
        """Stop or pause the backfill. This does not make guarantees about resumeability."""
        """Stop the backfill. This does not make guarantees about resumeability."""
Review comment (Collaborator):

There is a guarantee if it completes successfully, right? That would be a fair thing to say, and it would give the user a better understanding of what to expect.

Reply (Member Author):

The intuition is that you use pause when you want the ability to resume, and stop if you want to clean things up.

Reply (Collaborator):

Why can you not guarantee whether start can pick up again if these two commands ended successfully?

        pass

    @abstractmethod
@@ -58,5 +63,11 @@ def get_status(self, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str
    def scale(self, units: int, *args, **kwargs) -> CommandResult[str]:
        pass

    @abstractmethod
    def archive(self, *args, **kwargs) -> CommandResult[str]:
        """Archive the backfill operation. Should return the information required to resume the backfill operations.
        Should fail if there are currently running operations."""
Review comment (Collaborator):

Suggest "currently running backfill operations." The user could think that some independent activity (or, who knows, ECS tasks?!) could cause failure here.

Reply (Member Author):

This text is not surfaced to the users at all, AFAIK

        pass

    def describe(self) -> Dict:
        return self.config
File 5 of 7 (OpenSearch Ingestion backfill model)
@@ -90,6 +90,9 @@ def start(self, pipeline_name=None):
            pipeline_name = self.osi_props.pipeline_name
        start_pipeline(osi_client=self.osi_client, pipeline_name=pipeline_name)

    def pause(self, pipeline_name=None) -> CommandResult:
        raise NotImplementedError()

    def stop(self, pipeline_name=None):
        if pipeline_name is None:
            pipeline_name = self.osi_props.pipeline_name
@@ -100,3 +103,6 @@ def get_status(self, *args, **kwargs) -> CommandResult:

    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        raise NotImplementedError()

    def archive(self, pipeline_name=None) -> CommandResult:
        raise NotImplementedError()
File 6 of 7 (RFS backfill model)
@@ -1,12 +1,14 @@
from datetime import datetime
from typing import Dict, Optional
import json
import os
from typing import Dict, Optional


import requests

from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.client_options import ClientOptions
from console_link.models.cluster import Cluster
from console_link.models.cluster import Cluster, HttpMethod
from console_link.models.schema_tools import contains_one_of
from console_link.models.command_result import CommandResult
from console_link.models.ecs_service import ECSService
@@ -17,6 +19,8 @@

logger = logging.getLogger(__name__)

WORKING_STATE_INDEX = ".migrations_working_state"
Review comment (Collaborator):

Eventually this should be configurable, but I don't think that needs to happen today. Let's do that at the same time we make it configurable across the board.


DOCKER_RFS_SCHEMA = {
"type": "dict",
"nullable": True,
@@ -80,11 +84,27 @@ def __init__(self, config: Dict, target_cluster: Cluster) -> None:
        self.target_cluster = target_cluster
        self.docker_config = self.config["reindex_from_snapshot"]["docker"]

    def pause(self, pipeline_name=None) -> CommandResult:
        raise NotImplementedError()

    def get_status(self, *args, **kwargs) -> CommandResult:
        return CommandResult(True, (BackfillStatus.RUNNING, "This is my running state message"))

    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        raise NotImplementedError()

    def archive(self, *args, **kwargs) -> CommandResult:
        raise NotImplementedError()


class RfsWorkersInProgress(Exception):
    def __init__(self):
        super().__init__("RFS Workers are still in progress")


class WorkingIndexDoesntExist(Exception):
    def __init__(self, index_name: str):
        super().__init__(f"The working state index '{index_name}' does not exist")


class ECSRFSBackfill(RFSBackfill):
@@ -103,6 +123,10 @@ def __init__(self, config: Dict, target_cluster: Cluster, client_options: Option
    def start(self, *args, **kwargs) -> CommandResult:
        logger.info(f"Starting RFS backfill by setting desired count to {self.default_scale} instances")
        return self.ecs_client.set_desired_count(self.default_scale)

    def pause(self, *args, **kwargs) -> CommandResult:
        logger.info("Pausing RFS backfill by setting desired count to 0 instances")
        return self.ecs_client.set_desired_count(0)

    def stop(self, *args, **kwargs) -> CommandResult:
        logger.info("Stopping RFS backfill by setting desired count to 0 instances")
@@ -111,6 +135,31 @@ def stop(self, *args, **kwargs) -> CommandResult:
    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
        return self.ecs_client.set_desired_count(units)

    def archive(self, *args, archive_dir_path: str = None, archive_file_name: str = None, **kwargs) -> CommandResult:
        logger.info("Confirming there are no currently in-progress workers")
        status = self.ecs_client.get_instance_statuses()
        if status.running > 0 or status.pending > 0 or status.desired > 0:
            return CommandResult(False, RfsWorkersInProgress())

        try:
            backup_path = get_working_state_index_backup_path(archive_dir_path, archive_file_name)
            logger.info(f"Backing up working state index to {backup_path}")
            backup_working_state_index(self.target_cluster, WORKING_STATE_INDEX, backup_path)
            logger.info("Working state index backed up successful")

            logger.info("Cleaning up working state index on target cluster")
            self.target_cluster.call_api(
                f"/{WORKING_STATE_INDEX}",
                method=HttpMethod.DELETE,
                params={"ignore_unavailable": "true"}
            )
            logger.info("Working state index cleaned up successful")
            return CommandResult(True, backup_path)
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                return CommandResult(False, WorkingIndexDoesntExist(WORKING_STATE_INDEX))
            return CommandResult(False, e)

    def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult:
        logger.info(f"Getting status of RFS backfill, with {deep_check=}")
@@ -186,6 +235,45 @@ def _get_detailed_status(self) -> Optional[str]:
return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None])


def get_working_state_index_backup_path(archive_dir_path: str = None, archive_file_name: str = None) -> str:
    shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None)
    if archive_dir_path:
        backup_dir = archive_dir_path
    elif shared_logs_dir is None:
        backup_dir = "./backfill_working_state"
    else:
        backup_dir = os.path.join(shared_logs_dir, "backfill_working_state")

    if archive_file_name:
        file_name = archive_file_name
    else:
        file_name = f"working_state_backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
    return os.path.join(backup_dir, file_name)


def backup_working_state_index(cluster: Cluster, index_name: str, backup_path: str):
    # Ensure the backup directory exists
    backup_dir = os.path.dirname(backup_path)
    os.makedirs(backup_dir, exist_ok=True)

    # Backup the docs in the working state index as a JSON array containing batches of documents
    with open(backup_path, 'w') as outfile:
        outfile.write("[\n")  # Start the JSON array
        first_batch = True

        for batch in cluster.fetch_all_documents(index_name=index_name):
            if not first_batch:
                outfile.write(",\n")
            else:
                first_batch = False

            # Dump the batch of documents as an entry in the array
            batch_json = json.dumps(batch, indent=4)
            outfile.write(batch_json)

        outfile.write("\n]")  # Close the JSON array


def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]:
    try:
        response = cluster.call_api("/.migrations_working_state/_search", data=json.dumps(query),
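
A quick, illustrative check of where the archive lands by default, reproducing the fallback logic of get_working_state_index_backup_path above (assuming no explicit archive_dir_path or archive_file_name is passed and the SHARED_LOGS_DIR_PATH environment variable is unset):

import os
from datetime import datetime

# Same default-path logic as get_working_state_index_backup_path with no overrides
shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH")
backup_dir = (os.path.join(shared_logs_dir, "backfill_working_state")
              if shared_logs_dir else "./backfill_working_state")
file_name = f"working_state_backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
print(os.path.join(backup_dir, file_name))
# e.g. ./backfill_working_state/working_state_backup_20241115120000.json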
File 7 of 7 (Cluster model)
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, Generator, Optional
from enum import Enum
import json
import logging
import subprocess

@@ -198,3 +199,62 @@ def execute_benchmark_workload(self, workload: str,
        display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********")
        logger.info(f"Executing command: {display_command}")
        subprocess.run(command, shell=True)

    def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Generator[Dict[str, Any], None, None]:
        """
        Generator that fetches all documents from the specified index in batches
        """

        session = requests.Session()

        # Step 1: Initiate the scroll
        path = f"/{index_name}/_search?scroll=1m"
        headers = {'Content-Type': 'application/json'}
        body = json.dumps({"size": batch_size, "query": {"match_all": {}}})
        response = self.call_api(
            path=path,
            method=HttpMethod.POST,
            data=body,
            headers=headers,
            session=session
        )

        response_json = response.json()
        scroll_id = response_json.get('_scroll_id')
        hits = response_json.get('hits', {}).get('hits', [])

        # Yield the first batch of documents
        if hits:
            yield {hit['_id']: hit['_source'] for hit in hits}

        # Step 2: Continue scrolling until no more documents
        while scroll_id and hits:
            path = "/_search/scroll"
            body = json.dumps({"scroll": "1m", "scroll_id": scroll_id})
            response = self.call_api(
                path=path,
                method=HttpMethod.POST,
                data=body,
                headers=headers,
                session=session
            )

            response_json = response.json()
            scroll_id = response_json.get('_scroll_id')
            hits = response_json.get('hits', {}).get('hits', [])

            if hits:
                yield {hit['_id']: hit['_source'] for hit in hits}

        # Step 3: Cleanup the scroll if necessary
        if scroll_id:
            path = "/_search/scroll"
            body = json.dumps({"scroll_id": scroll_id})
            self.call_api(
                path=path,
                method=HttpMethod.DELETE,
                data=body,
                headers=headers,
                session=session,
                raise_error=False
            )
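
A hedged usage sketch of the new generator: each yielded batch is a dict mapping _id to _source, so a consumer can stream documents without holding the whole index in memory. Here target_cluster stands in for an already-configured Cluster instance, and the index name is just an example.

# Illustrative consumer; assumes `target_cluster` is a configured Cluster object.
total_docs = 0
for batch in target_cluster.fetch_all_documents(index_name=".migrations_working_state", batch_size=100):
    total_docs += len(batch)  # each batch maps document _id -> _source
    for doc_id, source in batch.items():
        pass  # process each document as needed
print(f"Fetched {total_docs} documents")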