From ae451a01bf8d38de121f05f377f30acafb8b80f6 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Tue, 12 Nov 2024 08:04:19 -0600 Subject: [PATCH 1/6] Added `console backfill pause` Signed-off-by: Chris Helma --- .../migrationConsole/lib/console_link/console_link/cli.py | 8 ++++++++ .../lib/console_link/console_link/middleware/backfill.py | 6 ++++++ .../lib/console_link/console_link/models/backfill_base.py | 7 ++++++- .../lib/console_link/console_link/models/backfill_osi.py | 3 +++ .../lib/console_link/console_link/models/backfill_rfs.py | 7 +++++++ .../lib/console_link/tests/test_backfill.py | 7 +++++++ .../migrationConsole/lib/console_link/tests/test_cli.py | 8 ++++++++ 7 files changed, 45 insertions(+), 1 deletion(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index a5230dc27..611cfa778 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -291,6 +291,14 @@ def start_backfill_cmd(ctx, pipeline_name): raise click.ClickException(message) click.echo(message) +@backfill_group.command(name="pause") +@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name') +@click.pass_obj +def pause_backfill_cmd(ctx, pipeline_name): + exitcode, message = backfill_.pause(ctx.env.backfill, pipeline_name=pipeline_name) + if exitcode != ExitCode.SUCCESS: + raise click.ClickException(message) + click.echo(message) @backfill_group.command(name="stop") @click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name') diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py index 70832089f..86bd24088 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py @@ -34,6 +34,12 @@ def start(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: logger.info("Starting backfill") return backfill.start(*args, **kwargs) +@handle_errors("backfill", + on_success=lambda result: (ExitCode.SUCCESS, "Backfill paused successfully." + "\n" + result)) +def pause(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: + logger.info("Pausing backfill") + return backfill.pause(*args, **kwargs) + @handle_errors("backfill", on_success=lambda result: (ExitCode.SUCCESS, "Backfill stopped successfully." + "\n" + result)) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py index 0fe6b90fc..79e5f0e69 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py @@ -44,9 +44,14 @@ def start(self, *args, **kwargs) -> CommandResult[str]: or failures--their data will begin moving to the target cluster.""" pass + @abstractmethod + def pause(self, *args, **kwargs) -> CommandResult[str]: + """Pause the backfill. This backfill should be resumable afterwards by invoking `start`.""" + pass + @abstractmethod def stop(self, *args, **kwargs) -> CommandResult[str]: - """Stop or pause the backfill. This does not make guarantees about resumeability.""" + """Stop the backfill. This does not make guarantees about resumeability.""" pass @abstractmethod diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py index 17aa3f8b1..806a27353 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py @@ -90,6 +90,9 @@ def start(self, pipeline_name=None): pipeline_name = self.osi_props.pipeline_name start_pipeline(osi_client=self.osi_client, pipeline_name=pipeline_name) + def pause(self, pipeline_name=None) -> CommandResult: + raise NotImplementedError() + def stop(self, pipeline_name=None): if pipeline_name is None: pipeline_name = self.osi_props.pipeline_name diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py index 6a81357fa..f202f7a83 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py @@ -80,6 +80,9 @@ def __init__(self, config: Dict, target_cluster: Cluster) -> None: self.target_cluster = target_cluster self.docker_config = self.config["reindex_from_snapshot"]["docker"] + def pause(self, pipeline_name=None) -> CommandResult: + raise NotImplementedError() + def get_status(self, *args, **kwargs) -> CommandResult: return CommandResult(True, (BackfillStatus.RUNNING, "This is my running state message")) @@ -103,6 +106,10 @@ def __init__(self, config: Dict, target_cluster: Cluster, client_options: Option def start(self, *args, **kwargs) -> CommandResult: logger.info(f"Starting RFS backfill by setting desired count to {self.default_scale} instances") return self.ecs_client.set_desired_count(self.default_scale) + + def pause(self, *args, **kwargs) -> CommandResult: + logger.info(f"Pausing RFS backfill by setting desired count to 0 instances") + return self.ecs_client.set_desired_count(0) def stop(self, *args, **kwargs) -> CommandResult: logger.info("Stopping RFS backfill by setting desired count to 0 instances") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py index 994349193..bb054620f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py @@ -180,6 +180,13 @@ def test_ecs_rfs_backfill_start_sets_ecs_desired_count(ecs_rfs_backfill, mocker) assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 5) +def test_ecs_rfs_backfill_pause_sets_ecs_desired_count(ecs_rfs_backfill, mocker): + assert ecs_rfs_backfill.default_scale == 5 + mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) + ecs_rfs_backfill.pause() + + assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) + mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 0) def test_ecs_rfs_backfill_stop_sets_ecs_desired_count(ecs_rfs_backfill, mocker): assert ecs_rfs_backfill.default_scale == 5 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index 710d42c80..1e4f1ee1a 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -463,6 +463,14 @@ def test_cli_backfill_start(runner, mocker): mock.assert_called_once() assert result.exit_code == 0 +def test_cli_backfill_pause(runner, mocker): + mock = mocker.patch.object(ECSRFSBackfill, 'pause', autospec=True) + result = runner.invoke(cli, ['--config-file', str(TEST_DATA_DIRECTORY / "services_with_ecs_rfs.yaml"), + 'backfill', 'pause'], + catch_exceptions=True) + mock.assert_called_once() + assert result.exit_code == 0 + def test_cli_backfill_stop(runner, mocker): mock = mocker.patch.object(ECSRFSBackfill, 'stop', autospec=True) From 65eca1fbd80d9b233c2165f6348e757f89496b58 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Thu, 14 Nov 2024 09:30:02 -0600 Subject: [PATCH 2/6] Updated `console backfill stop` to archive the working state Signed-off-by: Chris Helma --- .../lib/console_link/console_link/cli.py | 13 ++++ .../console_link/middleware/backfill.py | 6 +- .../console_link/middleware/error_handler.py | 7 ++- .../console_link/models/backfill_base.py | 6 ++ .../console_link/models/backfill_osi.py | 3 + .../console_link/models/backfill_rfs.py | 53 +++++++++++++++- .../console_link/models/cluster.py | 61 +++++++++++++++++++ .../lib/console_link/tests/test_backfill.py | 44 ++++++++++++- .../lib/console_link/tests/test_cli.py | 18 ++++-- .../lib/console_link/tests/test_cluster.py | 30 +++++++++ 10 files changed, 228 insertions(+), 13 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index 611cfa778..4cf924224 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -1,6 +1,7 @@ import json from pprint import pprint import sys +import time from typing import Dict import click import console_link.middleware.clusters as clusters_ @@ -13,6 +14,7 @@ import console_link.middleware.tuples as tuples_ from console_link.models.cluster import HttpMethod +from console_link.models.backfill_rfs import RfsWorkersInProgress from console_link.models.utils import ExitCode from console_link.environment import Environment from console_link.models.metrics_source import Component, MetricStatistic @@ -309,6 +311,17 @@ def stop_backfill_cmd(ctx, pipeline_name): raise click.ClickException(message) click.echo(message) + click.echo("Archiving the working state of the backfill operation...") + exitcode, message = backfill_.archive(ctx.env.backfill) + + while isinstance(message, RfsWorkersInProgress): + click.echo(f"RFS Workers are still running, waiting for them to complete...") + time.sleep(5) + exitcode, message = backfill_.archive(ctx.env.backfill) + + if exitcode != ExitCode.SUCCESS: + raise click.ClickException(message) + click.echo(f"Backfill working state archived to: {message}") @backfill_group.command(name="scale") @click.argument("units", type=int, required=True) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py index 86bd24088..3fb5b8c13 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py @@ -47,7 +47,6 @@ def stop(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: logger.info("Stopping backfill") return backfill.stop(*args, **kwargs) - @handle_errors("backfill", on_success=lambda status: (ExitCode.SUCCESS, f"{status[0]}\n{status[1]}")) def status(backfill: Backfill, deep_check: bool, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str]]: @@ -60,3 +59,8 @@ def status(backfill: Backfill, deep_check: bool, *args, **kwargs) -> CommandResu def scale(backfill: Backfill, units: int, *args, **kwargs) -> CommandResult[str]: logger.info(f"Scaling backfill to {units} units") return backfill.scale(units, *args, **kwargs) + +@handle_errors("backfill") +def archive(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: + logger.info("Archiving backfill operation") + return backfill.archive(*args, **kwargs) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/error_handler.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/error_handler.py index f09658fb8..dce3acfb8 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/error_handler.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/error_handler.py @@ -16,7 +16,8 @@ def handle_errors(service_type: str, - on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status) + on_success: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.SUCCESS, status), + on_failure: Callable[[Any], Tuple[ExitCode, str]] = lambda status: (ExitCode.FAILURE, status) ) -> Callable[[Any], Tuple[ExitCode, str]]: def decorator(func: Callable[[Any], Tuple[ExitCode, str]]) -> Callable[[Any], Tuple[ExitCode, str]]: def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]: @@ -29,6 +30,8 @@ def wrapper(service: Service, *args, **kwargs) -> Tuple[ExitCode, str]: except Exception as e: logger.error(f"Failed to {func.__name__} {service_type}: {e}") return ExitCode.FAILURE, f"Failure on {func.__name__} for {service_type}: {type(e).__name__} {e}" - return on_success(result.value) + if result.success: + return on_success(result.value) + return on_failure(result.value) return wrapper return decorator diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py index 79e5f0e69..0fc51275c 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_base.py @@ -63,5 +63,11 @@ def get_status(self, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str def scale(self, units: int, *args, **kwargs) -> CommandResult[str]: pass + @abstractmethod + def archive(self, *args, **kwargs) -> CommandResult[str]: + """Archive the backfill operation. Should return the information required to resume the backfill operations. + Should fail if there are currently running operations.""" + pass + def describe(self) -> Dict: return self.config diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py index 806a27353..1a1859087 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py @@ -103,3 +103,6 @@ def get_status(self, *args, **kwargs) -> CommandResult: def scale(self, units: int, *args, **kwargs) -> CommandResult: raise NotImplementedError() + + def archive(self, pipeline_name=None) -> CommandResult: + raise NotImplementedError() \ No newline at end of file diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py index f202f7a83..a06dce57d 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py @@ -1,12 +1,14 @@ from datetime import datetime -from typing import Dict, Optional import json +import os +from typing import Any, Dict, Optional + import requests from console_link.models.backfill_base import Backfill, BackfillStatus from console_link.models.client_options import ClientOptions -from console_link.models.cluster import Cluster +from console_link.models.cluster import Cluster, HttpMethod from console_link.models.schema_tools import contains_one_of from console_link.models.command_result import CommandResult from console_link.models.ecs_service import ECSService @@ -17,6 +19,8 @@ logger = logging.getLogger(__name__) +WORKING_STATE_INDEX = ".migrations_working_state" + DOCKER_RFS_SCHEMA = { "type": "dict", "nullable": True, @@ -73,7 +77,6 @@ def get_status(self, *args, **kwargs) -> CommandResult: def scale(self, units: int, *args, **kwargs) -> CommandResult: raise NotImplementedError() - class DockerRFSBackfill(RFSBackfill): def __init__(self, config: Dict, target_cluster: Cluster) -> None: super().__init__(config) @@ -88,7 +91,14 @@ def get_status(self, *args, **kwargs) -> CommandResult: def scale(self, units: int, *args, **kwargs) -> CommandResult: raise NotImplementedError() + + def archive(self, *args, **kwargs) -> CommandResult: + raise NotImplementedError() + +class RfsWorkersInProgress(Exception): + def __init__(self): + super().__init__("RFS Workers are still in progress") class ECSRFSBackfill(RFSBackfill): def __init__(self, config: Dict, target_cluster: Cluster, client_options: Optional[ClientOptions] = None) -> None: @@ -118,6 +128,23 @@ def stop(self, *args, **kwargs) -> CommandResult: def scale(self, units: int, *args, **kwargs) -> CommandResult: logger.info(f"Scaling RFS backfill by setting desired count to {units} instances") return self.ecs_client.set_desired_count(units) + + def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult: + logger.info("Confirming there are no currently in-progress workers") + status = self.ecs_client.get_instance_statuses() + if status.running > 0 or status.pending > 0 or status.desired > 0: + return CommandResult(False, RfsWorkersInProgress()) + + backup_path = get_working_state_index_backup_path(archive_dir_path) + logger.info(f"Backing up working state index to {backup_path}") + documents = self.target_cluster.fetch_all_documents(WORKING_STATE_INDEX) + backup_working_state_index(documents, backup_path) + logger.info(f"Working state index backed up successful") + + logger.info("Cleaning up working state index on target cluster") + self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) + logger.info("Working state index cleaned up successful") + return CommandResult(True, backup_path) def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult: logger.info(f"Getting status of RFS backfill, with {deep_check=}") @@ -192,6 +219,26 @@ def _get_detailed_status(self) -> Optional[str]: return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None]) +def get_working_state_index_backup_path(archive_dir_path: str = None) -> str: + shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None) + if archive_dir_path: + backup_dir = archive_dir_path + elif shared_logs_dir is None: + backup_dir = "./working_state" + else: + backup_dir = os.path.join(shared_logs_dir, "working_state") + + file_name = "working_state_backup.json" + return os.path.join(backup_dir, file_name) + +def backup_working_state_index(working_state: Dict[str, Any], backup_path: str): + # Ensure the backup directory exists + backup_dir = os.path.dirname(backup_path) + os.makedirs(backup_dir, exist_ok=True) + + # Write the backup + with open(backup_path, "w") as f: + json.dump(working_state, f, indent=4) def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]: try: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index 1d6856be0..a8e005e7f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -1,5 +1,6 @@ from typing import Any, Dict, Optional from enum import Enum +import json import logging import subprocess @@ -198,3 +199,63 @@ def execute_benchmark_workload(self, workload: str, display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********") logger.info(f"Executing command: {display_command}") subprocess.run(command, shell=True) + + def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[str, Any]: + documents = {} + scroll_id = None + session = requests.Session() + + # Step 1: Initiate the scroll + path = f"/{index_name}/_search?scroll=1m" + headers = {'Content-Type': 'application/json'} + body = json.dumps({"size": batch_size, "query": {"match_all": {}}}) + response = self.call_api( + path=path, + method=HttpMethod.POST, + data=body, + headers=headers, + session=session + ) + + response_json = response.json() + scroll_id = response_json.get('_scroll_id') + hits = response_json.get('hits', {}).get('hits', []) + + # Add documents to result dictionary + for hit in hits: + documents[hit['_id']] = hit['_source'] + + # Step 2: Continue scrolling until no more documents + while scroll_id and hits: + path = f"/_search/scroll" + body = json.dumps({"scroll": "1m", "scroll_id": scroll_id}) + response = self.call_api( + path=path, + method=HttpMethod.POST, + data=body, + headers=headers, + session=session + ) + + response_json = response.json() + scroll_id = response_json.get('_scroll_id') + hits = response_json.get('hits', {}).get('hits', []) + + # Add documents to result dictionary + for hit in hits: + documents[hit['_id']] = hit['_source'] + + # Step 3: Cleanup the scroll if necessary + if scroll_id: + path = f"/_search/scroll" + body = json.dumps({"scroll_id": scroll_id}) + self.call_api( + path=path, + method=HttpMethod.DELETE, + data=body, + headers=headers, + session=session, + raise_error=False + ) + + return documents diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py index bb054620f..568806570 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py @@ -1,14 +1,16 @@ import json +import os import pathlib +from unittest.mock import ANY import pytest import requests import requests_mock -from console_link.models.cluster import Cluster +from console_link.models.cluster import Cluster, HttpMethod from console_link.models.backfill_base import Backfill, BackfillStatus from console_link.models.backfill_osi import OpenSearchIngestionBackfill -from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill +from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill, RfsWorkersInProgress from console_link.models.ecs_service import ECSService, InstanceStatuses from console_link.models.factories import (UnsupportedBackfillTypeError, get_backfill) @@ -196,7 +198,6 @@ def test_ecs_rfs_backfill_stop_sets_ecs_desired_count(ecs_rfs_backfill, mocker): assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 0) - def test_ecs_rfs_backfill_scale_sets_ecs_desired_count(ecs_rfs_backfill, mocker): mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) ecs_rfs_backfill.scale(3) @@ -306,6 +307,43 @@ def test_ecs_rfs_deep_status_check_failure(ecs_rfs_backfill, mocker, caplog): assert result.success assert result.value[0] == BackfillStatus.RUNNING +def test_ecs_rfs_backfill_archive_as_expected(ecs_rfs_backfill, mocker, tmpdir): + mocked_instance_status = InstanceStatuses( + desired=0, + running=0, + pending=0 + ) + mocker.patch.object(ECSService, 'get_instance_statuses', autospec=True, return_value=mocked_instance_status) + + mocked_docs = {"id": {"key": "value"}} + mocker.patch.object(Cluster, 'fetch_all_documents', autospec=True, return_value=mocked_docs) + + mock_api = mocker.patch.object(Cluster, 'call_api', autospec=True, return_value=requests.Response()) + + result = ecs_rfs_backfill.archive(archive_dir_path=tmpdir.strpath) + + assert result.success + expected_path = os.path.join(tmpdir.strpath, "working_state_backup.json") + assert result.value == expected_path + assert os.path.exists(expected_path) + with open(expected_path, "r") as f: + assert json.load(f) == mocked_docs + + mock_api.assert_called_once_with(ANY, "/.migrations_working_state", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) + +def test_ecs_rfs_backfill_archive_errors_if_in_progress(ecs_rfs_backfill, mocker): + mocked_instance_status = InstanceStatuses( + desired=3, + running=1, + pending=2 + ) + mock = mocker.patch.object(ECSService, 'get_instance_statuses', autospec=True, return_value=mocked_instance_status) + result = ecs_rfs_backfill.archive() + + mock.assert_called_once_with(ecs_rfs_backfill.ecs_client) + assert not result.success + assert isinstance(result.value, RfsWorkersInProgress) + def test_docker_backfill_not_implemented_commands(): docker_rfs_config = { diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index 1e4f1ee1a..fa019b44a 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -1,6 +1,7 @@ import json import pathlib import os +import time import pytest import requests_mock @@ -9,7 +10,7 @@ import console_link.middleware as middleware from console_link.cli import cli from console_link.environment import Environment -from console_link.models.backfill_rfs import ECSRFSBackfill +from console_link.models.backfill_rfs import ECSRFSBackfill, RfsWorkersInProgress from console_link.models.cluster import Cluster, HttpMethod from console_link.models.command_result import CommandResult from console_link.models.ecs_service import ECSService, InstanceStatuses @@ -473,11 +474,20 @@ def test_cli_backfill_pause(runner, mocker): def test_cli_backfill_stop(runner, mocker): - mock = mocker.patch.object(ECSRFSBackfill, 'stop', autospec=True) + mock_stop = mocker.patch.object(ECSRFSBackfill, 'stop', autospec=True) + + archive_result_seq = [ + CommandResult(success=False, value=RfsWorkersInProgress()), + CommandResult(success=True, value="/path/to/archive.json") + ] + mock_archive = mocker.patch.object(ECSRFSBackfill, 'archive', autospec=True, side_effect=archive_result_seq) + mocker.patch.object(time, 'sleep', autospec=True) # make a no-op + result = runner.invoke(cli, ['--config-file', str(TEST_DATA_DIRECTORY / "services_with_ecs_rfs.yaml"), 'backfill', 'stop'], - catch_exceptions=True) - mock.assert_called_once() + catch_exceptions=False) + mock_stop.assert_called_once() + assert mock_archive.call_count == 2 assert result.exit_code == 0 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py index 228645af7..d84d6119e 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py @@ -247,6 +247,36 @@ def test_valid_cluster_api_call_with_client_options(requests_mock): assert test_user_agent in requests_mock.last_request.headers['User-Agent'] +def test_valid_cluster_fetch_all_documents(requests_mock): + cluster = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + assert isinstance(cluster, Cluster) + + test_index = "test_index" + batch_size = 1 + test_scroll_id = "test_scroll_id" + requests_mock.post( + f"{cluster.endpoint}/{test_index}/_search?scroll=1m", + json={ + "_scroll_id": test_scroll_id, + "hits": { + "hits": [{"_id": "id_1", "_source": {"test1": True}}] + } + } + ) + requests_mock.post( + f"{cluster.endpoint}/_search/scroll", + json={ + "_scroll_id": None, + "hits": { + "hits": [{"_id": "id_2", "_source": {"test2": True}}] + } + } + ) + requests_mock.delete(f"{cluster.endpoint}/_search/scroll") + documents = cluster.fetch_all_documents(test_index, batch_size=batch_size) + assert documents == {"id_1": {"test1": True}, "id_2": {"test2": True}} + + def test_connection_check_with_exception(mocker): cluster = create_valid_cluster() api_mock = mocker.patch.object(Cluster, 'call_api', side_effect=Exception('Attempt to connect to cluster failed')) From 7676ccd21ba6d197fe3c4eb00a174ec61a2fdff0 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Fri, 15 Nov 2024 09:02:42 -0600 Subject: [PATCH 3/6] `console backfill stop` gracefully handles no state index Signed-off-by: Chris Helma --- .../lib/console_link/console_link/cli.py | 6 +++- .../console_link/models/backfill_rfs.py | 31 ++++++++++++------- .../lib/console_link/tests/test_backfill.py | 19 +++++++++++- .../lib/console_link/tests/test_cli.py | 17 ++++++++-- 4 files changed, 58 insertions(+), 15 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index 4cf924224..d9da72f55 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -14,7 +14,7 @@ import console_link.middleware.tuples as tuples_ from console_link.models.cluster import HttpMethod -from console_link.models.backfill_rfs import RfsWorkersInProgress +from console_link.models.backfill_rfs import RfsWorkersInProgress, WorkingIndexDoesntExist from console_link.models.utils import ExitCode from console_link.environment import Environment from console_link.models.metrics_source import Component, MetricStatistic @@ -314,6 +314,10 @@ def stop_backfill_cmd(ctx, pipeline_name): click.echo("Archiving the working state of the backfill operation...") exitcode, message = backfill_.archive(ctx.env.backfill) + if isinstance(message, WorkingIndexDoesntExist): + click.echo(f"Working state index doesn't exist, skipping archive operation.") + return + while isinstance(message, RfsWorkersInProgress): click.echo(f"RFS Workers are still running, waiting for them to complete...") time.sleep(5) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py index a06dce57d..8cfd5965d 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py @@ -100,6 +100,10 @@ class RfsWorkersInProgress(Exception): def __init__(self): super().__init__("RFS Workers are still in progress") +class WorkingIndexDoesntExist(Exception): + def __init__(self, index_name: str): + super().__init__(f"The working state index '{index_name}' does not exist") + class ECSRFSBackfill(RFSBackfill): def __init__(self, config: Dict, target_cluster: Cluster, client_options: Optional[ClientOptions] = None) -> None: super().__init__(config) @@ -134,17 +138,22 @@ def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResul status = self.ecs_client.get_instance_statuses() if status.running > 0 or status.pending > 0 or status.desired > 0: return CommandResult(False, RfsWorkersInProgress()) - - backup_path = get_working_state_index_backup_path(archive_dir_path) - logger.info(f"Backing up working state index to {backup_path}") - documents = self.target_cluster.fetch_all_documents(WORKING_STATE_INDEX) - backup_working_state_index(documents, backup_path) - logger.info(f"Working state index backed up successful") - - logger.info("Cleaning up working state index on target cluster") - self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) - logger.info("Working state index cleaned up successful") - return CommandResult(True, backup_path) + + try: + backup_path = get_working_state_index_backup_path(archive_dir_path) + logger.info(f"Backing up working state index to {backup_path}") + documents = self.target_cluster.fetch_all_documents(WORKING_STATE_INDEX) + backup_working_state_index(documents, backup_path) + logger.info(f"Working state index backed up successful") + + logger.info("Cleaning up working state index on target cluster") + self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) + logger.info("Working state index cleaned up successful") + return CommandResult(True, backup_path) + except requests.HTTPError as e: + if e.response.status_code == 404: + return CommandResult(False, WorkingIndexDoesntExist(WORKING_STATE_INDEX)) + return CommandResult(False, e) def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult: logger.info(f"Getting status of RFS backfill, with {deep_check=}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py index 568806570..d8e2d2d8e 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py @@ -10,7 +10,7 @@ from console_link.models.cluster import Cluster, HttpMethod from console_link.models.backfill_base import Backfill, BackfillStatus from console_link.models.backfill_osi import OpenSearchIngestionBackfill -from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill, RfsWorkersInProgress +from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill, RfsWorkersInProgress, WorkingIndexDoesntExist from console_link.models.ecs_service import ECSService, InstanceStatuses from console_link.models.factories import (UnsupportedBackfillTypeError, get_backfill) @@ -331,6 +331,23 @@ def test_ecs_rfs_backfill_archive_as_expected(ecs_rfs_backfill, mocker, tmpdir): mock_api.assert_called_once_with(ANY, "/.migrations_working_state", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) +def test_ecs_rfs_backfill_archive_no_index_as_expected(ecs_rfs_backfill, mocker, tmpdir): + mocked_instance_status = InstanceStatuses( + desired=0, + running=0, + pending=0 + ) + mocker.patch.object(ECSService, 'get_instance_statuses', autospec=True, return_value=mocked_instance_status) + + response_404 = requests.Response() + response_404.status_code = 404 + mocker.patch.object(Cluster, 'fetch_all_documents', autospec=True, side_effect=requests.HTTPError(response=response_404, request=requests.Request())) + + result = ecs_rfs_backfill.archive(archive_dir_path=tmpdir.strpath) + + assert not result.success + assert isinstance(result.value, WorkingIndexDoesntExist) + def test_ecs_rfs_backfill_archive_errors_if_in_progress(ecs_rfs_backfill, mocker): mocked_instance_status = InstanceStatuses( desired=3, diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index fa019b44a..abf0fec5f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -10,7 +10,7 @@ import console_link.middleware as middleware from console_link.cli import cli from console_link.environment import Environment -from console_link.models.backfill_rfs import ECSRFSBackfill, RfsWorkersInProgress +from console_link.models.backfill_rfs import ECSRFSBackfill, RfsWorkersInProgress, WorkingIndexDoesntExist from console_link.models.cluster import Cluster, HttpMethod from console_link.models.command_result import CommandResult from console_link.models.ecs_service import ECSService, InstanceStatuses @@ -472,7 +472,6 @@ def test_cli_backfill_pause(runner, mocker): mock.assert_called_once() assert result.exit_code == 0 - def test_cli_backfill_stop(runner, mocker): mock_stop = mocker.patch.object(ECSRFSBackfill, 'stop', autospec=True) @@ -490,6 +489,20 @@ def test_cli_backfill_stop(runner, mocker): assert mock_archive.call_count == 2 assert result.exit_code == 0 +def test_cli_backfill_stop_no_index(runner, mocker): + mock_stop = mocker.patch.object(ECSRFSBackfill, 'stop', autospec=True) + + archive_result = CommandResult(success=False, value=WorkingIndexDoesntExist("index")) + mock_archive = mocker.patch.object(ECSRFSBackfill, 'archive', autospec=True, return_value=archive_result) + mocker.patch.object(time, 'sleep', autospec=True) # make a no-op + + result = runner.invoke(cli, ['--config-file', str(TEST_DATA_DIRECTORY / "services_with_ecs_rfs.yaml"), + 'backfill', 'stop'], + catch_exceptions=False) + mock_stop.assert_called_once() + assert mock_archive.call_count == 1 + assert result.exit_code == 0 + def test_cli_backfill_scale(runner, mocker): mock = mocker.patch.object(ECSRFSBackfill, 'scale', autospec=True) From 7003634808c7a19bcbf5dcaf63babe898be5260d Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Fri, 15 Nov 2024 11:28:29 -0600 Subject: [PATCH 4/6] Updates per PR comments Signed-off-by: Chris Helma --- .../console_link/models/backfill_rfs.py | 39 +++++++++++++------ .../console_link/models/cluster.py | 23 ++++++----- .../lib/console_link/tests/test_backfill.py | 8 ++-- .../lib/console_link/tests/test_cluster.py | 4 +- 4 files changed, 44 insertions(+), 30 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py index 8cfd5965d..975dc9743 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py @@ -133,17 +133,16 @@ def scale(self, units: int, *args, **kwargs) -> CommandResult: logger.info(f"Scaling RFS backfill by setting desired count to {units} instances") return self.ecs_client.set_desired_count(units) - def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult: + def archive(self, *args, archive_dir_path: str = None, archive_file_name: str = None, **kwargs) -> CommandResult: logger.info("Confirming there are no currently in-progress workers") status = self.ecs_client.get_instance_statuses() if status.running > 0 or status.pending > 0 or status.desired > 0: return CommandResult(False, RfsWorkersInProgress()) try: - backup_path = get_working_state_index_backup_path(archive_dir_path) + backup_path = get_working_state_index_backup_path(archive_dir_path, archive_file_name) logger.info(f"Backing up working state index to {backup_path}") - documents = self.target_cluster.fetch_all_documents(WORKING_STATE_INDEX) - backup_working_state_index(documents, backup_path) + backup_working_state_index(self.target_cluster, WORKING_STATE_INDEX, backup_path) logger.info(f"Working state index backed up successful") logger.info("Cleaning up working state index on target cluster") @@ -228,26 +227,42 @@ def _get_detailed_status(self) -> Optional[str]: return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None]) -def get_working_state_index_backup_path(archive_dir_path: str = None) -> str: +def get_working_state_index_backup_path(archive_dir_path: str = None, archive_file_name: str = None) -> str: shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None) if archive_dir_path: backup_dir = archive_dir_path elif shared_logs_dir is None: - backup_dir = "./working_state" + backup_dir = "./backfill_working_state" else: - backup_dir = os.path.join(shared_logs_dir, "working_state") + backup_dir = os.path.join(shared_logs_dir, "backfill_working_state") - file_name = "working_state_backup.json" + if archive_file_name: + file_name = archive_file_name + else: + file_name = f"working_state_backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.json" return os.path.join(backup_dir, file_name) -def backup_working_state_index(working_state: Dict[str, Any], backup_path: str): +def backup_working_state_index(cluster: Cluster, index_name:str, backup_path: str): # Ensure the backup directory exists backup_dir = os.path.dirname(backup_path) os.makedirs(backup_dir, exist_ok=True) - # Write the backup - with open(backup_path, "w") as f: - json.dump(working_state, f, indent=4) + # Backup the docs in the working state index as a JSON array containing batches of documents + with open(backup_path, 'w') as outfile: + outfile.write("[\n") # Start the JSON array + first_batch = True + + for batch in cluster.fetch_all_documents(index_name=index_name): + if not first_batch: + outfile.write(",\n") + else: + first_batch = False + + # Dump the batch of documents as an entry in the array + batch_json = json.dumps(batch, indent=4) + outfile.write(batch_json) + + outfile.write("\n]") # Close the JSON array def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]: try: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index a8e005e7f..9de275249 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Optional +from typing import Any, Dict, Generator, Optional from enum import Enum import json import logging @@ -200,9 +200,11 @@ def execute_benchmark_workload(self, workload: str, logger.info(f"Executing command: {display_command}") subprocess.run(command, shell=True) - def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[str, Any]: - documents = {} - scroll_id = None + def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Generator[Dict[str, Any], None, None]: + """ + Generator that fetches all documents from the specified index in batches + """ + session = requests.Session() # Step 1: Initiate the scroll @@ -221,9 +223,9 @@ def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[st scroll_id = response_json.get('_scroll_id') hits = response_json.get('hits', {}).get('hits', []) - # Add documents to result dictionary - for hit in hits: - documents[hit['_id']] = hit['_source'] + # Yield the first batch of documents + if hits: + yield {hit['_id']: hit['_source'] for hit in hits} # Step 2: Continue scrolling until no more documents while scroll_id and hits: @@ -241,9 +243,8 @@ def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[st scroll_id = response_json.get('_scroll_id') hits = response_json.get('hits', {}).get('hits', []) - # Add documents to result dictionary - for hit in hits: - documents[hit['_id']] = hit['_source'] + if hits: + yield {hit['_id']: hit['_source'] for hit in hits} # Step 3: Cleanup the scroll if necessary if scroll_id: @@ -257,5 +258,3 @@ def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[st session=session, raise_error=False ) - - return documents diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py index d8e2d2d8e..6f08aaf60 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py @@ -315,15 +315,15 @@ def test_ecs_rfs_backfill_archive_as_expected(ecs_rfs_backfill, mocker, tmpdir): ) mocker.patch.object(ECSService, 'get_instance_statuses', autospec=True, return_value=mocked_instance_status) - mocked_docs = {"id": {"key": "value"}} + mocked_docs = [{"id": {"key": "value"}}] mocker.patch.object(Cluster, 'fetch_all_documents', autospec=True, return_value=mocked_docs) mock_api = mocker.patch.object(Cluster, 'call_api', autospec=True, return_value=requests.Response()) - result = ecs_rfs_backfill.archive(archive_dir_path=tmpdir.strpath) + result = ecs_rfs_backfill.archive(archive_dir_path=tmpdir.strpath, archive_file_name="backup.json") assert result.success - expected_path = os.path.join(tmpdir.strpath, "working_state_backup.json") + expected_path = os.path.join(tmpdir.strpath, "backup.json") assert result.value == expected_path assert os.path.exists(expected_path) with open(expected_path, "r") as f: @@ -343,7 +343,7 @@ def test_ecs_rfs_backfill_archive_no_index_as_expected(ecs_rfs_backfill, mocker, response_404.status_code = 404 mocker.patch.object(Cluster, 'fetch_all_documents', autospec=True, side_effect=requests.HTTPError(response=response_404, request=requests.Request())) - result = ecs_rfs_backfill.archive(archive_dir_path=tmpdir.strpath) + result = ecs_rfs_backfill.archive() assert not result.success assert isinstance(result.value, WorkingIndexDoesntExist) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py index d84d6119e..b87fcb9d8 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cluster.py @@ -273,8 +273,8 @@ def test_valid_cluster_fetch_all_documents(requests_mock): } ) requests_mock.delete(f"{cluster.endpoint}/_search/scroll") - documents = cluster.fetch_all_documents(test_index, batch_size=batch_size) - assert documents == {"id_1": {"test1": True}, "id_2": {"test2": True}} + documents = [batch for batch in cluster.fetch_all_documents(test_index, batch_size=batch_size)] + assert documents == [{"id_1": {"test1": True}}, {"id_2": {"test2": True}}] def test_connection_check_with_exception(mocker): From 168b424e71f5ef54b59be7e7ab8140493426baa6 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Fri, 15 Nov 2024 11:42:06 -0600 Subject: [PATCH 5/6] Linting pass Signed-off-by: Chris Helma --- .../lib/console_link/console_link/cli.py | 7 ++++-- .../console_link/middleware/backfill.py | 3 +++ .../console_link/models/backfill_osi.py | 2 +- .../console_link/models/backfill_rfs.py | 20 +++++++++++++---- .../console_link/models/cluster.py | 4 ++-- .../lib/console_link/tests/test_backfill.py | 22 ++++++++++++++----- .../lib/console_link/tests/test_cli.py | 7 ++++-- 7 files changed, 49 insertions(+), 16 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index d9da72f55..e8c6b60b3 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -293,6 +293,7 @@ def start_backfill_cmd(ctx, pipeline_name): raise click.ClickException(message) click.echo(message) + @backfill_group.command(name="pause") @click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name') @click.pass_obj @@ -302,6 +303,7 @@ def pause_backfill_cmd(ctx, pipeline_name): raise click.ClickException(message) click.echo(message) + @backfill_group.command(name="stop") @click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name') @click.pass_obj @@ -315,11 +317,11 @@ def stop_backfill_cmd(ctx, pipeline_name): exitcode, message = backfill_.archive(ctx.env.backfill) if isinstance(message, WorkingIndexDoesntExist): - click.echo(f"Working state index doesn't exist, skipping archive operation.") + click.echo("Working state index doesn't exist, skipping archive operation.") return while isinstance(message, RfsWorkersInProgress): - click.echo(f"RFS Workers are still running, waiting for them to complete...") + click.echo("RFS Workers are still running, waiting for them to complete...") time.sleep(5) exitcode, message = backfill_.archive(ctx.env.backfill) @@ -327,6 +329,7 @@ def stop_backfill_cmd(ctx, pipeline_name): raise click.ClickException(message) click.echo(f"Backfill working state archived to: {message}") + @backfill_group.command(name="scale") @click.argument("units", type=int, required=True) @click.pass_obj diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py index 3fb5b8c13..868003f3b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/middleware/backfill.py @@ -34,6 +34,7 @@ def start(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: logger.info("Starting backfill") return backfill.start(*args, **kwargs) + @handle_errors("backfill", on_success=lambda result: (ExitCode.SUCCESS, "Backfill paused successfully." + "\n" + result)) def pause(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: @@ -47,6 +48,7 @@ def stop(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: logger.info("Stopping backfill") return backfill.stop(*args, **kwargs) + @handle_errors("backfill", on_success=lambda status: (ExitCode.SUCCESS, f"{status[0]}\n{status[1]}")) def status(backfill: Backfill, deep_check: bool, *args, **kwargs) -> CommandResult[Tuple[BackfillStatus, str]]: @@ -60,6 +62,7 @@ def scale(backfill: Backfill, units: int, *args, **kwargs) -> CommandResult[str] logger.info(f"Scaling backfill to {units} units") return backfill.scale(units, *args, **kwargs) + @handle_errors("backfill") def archive(backfill: Backfill, *args, **kwargs) -> CommandResult[str]: logger.info("Archiving backfill operation") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py index 1a1859087..7ce9a3367 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_osi.py @@ -105,4 +105,4 @@ def scale(self, units: int, *args, **kwargs) -> CommandResult: raise NotImplementedError() def archive(self, pipeline_name=None) -> CommandResult: - raise NotImplementedError() \ No newline at end of file + raise NotImplementedError() diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py index 975dc9743..9becc7c14 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py @@ -1,7 +1,7 @@ from datetime import datetime import json import os -from typing import Any, Dict, Optional +from typing import Dict, Optional import requests @@ -77,6 +77,7 @@ def get_status(self, *args, **kwargs) -> CommandResult: def scale(self, units: int, *args, **kwargs) -> CommandResult: raise NotImplementedError() + class DockerRFSBackfill(RFSBackfill): def __init__(self, config: Dict, target_cluster: Cluster) -> None: super().__init__(config) @@ -100,10 +101,12 @@ class RfsWorkersInProgress(Exception): def __init__(self): super().__init__("RFS Workers are still in progress") + class WorkingIndexDoesntExist(Exception): def __init__(self, index_name: str): super().__init__(f"The working state index '{index_name}' does not exist") + class ECSRFSBackfill(RFSBackfill): def __init__(self, config: Dict, target_cluster: Cluster, client_options: Optional[ClientOptions] = None) -> None: super().__init__(config) @@ -122,7 +125,7 @@ def start(self, *args, **kwargs) -> CommandResult: return self.ecs_client.set_desired_count(self.default_scale) def pause(self, *args, **kwargs) -> CommandResult: - logger.info(f"Pausing RFS backfill by setting desired count to 0 instances") + logger.info("Pausing RFS backfill by setting desired count to 0 instances") return self.ecs_client.set_desired_count(0) def stop(self, *args, **kwargs) -> CommandResult: @@ -143,10 +146,14 @@ def archive(self, *args, archive_dir_path: str = None, archive_file_name: str = backup_path = get_working_state_index_backup_path(archive_dir_path, archive_file_name) logger.info(f"Backing up working state index to {backup_path}") backup_working_state_index(self.target_cluster, WORKING_STATE_INDEX, backup_path) - logger.info(f"Working state index backed up successful") + logger.info("Working state index backed up successful") logger.info("Cleaning up working state index on target cluster") - self.target_cluster.call_api(f"/{WORKING_STATE_INDEX}", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) + self.target_cluster.call_api( + f"/{WORKING_STATE_INDEX}", + method=HttpMethod.DELETE, + params={"ignore_unavailable": "true"} + ) logger.info("Working state index cleaned up successful") return CommandResult(True, backup_path) except requests.HTTPError as e: @@ -154,6 +161,7 @@ def archive(self, *args, archive_dir_path: str = None, archive_file_name: str = return CommandResult(False, WorkingIndexDoesntExist(WORKING_STATE_INDEX)) return CommandResult(False, e) + def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult: logger.info(f"Getting status of RFS backfill, with {deep_check=}") instance_statuses = self.ecs_client.get_instance_statuses() @@ -176,6 +184,7 @@ def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult: return CommandResult(True, (BackfillStatus.STARTING, status_string)) return CommandResult(True, (BackfillStatus.STOPPED, status_string)) + def _get_detailed_status(self) -> Optional[str]: # Check whether the working state index exists. If not, we can't run queries. try: @@ -227,6 +236,7 @@ def _get_detailed_status(self) -> Optional[str]: return "\n".join([f"Shards {key}: {value}" for key, value in values.items() if value is not None]) + def get_working_state_index_backup_path(archive_dir_path: str = None, archive_file_name: str = None) -> str: shared_logs_dir = os.getenv("SHARED_LOGS_DIR_PATH", None) if archive_dir_path: @@ -242,6 +252,7 @@ def get_working_state_index_backup_path(archive_dir_path: str = None, archive_fi file_name = f"working_state_backup_{datetime.now().strftime('%Y%m%d%H%M%S')}.json" return os.path.join(backup_dir, file_name) + def backup_working_state_index(cluster: Cluster, index_name:str, backup_path: str): # Ensure the backup directory exists backup_dir = os.path.dirname(backup_path) @@ -264,6 +275,7 @@ def backup_working_state_index(cluster: Cluster, index_name:str, backup_path: st outfile.write("\n]") # Close the JSON array + def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]: try: response = cluster.call_api("/.migrations_working_state/_search", data=json.dumps(query), diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index 9de275249..01dec1824 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -229,7 +229,7 @@ def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Generat # Step 2: Continue scrolling until no more documents while scroll_id and hits: - path = f"/_search/scroll" + path = "/_search/scroll" body = json.dumps({"scroll": "1m", "scroll_id": scroll_id}) response = self.call_api( path=path, @@ -248,7 +248,7 @@ def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Generat # Step 3: Cleanup the scroll if necessary if scroll_id: - path = f"/_search/scroll" + path = "/_search/scroll" body = json.dumps({"scroll_id": scroll_id}) self.call_api( path=path, diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py index 6f08aaf60..81816fbb5 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py @@ -10,10 +10,10 @@ from console_link.models.cluster import Cluster, HttpMethod from console_link.models.backfill_base import Backfill, BackfillStatus from console_link.models.backfill_osi import OpenSearchIngestionBackfill -from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill, RfsWorkersInProgress, WorkingIndexDoesntExist +from console_link.models.backfill_rfs import (DockerRFSBackfill, ECSRFSBackfill, RfsWorkersInProgress, + WorkingIndexDoesntExist) from console_link.models.ecs_service import ECSService, InstanceStatuses -from console_link.models.factories import (UnsupportedBackfillTypeError, - get_backfill) +from console_link.models.factories import UnsupportedBackfillTypeError, get_backfill from tests.utils import create_valid_cluster TEST_DATA_DIRECTORY = pathlib.Path(__file__).parent / "data" @@ -182,6 +182,7 @@ def test_ecs_rfs_backfill_start_sets_ecs_desired_count(ecs_rfs_backfill, mocker) assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 5) + def test_ecs_rfs_backfill_pause_sets_ecs_desired_count(ecs_rfs_backfill, mocker): assert ecs_rfs_backfill.default_scale == 5 mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) @@ -190,6 +191,7 @@ def test_ecs_rfs_backfill_pause_sets_ecs_desired_count(ecs_rfs_backfill, mocker) assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 0) + def test_ecs_rfs_backfill_stop_sets_ecs_desired_count(ecs_rfs_backfill, mocker): assert ecs_rfs_backfill.default_scale == 5 mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) @@ -198,6 +200,7 @@ def test_ecs_rfs_backfill_stop_sets_ecs_desired_count(ecs_rfs_backfill, mocker): assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 0) + def test_ecs_rfs_backfill_scale_sets_ecs_desired_count(ecs_rfs_backfill, mocker): mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) ecs_rfs_backfill.scale(3) @@ -307,6 +310,7 @@ def test_ecs_rfs_deep_status_check_failure(ecs_rfs_backfill, mocker, caplog): assert result.success assert result.value[0] == BackfillStatus.RUNNING + def test_ecs_rfs_backfill_archive_as_expected(ecs_rfs_backfill, mocker, tmpdir): mocked_instance_status = InstanceStatuses( desired=0, @@ -329,7 +333,11 @@ def test_ecs_rfs_backfill_archive_as_expected(ecs_rfs_backfill, mocker, tmpdir): with open(expected_path, "r") as f: assert json.load(f) == mocked_docs - mock_api.assert_called_once_with(ANY, "/.migrations_working_state", method=HttpMethod.DELETE, params={"ignore_unavailable": "true"}) + mock_api.assert_called_once_with( + ANY, "/.migrations_working_state", method=HttpMethod.DELETE, + params={"ignore_unavailable": "true"} + ) + def test_ecs_rfs_backfill_archive_no_index_as_expected(ecs_rfs_backfill, mocker, tmpdir): mocked_instance_status = InstanceStatuses( @@ -341,13 +349,17 @@ def test_ecs_rfs_backfill_archive_no_index_as_expected(ecs_rfs_backfill, mocker, response_404 = requests.Response() response_404.status_code = 404 - mocker.patch.object(Cluster, 'fetch_all_documents', autospec=True, side_effect=requests.HTTPError(response=response_404, request=requests.Request())) + mocker.patch.object( + Cluster, 'fetch_all_documents', autospec=True, + side_effect=requests.HTTPError(response=response_404, request=requests.Request()) + ) result = ecs_rfs_backfill.archive() assert not result.success assert isinstance(result.value, WorkingIndexDoesntExist) + def test_ecs_rfs_backfill_archive_errors_if_in_progress(ecs_rfs_backfill, mocker): mocked_instance_status = InstanceStatuses( desired=3, diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index abf0fec5f..f46dcd2c3 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -464,6 +464,7 @@ def test_cli_backfill_start(runner, mocker): mock.assert_called_once() assert result.exit_code == 0 + def test_cli_backfill_pause(runner, mocker): mock = mocker.patch.object(ECSRFSBackfill, 'pause', autospec=True) result = runner.invoke(cli, ['--config-file', str(TEST_DATA_DIRECTORY / "services_with_ecs_rfs.yaml"), @@ -472,6 +473,7 @@ def test_cli_backfill_pause(runner, mocker): mock.assert_called_once() assert result.exit_code == 0 + def test_cli_backfill_stop(runner, mocker): mock_stop = mocker.patch.object(ECSRFSBackfill, 'stop', autospec=True) @@ -480,7 +482,7 @@ def test_cli_backfill_stop(runner, mocker): CommandResult(success=True, value="/path/to/archive.json") ] mock_archive = mocker.patch.object(ECSRFSBackfill, 'archive', autospec=True, side_effect=archive_result_seq) - mocker.patch.object(time, 'sleep', autospec=True) # make a no-op + mocker.patch.object(time, 'sleep', autospec=True) # make a no-op result = runner.invoke(cli, ['--config-file', str(TEST_DATA_DIRECTORY / "services_with_ecs_rfs.yaml"), 'backfill', 'stop'], @@ -489,12 +491,13 @@ def test_cli_backfill_stop(runner, mocker): assert mock_archive.call_count == 2 assert result.exit_code == 0 + def test_cli_backfill_stop_no_index(runner, mocker): mock_stop = mocker.patch.object(ECSRFSBackfill, 'stop', autospec=True) archive_result = CommandResult(success=False, value=WorkingIndexDoesntExist("index")) mock_archive = mocker.patch.object(ECSRFSBackfill, 'archive', autospec=True, return_value=archive_result) - mocker.patch.object(time, 'sleep', autospec=True) # make a no-op + mocker.patch.object(time, 'sleep', autospec=True) # make a no-op result = runner.invoke(cli, ['--config-file', str(TEST_DATA_DIRECTORY / "services_with_ecs_rfs.yaml"), 'backfill', 'stop'], From ec091a4793b97c1aeba13a0a2685b6e3c4d5cd56 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Fri, 15 Nov 2024 11:45:14 -0600 Subject: [PATCH 6/6] Another linting pass Signed-off-by: Chris Helma --- .../lib/console_link/console_link/models/backfill_rfs.py | 4 +--- .../migrationConsole/lib/console_link/tests/test_backfill.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py index 9becc7c14..ef23ee334 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py @@ -161,7 +161,6 @@ def archive(self, *args, archive_dir_path: str = None, archive_file_name: str = return CommandResult(False, WorkingIndexDoesntExist(WORKING_STATE_INDEX)) return CommandResult(False, e) - def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult: logger.info(f"Getting status of RFS backfill, with {deep_check=}") instance_statuses = self.ecs_client.get_instance_statuses() @@ -184,7 +183,6 @@ def get_status(self, deep_check: bool, *args, **kwargs) -> CommandResult: return CommandResult(True, (BackfillStatus.STARTING, status_string)) return CommandResult(True, (BackfillStatus.STOPPED, status_string)) - def _get_detailed_status(self) -> Optional[str]: # Check whether the working state index exists. If not, we can't run queries. try: @@ -253,7 +251,7 @@ def get_working_state_index_backup_path(archive_dir_path: str = None, archive_fi return os.path.join(backup_dir, file_name) -def backup_working_state_index(cluster: Cluster, index_name:str, backup_path: str): +def backup_working_state_index(cluster: Cluster, index_name: str, backup_path: str): # Ensure the backup directory exists backup_dir = os.path.dirname(backup_path) os.makedirs(backup_dir, exist_ok=True) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py index 81816fbb5..dde324838 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py @@ -11,7 +11,7 @@ from console_link.models.backfill_base import Backfill, BackfillStatus from console_link.models.backfill_osi import OpenSearchIngestionBackfill from console_link.models.backfill_rfs import (DockerRFSBackfill, ECSRFSBackfill, RfsWorkersInProgress, - WorkingIndexDoesntExist) + WorkingIndexDoesntExist) from console_link.models.ecs_service import ECSService, InstanceStatuses from console_link.models.factories import UnsupportedBackfillTypeError, get_backfill from tests.utils import create_valid_cluster