Added console backfill pause, console backfill stop archives working state (#1140)

* Added `console backfill pause`

Signed-off-by: Chris Helma <[email protected]>

* Updated `console backfill stop` to archive the working state

Signed-off-by: Chris Helma <[email protected]>

* `console backfill stop` gracefully handles no state index

Signed-off-by: Chris Helma <[email protected]>

* Updates per PR comments

Signed-off-by: Chris Helma <[email protected]>

* Linting pass

Signed-off-by: Chris Helma <[email protected]>

* Another linting pass

Signed-off-by: Chris Helma <[email protected]>

---------

Signed-off-by: Chris Helma <[email protected]>
Co-authored-by: Andre Kurait <[email protected]>
chelma and AndreKurait authored Nov 15, 2024
1 parent 65a662b commit 05cf84b
Showing 10 changed files with 361 additions and 14 deletions.
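
For context, here is a minimal sketch (not part of the commit) of how the new middleware calls introduced below compose, mirroring the CLI flow: pause scales the RFS workers down while leaving the working state index intact, and stop is followed by an archive call that retries until the workers have drained. The Environment constructor keyword and config path are assumptions for illustration.

import time

import console_link.middleware.backfill as backfill_
from console_link.environment import Environment
from console_link.models.backfill_rfs import RfsWorkersInProgress, WorkingIndexDoesntExist
from console_link.models.utils import ExitCode

# Hypothetical environment setup; the config path and constructor keyword are assumptions.
env = Environment(config_file="/config/migration_services.yaml")

# Pause: scale RFS workers to zero while keeping the working state index intact.
exitcode, message = backfill_.pause(env.backfill)

# Stop, then archive the working state once all RFS workers have wound down.
exitcode, message = backfill_.stop(env.backfill)
exitcode, message = backfill_.archive(env.backfill)
while isinstance(message, RfsWorkersInProgress):
    time.sleep(5)
    exitcode, message = backfill_.archive(env.backfill)

if isinstance(message, WorkingIndexDoesntExist):
    print("No working state index to archive.")
elif exitcode == ExitCode.SUCCESS:
    print(f"Backfill working state archived to: {message}")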
@@ -1,6 +1,7 @@
import json
from pprint import pprint
import sys
import time
from typing import Dict
import click
import console_link.middleware.clusters as clusters_
@@ -13,6 +14,7 @@
import console_link.middleware.tuples as tuples_

from console_link.models.cluster import HttpMethod
from console_link.models.backfill_rfs import RfsWorkersInProgress, WorkingIndexDoesntExist
from console_link.models.utils import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic
@@ -292,6 +294,16 @@ def start_backfill_cmd(ctx, pipeline_name):
click.echo(message)


@backfill_group.command(name="pause")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
@click.pass_obj
def pause_backfill_cmd(ctx, pipeline_name):
exitcode, message = backfill_.pause(ctx.env.backfill, pipeline_name=pipeline_name)
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)


@backfill_group.command(name="stop")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
@click.pass_obj
@@ -301,6 +313,22 @@ def stop_backfill_cmd(ctx, pipeline_name):
raise click.ClickException(message)
click.echo(message)

click.echo("Archiving the working state of the backfill operation...")
exitcode, message = backfill_.archive(ctx.env.backfill)

if isinstance(message, WorkingIndexDoesntExist):
click.echo("Working state index doesn't exist, skipping archive operation.")
return

while isinstance(message, RfsWorkersInProgress):
click.echo("RFS Workers are still running, waiting for them to complete...")
time.sleep(5)
exitcode, message = backfill_.archive(ctx.env.backfill)

if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(f"Backfill working state archived to: {message}")


@backfill_group.command(name="scale")
@click.argument("units", type=int, required=True)
@@ -35,6 +35,13 @@ def start(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
return backfill.start(*args, **kwargs)


@handle_errors("backfill",
on_success=lambda result: (ExitCode.SUCCESS, "Backfill paused successfully." + "\n" + result))
def pause(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Pausing backfill")
return backfill.pause(*args, **kwargs)


@handle_errors("backfill",
on_success=lambda result: (ExitCode.SUCCESS, "Backfill stopped successfully." + "\n" + result))
def stop(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
@@ -54,3 +61,9 @@ def status(backfill: Backfill, deep_check: bool, *args, **kwargs) -> CommandResu
def scale(backfill: Backfill, units: int, *args, **kwargs) -> CommandResult[str]:
logger.info(f"Scaling backfill to {units} units")
return backfill.scale(units, *args, **kwargs)


@handle_errors("backfill")
def archive(backfill: Backfill, *args, **kwargs) -> CommandResult[str]:
logger.info("Archiving backfill operation")
return backfill.archive(*args, **kwargs)
@@ -16,7 +16,8 @@


def handle_errors(service_type: str,
on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status)
on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status),
on_failure: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.FAILURE, status)
) -> Callable[[Any], Tuple[ExitCode, str]]:
def decorator(func: Callable[[Any], Tuple[ExitCode, str]]) -> Callable[[Any], Tuple[ExitCode, str]]:
def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]:
@@ -29,6 +30,8 @@ def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]:
except Exception as e:
logger.error(f"Failed to {func.__name__} {service_type}: {e}")
return ExitCode.FAILURE, f"Failure on {func.__name__} for {service_type}: {type(e).__name__} {e}"
return on_success(result.value)
if result.success:
return on_success(result.value)
return on_failure(result.value)
return wrapper
return decorator
@@ -44,9 +44,14 @@ def start(self, *args, **kwargs) -> CommandResult[str]:
or failures--their data will begin moving to the target cluster."""
pass

@abstractmethod
def pause(self, *args, **kwargs) -> CommandResult[str]:
"""Pause the backfill. This backfill should be resumable afterwards by invoking `start`."""
pass

@abstractmethod
def stop(self, *args, **kwargs) -> CommandResult[str]:
"""Stop or pause the backfill. This does not make guarantees about resumeability."""
"""Stop the backfill. This does not make guarantees about resumeability."""
pass

@abstractmethod
@@ -58,5 +63,11 @@ def get_status(self, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str
def scale(self, units: int, *args, **kwargs) -> CommandResult[str]:
pass

@abstractmethod
def archive(self, *args, **kwargs) -> CommandResult[str]:
"""Archive the backfill operation. Should return the information required to resume the backfill operations.
Should fail if there are currently running operations."""
pass

def describe(self) -> Dict:
return self.config
@@ -90,6 +90,9 @@ def start(self, pipeline_name=None):
pipeline_name = self.osi_props.pipeline_name
start_pipeline(osi_client=self.osi_client, pipeline_name=pipeline_name)

def pause(self, pipeline_name=None) -> CommandResult:
raise NotImplementedError()

def stop(self, pipeline_name=None):
if pipeline_name is None:
pipeline_name = self.osi_props.pipeline_name
@@ -100,3 +103,6 @@ def get_status(self, *args, **kwargs) -> CommandResult:

def scale(self, units: int, *args, **kwargs) -> CommandResult:
raise NotImplementedError()

def archive(self, pipeline_name=None) -> CommandResult:
raise NotImplementedError()
@@ -1,12 +1,14 @@
from datetime import datetime
from typing import Dict, Optional
import json
import os
from typing import Dict, Optional


import requests

from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.client_options import ClientOptions
from console_link.models.cluster import Cluster
from console_link.models.cluster import Cluster, HttpMethod
from console_link.models.schema_tools import contains_one_of
from console_link.models.command_result import CommandResult
from console_link.models.ecs_service import ECSService
@@ -17,6 +19,8 @@

logger = logging.getLogger(__name__)

WORKING_STATE_INDEX = ".migrations_working_state"

DOCKER_RFS_SCHEMA = {
"type": "dict",
"nullable": True,
@@ -80,11 +84,27 @@ def __init__(self, config: Dict, target_cluster: Cluster) -> None:
self.target_cluster = target_cluster
self.docker_config = self.config["reindex_from_snapshot"]["docker"]

def pause(self, pipeline_name=None) -> CommandResult:
raise NotImplementedError()

def get_status(self, *args, **kwargs) -> CommandResult:
return CommandResult(True, (BackfillStatus.RUNNING, "This is my running state message"))

def scale(self, units: int, *args, **kwargs) -> CommandResult:
raise NotImplementedError()

def archive(self, *args, **kwargs) -> CommandResult:
raise NotImplementedError()


class RfsWorkersInProgress(Exception):
def __init__(self):
super().__init__("RFS Workers are still in progress")


class WorkingIndexDoesntExist(Exception):
def __init__(self, index_name: str):
super().__init__(f"The working state index '{index_name}' does not exist")


class ECSRFSBackfill(RFSBackfill):
@@ -103,6 +123,10 @@ def __init__(self, config: Dict, target_cluster: Cluster, client_options: Option
def start(self, *args, **kwargs) -> CommandResult:
logger.info(f"Starting RFS backfill by setting desired count to {self.default_scale} instances")
return self.ecs_client.set_desired_count(self.default_scale)

def pause(self, *args, **kwargs) -> CommandResult:
logger.info("Pausing RFS backfill by setting desired count to 0 instances")
return self.ecs_client.set_desired_count(0)

def stop(self, *args, **kwargs) -> CommandResult:
logger.info("Stopping RFS backfill by setting desired count to 0 instances")
@@ -111,6 +135,31 @@ def stop(self, *args, **kwargs) -> CommandResult:
def scale(self, units: int, *args, **kwargs) -> CommandResult:
logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
return self.ecs_client.set_desired_count(units)

def archive(self, *args, archive_dir_path: str = None, archive_file_name: str = None, **kwargs) -> CommandResult:
logger.info("Confirming there are no currently in-progress workers")
status = self.ecs_client.get_instance_statuses()
if status.running > 0 or status.pending > 0 or status.desired > 0:
return CommandResult(False, RfsWorkersInProgress())

try:
backup_path = get_working_state_index_backup_path(archive_dir_path, archive_file_name)
logger.info(f"Backing up working state index to {backup_path}")
backup_working_state_index(self.target_cluster, WORKING_STATE_INDEX, backup_path)
logger.info("Working state index backed up successful")

logger.info("Cleaning up working state index on target cluster")
self.target_cluster.call_api(
f"/{WORKING_STATE_INDEX}",
method=HttpMethod.DELETE,
params={"ignore_unavailable": "true"}
)
logger.info("Working state index cleaned up successful")
return CommandResult(True, backup_path)
except requests.HTTPError as e:
if e.response.status_code == 404:
return CommandResult(False, WorkingIndexDoesntExist(WORKING_STATE_INDEX))
return CommandResult(False, e)

def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult:
logger.info(f"Getting status of RFS backfill, with {deep_check=}")
@@ -186,6 +235,45 @@ def _get_detailed_status(self) -> Optional[str]:
return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None])


def get_working_state_index_backup_path(archive_dir_path: str = None, archive_file_name: str = None) -> str:
shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None)
if archive_dir_path:
backup_dir = archive_dir_path
elif shared_logs_dir is None:
backup_dir = "./backfill_working_state"
else:
backup_dir = os.path.join(shared_logs_dir, "backfill_working_state")

if archive_file_name:
file_name = archive_file_name
else:
file_name = f"working_state_backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
return os.path.join(backup_dir, file_name)


def backup_working_state_index(cluster: Cluster, index_name: str, backup_path: str):
# Ensure the backup directory exists
backup_dir = os.path.dirname(backup_path)
os.makedirs(backup_dir, exist_ok=True)

# Backup the docs in the working state index as a JSON array containing batches of documents
with open(backup_path, 'w') as outfile:
outfile.write("[\n") # Start the JSON array
first_batch = True

for batch in cluster.fetch_all_documents(index_name=index_name):
if not first_batch:
outfile.write(",\n")
else:
first_batch = False

# Dump the batch of documents as an entry in the array
batch_json = json.dumps(batch, indent=4)
outfile.write(batch_json)

outfile.write("\n]") # Close the JSON array


def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]:
try:
response = cluster.call_api("/.migrations_working_state/_search", data=json.dumps(query),
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, Generator, Optional
from enum import Enum
import json
import logging
import subprocess

@@ -198,3 +199,62 @@ def execute_benchmark_workload(self, workload: str,
display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********")
logger.info(f"Executing command: {display_command}")
subprocess.run(command, shell=True)

def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Generator[Dict[str, Any], None, None]:
"""
Generator that fetches all documents from the specified index in batches
"""

session = requests.Session()

# Step 1: Initiate the scroll
path = f"/{index_name}/_search?scroll=1m"
headers = {'Content-Type': 'application/json'}
body = json.dumps({"size": batch_size, "query": {"match_all": {}}})
response = self.call_api(
path=path,
method=HttpMethod.POST,
data=body,
headers=headers,
session=session
)

response_json = response.json()
scroll_id = response_json.get('_scroll_id')
hits = response_json.get('hits', {}).get('hits', [])

# Yield the first batch of documents
if hits:
yield {hit['_id']: hit['_source'] for hit in hits}

# Step 2: Continue scrolling until no more documents
while scroll_id and hits:
path = "/_search/scroll"
body = json.dumps({"scroll": "1m", "scroll_id": scroll_id})
response = self.call_api(
path=path,
method=HttpMethod.POST,
data=body,
headers=headers,
session=session
)

response_json = response.json()
scroll_id = response_json.get('_scroll_id')
hits = response_json.get('hits', {}).get('hits', [])

if hits:
yield {hit['_id']: hit['_source'] for hit in hits}

# Step 3: Cleanup the scroll if necessary
if scroll_id:
path = "/_search/scroll"
body = json.dumps({"scroll_id": scroll_id})
self.call_api(
path=path,
method=HttpMethod.DELETE,
data=body,
headers=headers,
session=session,
raise_error=False
)
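
For reference, a small sketch (not part of the commit) of consuming the new fetch_all_documents generator directly; the Environment setup and the target cluster attribute name are assumptions for illustration.

from console_link.environment import Environment

# Hypothetical setup; the config path, constructor keyword, and attribute name are assumptions.
env = Environment(config_file="/config/migration_services.yaml")
cluster = env.target_cluster

# Each yielded batch maps document _id to its _source.
for batch in cluster.fetch_all_documents(index_name=".migrations_working_state", batch_size=200):
    for doc_id, source in batch.items():
        print(doc_id, source)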
(remaining changed files not shown)
