From 46dd9829f752b4d641a8762e6620c37a0385519a Mon Sep 17 00:00:00 2001 From: Rodolfo Olivieri Date: Mon, 17 Jun 2024 10:37:15 -0300 Subject: [PATCH] WIP --- .../actions/pre_ponr_changes/backup_system.py | 2 +- convert2rhel/cli.py | 412 +++++++++ convert2rhel/main.py | 4 +- convert2rhel/subscription.py | 19 +- convert2rhel/systeminfo.py | 3 +- convert2rhel/toolopts.py | 799 ++++------------- convert2rhel/unit_tests/__init__.py | 5 +- .../pre_ponr_changes/backup_system_test.py | 4 +- .../pre_ponr_changes/subscription_test.py | 6 +- convert2rhel/unit_tests/cli_test.py | 848 ++++++++++++++++++ convert2rhel/unit_tests/main_test.py | 48 +- convert2rhel/unit_tests/toolopts_test.py | 802 +---------------- convert2rhel/utils/subscription.py | 136 +++ man/__init__.py | 4 +- 14 files changed, 1591 insertions(+), 1501 deletions(-) create mode 100644 convert2rhel/cli.py create mode 100644 convert2rhel/unit_tests/cli_test.py create mode 100644 convert2rhel/utils/subscription.py diff --git a/convert2rhel/actions/pre_ponr_changes/backup_system.py b/convert2rhel/actions/pre_ponr_changes/backup_system.py index 5d8a7a6934..796a31f0f2 100644 --- a/convert2rhel/actions/pre_ponr_changes/backup_system.py +++ b/convert2rhel/actions/pre_ponr_changes/backup_system.py @@ -21,12 +21,12 @@ from convert2rhel import actions, backup, exceptions from convert2rhel.backup.files import MissingFile, RestorableFile +from convert2rhel.cli import PRE_RPM_VA_LOG_FILENAME from convert2rhel.logger import LOG_DIR from convert2rhel.pkghandler import VERSIONLOCK_FILE_PATH from convert2rhel.redhatrelease import os_release_file, system_release_file from convert2rhel.repo import DEFAULT_DNF_VARS_DIR, DEFAULT_YUM_REPOFILE_DIR, DEFAULT_YUM_VARS_DIR from convert2rhel.systeminfo import system_info -from convert2rhel.toolopts import PRE_RPM_VA_LOG_FILENAME # Regex explanation: diff --git a/convert2rhel/cli.py b/convert2rhel/cli.py new file mode 100644 index 0000000000..9c18afce58 --- /dev/null +++ b/convert2rhel/cli.py @@ -0,0 +1,412 @@ +# -*- coding: utf-8 -*- +# +# Copyright(C) 2024 Red Hat, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__metaclass__ = type + + +import argparse +import logging +import os +import sys + +from convert2rhel import __version__, utils +from convert2rhel.toolopts import tool_opts + + +loggerinst = logging.getLogger(__name__) + + +# For a list of modified rpm files before the conversion starts +PRE_RPM_VA_LOG_FILENAME = "rpm_va.log" + +# For a list of modified rpm files after the conversion finishes for comparison purposes +POST_RPM_VA_LOG_FILENAME = "rpm_va_after_conversion.log" + +ARGS_WITH_VALUES = [ + "-u", + "--username", + "-p", + "--password", + "-k", + "--activationkey", + "-o", + "--org", + "--pool", + "--serverurl", +] +PARENT_ARGS = ["--debug", "--help", "-h", "--version"] + + +class CLI: + def __init__(self): + self._parser = self._get_argparser() + self._shared_options_parser = argparse.ArgumentParser(add_help=False) + # Duplicating parent options here as we want to make it + # available for any other basic operation that we run without a + # subcommand in mind, and, it is a shared option so we can share it + # between any subcommands we may create in the future. + self._register_parent_options(self._parser) + self._register_parent_options(self._shared_options_parser) + self._register_options() + self._process_cli_options() + + @staticmethod + def usage(subcommand_to_print=""): + # Override the subcommand_to_print parameter if the tool has been executed through CLI but without + # subcommand specified. This is to make sure that runnning `convert2rhel --help` on the CLI will print the + # usage with generic , while the manpage generated using argparse_manpage will be able to print the + # usage correctly for subcommands as it does not execute convert2rhel from the CLI. + subcommand_not_used_on_cli = "/usr/bin/convert2rhel" in sys.argv[0] and not _subcommand_used(sys.argv) + if subcommand_not_used_on_cli: + subcommand_to_print = "" + usage = ( + "\n" + " convert2rhel [--version] [-h]\n" + " convert2rhel {subcommand} [-u username] [-p password | -c conf_file_path] [--pool pool_id | -a] [--disablerepo repoid]" + " [--enablerepo repoid] [--serverurl url] [--no-rpm-va] [--eus] [--els] [--debug] [--restart] [-y]\n" + " convert2rhel {subcommand} [--no-rhsm] [--disablerepo repoid] [--enablerepo repoid] [--no-rpm-va] [--eus] [--els] [--debug] [--restart] [-y]\n" + " convert2rhel {subcommand} [-k activation_key | -c conf_file_path] [-o organization] [--pool pool_id | -a] [--disablerepo repoid] [--enablerepo" + " repoid] [--serverurl url] [--no-rpm-va] [--eus] [--els] [--debug] [--restart] [-y]\n" + ).format(subcommand=subcommand_to_print) + + if subcommand_not_used_on_cli: + usage = usage + "\n Subcommands: analyze, convert" + return usage + + def _get_argparser(self): + return argparse.ArgumentParser(conflict_handler="resolve", usage=self.usage()) + + def _register_commands(self): + """Configures parsers specific to the analyze and convert subcommands""" + subparsers = self._parser.add_subparsers(title="Subcommands", dest="command") + self._analyze_parser = subparsers.add_parser( + "analyze", + help="Run all Convert2RHEL initial checks up until the" + " Point of no Return (PONR) and generate a report with the findings." + " A rollback is initiated after the checks to put the system back" + " in the original state.", + parents=[self._shared_options_parser], + usage=self.usage(subcommand_to_print="analyze"), + ) + self._convert_parser = subparsers.add_parser( + "convert", + help="Convert the system. If no subcommand is given, 'convert' is used as a default.", + parents=[self._shared_options_parser], + usage=self.usage(subcommand_to_print="convert"), + ) + + @staticmethod + def _register_parent_options(parser): + """Prescribe what parent command line options the tool accepts.""" + parser.add_argument( + "--version", + action="version", + version=__version__, + help="Show convert2rhel version and exit.", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Print traceback in case of an abnormal exit and messages that could help find an issue.", + ) + + def _register_options(self): + """Prescribe what command line options the tool accepts.""" + self._parser.add_argument( + "-h", + "--help", + action="help", + help="Show help message and exit.", + ) + self._shared_options_parser.add_argument( + "--no-rpm-va", + action="store_true", + help="Skip gathering changed rpm files using" + " 'rpm -Va'. By default it's performed before and after the conversion with the output" + " stored in log files %s and %s. At the end of the conversion, these logs are compared" + " to show you what rpm files have been affected by the conversion." + " Cannot be used with analyze subcommand." + " The environment variable CONVERT2RHEL_INCOMPLETE_ROLLBACK" + " needs to be set to 1 to use this argument." % (PRE_RPM_VA_LOG_FILENAME, POST_RPM_VA_LOG_FILENAME), + ) + self._shared_options_parser.add_argument( + "--eus", + action="store_true", + help="Explicitly recognize the system as eus, utilizing eus repos." + " This option is meant for el8.8+ systems.", + ) + self._shared_options_parser.add_argument( + "--els", + action="store_true", + help="Explicitly recognize the system as els, utilizing els repos." + " This option is meant for el7 systems.", + ) + self._shared_options_parser.add_argument( + "--enablerepo", + metavar="repoidglob", + action="append", + help="Enable specific" + " repositories by ID or glob. For more repositories to enable, use this option" + " multiple times. If you don't use the --no-rhsm option, you can use this option" + " to override the default RHEL repoids that convert2rhel enables through" + " subscription-manager.", + ) + self._shared_options_parser.add_argument( + "--disablerepo", + metavar="repoidglob", + action="append", + help="Disable specific" + " repositories by ID or glob. For more repositories to disable, use this option" + " multiple times. This option defaults to all repositories ('*').", + ) + self._shared_options_parser.add_argument( + "-r", + "--restart", + help="Restart the system when it is successfully converted to RHEL to boot the new RHEL kernel." + " It has no effect when used with the 'analyze' subcommand.", + action="store_true", + ) + self._shared_options_parser.add_argument( + "-y", + help="Answer yes to all yes/no questions the tool asks.", + dest="auto_accept", + action="store_true", + ) + self._add_subscription_manager_options() + self._add_alternative_installation_options() + self._register_commands() + + def _add_alternative_installation_options(self): + """Prescribe what alternative command line options the tool accepts.""" + group = self._shared_options_parser.add_argument_group( + title="Alternative Installation Options", + description="The following options are required if you do not intend on using subscription-manager.", + ) + group.add_argument( + "--no-rhsm", + action="store_true", + help="Do not use the subscription-manager, use custom repositories instead. See --enablerepo/--disablerepo" + " options. Without this option, the subscription-manager is used to access RHEL repositories by default." + " Using this option requires to have the --enablerepo specified.", + ) + + def _add_subscription_manager_options(self): + """Prescribe what subscription manager command line options the tool accepts.""" + group = self._shared_options_parser.add_argument_group( + title="Subscription Manager Options", + description="The following options are specific to using subscription-manager.", + ) + group.add_argument( + "-u", + "--username", + help="Username for the" + " subscription-manager. If neither --username nor" + " --activation-key option is used, the user" + " is asked to enter the username.", + ) + group.add_argument( + "-p", + "--password", + help="Password for the" + " subscription-manager. If --password, --config-file or --activationkey are not" + " used, the user is asked to enter the password." + " We recommend using the --config-file option instead to prevent leaking the password" + " through a list of running processes.", + ) + group.add_argument( + "-k", + "--activationkey", + help="Activation key used" + " for the system registration by the" + " subscription-manager. It requires to have the --org" + " option specified." + " We recommend using the --config-file option instead to prevent leaking the activation key" + " through a list of running processes.", + dest="activation_key", + ) + group.add_argument( + "-o", + "--org", + help="Organization with which the" + " system will be registered by the" + " subscription-manager. A list of available" + " organizations is possible to obtain by running" + " 'subscription-manager orgs'. From the listed pairs" + " Name:Key, use the Key here.", + ) + group.add_argument( + "-c", + "--config-file", + help="The configuration file is an optional way to safely pass either a user password or an activation key" + " to the subscription-manager to register the system. This is more secure than passing these values" + " through the --activationkey or --password option, which might leak the values" + " through a list of running processes." + " You can edit the pre-installed configuration file template at /etc/convert2rhel.ini or create a new" + " configuration file at ~/.convert2rhel.ini. The convert2rhel utility loads the configuration from either" + " of those locations, the latter having preference over the former. Alternatively, you can specify a path" + " to the configuration file using the --config-file option to override other configurations.", + ) + group.add_argument( + "-a", + "--auto-attach", + help="Automatically attach compatible subscriptions to the system.", + action="store_true", + ) + group.add_argument( + "--pool", + help="Subscription pool ID. A list of the available" + " subscriptions is possible to obtain by running" + " 'subscription-manager list --available'." + " If no pool ID is provided, the --auto option is used", + ) + group.add_argument( + "--serverurl", + help="Hostname of the subscription service to be used when registering the system with" + " subscription-manager. The default is the Customer Portal Subscription Management service" + " (subscription.rhsm.redhat.com). It is not to be used to specify a Satellite server. For that, read" + " the product documentation at https://access.redhat.com/.", + ) + + def _process_cli_options(self): + """Process command line options used with the tool.""" + _log_command_used() + + # algorithm function to properly organize all CLI args + argv = _add_default_command(sys.argv[1:]) + parsed_opts = self._parser.parse_args(argv) + print(parsed_opts) + exit(0) + + # Processing the configuration file + # corner case: password on CLI and activation-key in the config file + # password from CLI has precedence and activation_key and org must be deleted (unused) + if config_opts.activation_key and parsed_opts.password: + tool_opts.activation_key = None + tool_opts.org = None + + if parsed_opts.no_rpm_va: + if tool_opts.activity == "analysis": + loggerinst.warning( + "We will proceed with ignoring the --no-rpm-va option as running rpm -Va" + " in the analysis mode is essential for a complete rollback to the original" + " system state at the end of the analysis." + ) + elif os.getenv("CONVERT2RHEL_INCOMPLETE_ROLLBACK", None): + tool_opts.no_rpm_va = True + else: + message = ( + "We need to run the 'rpm -Va' command to be able to perform a complete rollback of changes" + " done to the system during the pre-conversion analysis. If you accept the risk of an" + " incomplete rollback, set the CONVERT2RHEL_INCOMPLETE_ROLLBACK=1 environment" + " variable. Otherwise, remove the --no-rpm-va option." + ) + loggerinst.critical(message) + + # Check if we have duplicate repositories specified + if parsed_opts.enablerepo or parsed_opts.disablerepo: + duplicate_repos = set(tool_opts.disablerepo) & set(tool_opts.enablerepo) + if duplicate_repos: + message = "Duplicate repositories were found across disablerepo and enablerepo options:" + for repo in duplicate_repos: + message += "\n%s" % repo + message += "\nThis ambiguity may have unintended consequences." + loggerinst.warning(message) + + if parsed_opts.no_rhsm: + tool_opts.no_rhsm = True + if not tool_opts.enablerepo: + loggerinst.critical("The --enablerepo option is required when --no-rhsm is used.") + + # Security notice + if parsed_opts.password or parsed_opts.activationkey: + loggerinst.warning( + "Passing the RHSM password or activation key through the --activationkey or --password options is" + " insecure as it leaks the values through the list of running processes. We recommend using the safer" + " --config-file option instead." + ) + + # Checks of multiple authentication sources + if parsed_opts.password and parsed_opts.activation_key: + loggerinst.warning( + "Either a password or an activation key can be used for system registration." + " We're going to use the activation key." + ) + + if (parsed_opts.org and not parsed_opts.activation_key) or (not parsed_opts.org and parsed_opts.activation_key): + loggerinst.critical( + "Either the --org or the --activationkey option is missing. You can't use one without the other." + ) + + # Config files matches + # if config_opts.username and parsed_opts.username: + # loggerinst.warning( + # "You have passed the RHSM username through both the command line and the" + # " configuration file. We're going to use the command line values." + # ) + + # if config_opts.org and parsed_opts.org: + # loggerinst.warning( + # "You have passed the RHSM org through both the command line and the" + # " configuration file. We're going to use the command line values." + # ) + + # if (config_opts.activation_key or config_opts.password) and (parsed_opts.activationkey or parsed_opts.password): + # loggerinst.warning( + # "You have passed either the RHSM password or activation key through both the command line and the" + # " configuration file. We're going to use the command line values." + # ) + + # if (parsed_opts.password or config_opts.password) and not (parsed_opts.username or config_opts.username): + # loggerinst.warning( + # "You have passed the RHSM password without an associated username. Please provide a username together" + # " with the password." + # ) + + # if (parsed_opts.username or config_opts.username) and not (parsed_opts.password or config_opts.password): + # loggerinst.warning( + # "You have passed the RHSM username without an associated password. Please provide a password together" + # " with the username." + # ) + + +def _log_command_used(): + """We want to log the command used for convert2rhel to make it easier to know what command was used + when debugging the log files. Since we can't differentiate between the handlers we log to both stdout + and the logfile + """ + command = " ".join(utils.hide_secrets(sys.argv)) + loggerinst.info("convert2rhel command used:\n{0}".format(command)) + + +def _add_default_command(argv): + """Add the default command when none is given""" + subcommand = _subcommand_used(argv) + args = argv + if not subcommand: + args.insert(0, "convert") + + return args + + +def _subcommand_used(args): + """Return what subcommand has been used by the user. Return None if no subcommand has been used.""" + for index, argument in enumerate(args): + if argument in ("convert", "analyze"): + return argument + + if not argument in PARENT_ARGS and args[index - 1] in ARGS_WITH_VALUES: + return None diff --git a/convert2rhel/main.py b/convert2rhel/main.py index 92743bc2d1..9bc98cdd48 100644 --- a/convert2rhel/main.py +++ b/convert2rhel/main.py @@ -20,7 +20,7 @@ import logging import os -from convert2rhel import actions, applock, backup, breadcrumbs, checks, exceptions, grub, hostmetering +from convert2rhel import actions, applock, backup, breadcrumbs, checks, cli, exceptions, grub, hostmetering from convert2rhel import logger as logger_module from convert2rhel import pkghandler, pkgmanager, redhatrelease, subscription, systeminfo, toolopts, utils from convert2rhel.actions import level_for_raw_action_data, report @@ -109,7 +109,7 @@ def main(): """ # handle command line arguments - toolopts.CLI() + cli.CLI() # Make sure we're being run by root utils.require_root() diff --git a/convert2rhel/subscription.py b/convert2rhel/subscription.py index f5aca45ced..bc90ac5484 100644 --- a/convert2rhel/subscription.py +++ b/convert2rhel/subscription.py @@ -23,7 +23,6 @@ import os import re -from functools import partial from time import sleep import dbus @@ -35,7 +34,7 @@ from convert2rhel.redhatrelease import os_release_file from convert2rhel.repo import DEFAULT_DNF_VARS_DIR, DEFAULT_YUM_VARS_DIR from convert2rhel.systeminfo import system_info -from convert2rhel.toolopts import _should_subscribe, tool_opts +from convert2rhel.toolopts import tool_opts loggerinst = logging.getLogger(__name__) @@ -982,19 +981,3 @@ def get_rhsm_facts(): "Failed to get the RHSM facts : %s." % e, ) return rhsm_facts - - -# subscription is the natural place to look for should_subscribe but it -# is needed by toolopts. So define it as a private function in toolopts but -# create a public identifier to access it here. - -#: Whether we should subscribe the system with subscription-manager. -#: -#: If the user has specified some way to authenticate with subscription-manager -#: then we need to subscribe the system. If not, the assumption is that the -#: user has already subscribed the system or that this machine does not need to -#: subscribe to rhsm in order to get the RHEL rpm packages. -#: -#: :returns: Returns True if we need to subscribe the system, otherwise return False. -#: :rtype: bool -should_subscribe = partial(_should_subscribe, tool_opts) diff --git a/convert2rhel/systeminfo.py b/convert2rhel/systeminfo.py index 8f74aa7235..dbda8fb13c 100644 --- a/convert2rhel/systeminfo.py +++ b/convert2rhel/systeminfo.py @@ -28,7 +28,8 @@ from six.moves import configparser from convert2rhel import logger, utils -from convert2rhel.toolopts import POST_RPM_VA_LOG_FILENAME, PRE_RPM_VA_LOG_FILENAME, tool_opts +from convert2rhel.cli import POST_RPM_VA_LOG_FILENAME, PRE_RPM_VA_LOG_FILENAME +from convert2rhel.toolopts import tool_opts from convert2rhel.utils import run_subprocess diff --git a/convert2rhel/toolopts.py b/convert2rhel/toolopts.py index 3328f9aea7..fe50e02a35 100644 --- a/convert2rhel/toolopts.py +++ b/convert2rhel/toolopts.py @@ -16,23 +16,16 @@ # along with this program. If not, see . __metaclass__ = type -import argparse import copy import logging import os import re -import sys from six.moves import configparser, urllib -from convert2rhel import __version__, utils +from convert2rhel.utils.subscription import setup_rhsm_parts -loggerinst = logging.getLogger(__name__) - -# Paths for configuration files -CONFIG_PATHS = ["~/.convert2rhel.ini", "/etc/convert2rhel.ini"] - #: Map name of the convert2rhel mode to run in from the command line to the #: activity name that we use in the code and breadcrumbs. CLI commands should #: be verbs but an activity is a noun. @@ -42,26 +35,8 @@ "analyse": "analysis", } -ARGS_WITH_VALUES = [ - "-u", - "--username", - "-p", - "--password", - "-k", - "--activationkey", - "-o", - "--org", - "--pool", - "--serverurl", -] -PARENT_ARGS = ["--debug", "--help", "-h", "--version"] - -# For a list of modified rpm files before the conversion starts -PRE_RPM_VA_LOG_FILENAME = "rpm_va.log" - -# For a list of modified rpm files after the conversion finishes for comparison purposes -POST_RPM_VA_LOG_FILENAME = "rpm_va_after_conversion.log" - +# Mapping of supported headers and options for each configuration in the +# `convert2rhel.ini` file we support. CONFIG_FILE_MAPPING_OPTIONS = { "subscription_manager": ["username", "password", "org", "activation_key"], "settings": [ @@ -75,662 +50,194 @@ ], } +loggerinst = logging.getLogger(__name__) -class ToolOpts: - def __init__(self): - self.debug = False - self.username = None - self.config_file = None - self.password = None - self.no_rhsm = False - self.enablerepo = [] - self.disablerepo = [] - self.pool = None - self.rhsm_hostname = None - self.rhsm_port = None - self.rhsm_prefix = None - self.autoaccept = None - self.auto_attach = None - self.restart = None - self.activation_key = None - self.org = None - self.arch = None - self.no_rpm_va = False - self.eus = False - self.els = False - self.activity = None - - # Settings - self.incomplete_rollback = None - self.tainted_kernel_module_check_skip = None - self.outdated_package_check_skip = None - self.allow_older_version = None - self.allow_unavailable_kmods = None - self.configure_host_metering = None - self.skip_kernel_currency_check = None - - def set_opts(self, supported_opts): - """Set ToolOpts data using dict with values from config file. - :param supported_opts: Supported options in config file +class BaseConfig: + debug = False + username = None + config_file = None + password = None + no_rhsm = False + enablerepo = [] + disablerepo = [] + pool = None + rhsm_hostname = None + rhsm_port = None + rhsm_prefix = None + autoaccept = None + auto_attach = None + restart = None + activation_key = None + org = None + arch = None + no_rpm_va = False + eus = False + els = False + activity = None + + # Settings + incomplete_rollback = None + tainted_kernel_module_check_skip = None + outdated_package_check_skip = None + allow_older_version = None + allow_unavailable_kmods = None + configure_host_metering = None + skip_kernel_currency_check = None + + def set_opts(self, opts): + """Set ToolOpts data using dict with values from config file. + :param opts: Supported options in config file """ - for key, value in supported_opts.items(): - if value and hasattr(self, key): - setattr(self, key, value) - - -class CLI: - def __init__(self): - self._parser = self._get_argparser() - self._shared_options_parser = argparse.ArgumentParser(add_help=False) - # Duplicating parent options here as we want to make it - # available for any other basic operation that we run without a - # subcommand in mind, and, it is a shared option so we can share it - # between any subcommands we may create in the future. - self._register_parent_options(self._parser) - self._register_parent_options(self._shared_options_parser) - self._register_options() - self._process_cli_options() - - @staticmethod - def usage(subcommand_to_print=""): - # Override the subcommand_to_print parameter if the tool has been executed through CLI but without - # subcommand specified. This is to make sure that runnning `convert2rhel --help` on the CLI will print the - # usage with generic , while the manpage generated using argparse_manpage will be able to print the - # usage correctly for subcommands as it does not execute convert2rhel from the CLI. - subcommand_not_used_on_cli = "/usr/bin/convert2rhel" in sys.argv[0] and not _subcommand_used(sys.argv) - if subcommand_not_used_on_cli: - subcommand_to_print = "" - usage = ( - "\n" - " convert2rhel [--version] [-h]\n" - " convert2rhel {subcommand} [-u username] [-p password | -c conf_file_path] [--pool pool_id | -a] [--disablerepo repoid]" - " [--enablerepo repoid] [--serverurl url] [--no-rpm-va] [--eus] [--els] [--debug] [--restart] [-y]\n" - " convert2rhel {subcommand} [--no-rhsm] [--disablerepo repoid] [--enablerepo repoid] [--no-rpm-va] [--eus] [--els] [--debug] [--restart] [-y]\n" - " convert2rhel {subcommand} [-k activation_key | -c conf_file_path] [-o organization] [--pool pool_id | -a] [--disablerepo repoid] [--enablerepo" - " repoid] [--serverurl url] [--no-rpm-va] [--eus] [--els] [--debug] [--restart] [-y]\n" - ).format(subcommand=subcommand_to_print) - - if subcommand_not_used_on_cli: - usage = usage + "\n Subcommands: analyze, convert" - return usage - - def _get_argparser(self): - return argparse.ArgumentParser(conflict_handler="resolve", usage=self.usage()) - - def _register_commands(self): - """Configures parsers specific to the analyze and convert subcommands""" - subparsers = self._parser.add_subparsers(title="Subcommands", dest="command") - self._analyze_parser = subparsers.add_parser( - "analyze", - help="Run all Convert2RHEL initial checks up until the" - " Point of no Return (PONR) and generate a report with the findings." - " A rollback is initiated after the checks to put the system back" - " in the original state.", - parents=[self._shared_options_parser], - usage=self.usage(subcommand_to_print="analyze"), - ) - self._convert_parser = subparsers.add_parser( - "convert", - help="Convert the system. If no subcommand is given, 'convert' is used as a default.", - parents=[self._shared_options_parser], - usage=self.usage(subcommand_to_print="convert"), - ) - - @staticmethod - def _register_parent_options(parser): - """Prescribe what parent command line options the tool accepts.""" - parser.add_argument( - "--version", - action="version", - version=__version__, - help="Show convert2rhel version and exit.", - ) - parser.add_argument( - "--debug", - action="store_true", - help="Print traceback in case of an abnormal exit and messages that could help find an issue.", - ) - - def _register_options(self): - """Prescribe what command line options the tool accepts.""" - self._parser.add_argument( - "-h", - "--help", - action="help", - help="Show help message and exit.", - ) - self._shared_options_parser.add_argument( - "--no-rpm-va", - action="store_true", - help="Skip gathering changed rpm files using" - " 'rpm -Va'. By default it's performed before and after the conversion with the output" - " stored in log files %s and %s. At the end of the conversion, these logs are compared" - " to show you what rpm files have been affected by the conversion." - " Cannot be used with analyze subcommand." - " The environment variable CONVERT2RHEL_INCOMPLETE_ROLLBACK" - " needs to be set to 1 to use this argument." % (PRE_RPM_VA_LOG_FILENAME, POST_RPM_VA_LOG_FILENAME), - ) - self._shared_options_parser.add_argument( - "--eus", - action="store_true", - help="Explicitly recognize the system as eus, utilizing eus repos." - " This option is meant for el8.8+ systems.", - ) - self._shared_options_parser.add_argument( - "--els", - action="store_true", - help="Explicitly recognize the system as els, utilizing els repos." - " This option is meant for el7 systems.", - ) - self._shared_options_parser.add_argument( - "--enablerepo", - metavar="repoidglob", - action="append", - help="Enable specific" - " repositories by ID or glob. For more repositories to enable, use this option" - " multiple times. If you don't use the --no-rhsm option, you can use this option" - " to override the default RHEL repoids that convert2rhel enables through" - " subscription-manager.", - ) - self._shared_options_parser.add_argument( - "--disablerepo", - metavar="repoidglob", - action="append", - help="Disable specific" - " repositories by ID or glob. For more repositories to disable, use this option" - " multiple times. This option defaults to all repositories ('*').", - ) - self._shared_options_parser.add_argument( - "-r", - "--restart", - help="Restart the system when it is successfully converted to RHEL to boot the new RHEL kernel." - " It has no effect when used with the 'analyze' subcommand.", - action="store_true", - ) - self._shared_options_parser.add_argument( - "-y", - help="Answer yes to all yes/no questions the tool asks.", - action="store_true", - ) - self._add_subscription_manager_options() - self._add_alternative_installation_options() - self._register_commands() - - def _add_alternative_installation_options(self): - """Prescribe what alternative command line options the tool accepts.""" - group = self._shared_options_parser.add_argument_group( - title="Alternative Installation Options", - description="The following options are required if you do not intend on using subscription-manager.", - ) - group.add_argument( - "--no-rhsm", - action="store_true", - help="Do not use the subscription-manager, use custom repositories instead. See --enablerepo/--disablerepo" - " options. Without this option, the subscription-manager is used to access RHEL repositories by default." - " Using this option requires to have the --enablerepo specified.", - ) - - def _add_subscription_manager_options(self): - """Prescribe what subscription manager command line options the tool accepts.""" - group = self._shared_options_parser.add_argument_group( - title="Subscription Manager Options", - description="The following options are specific to using subscription-manager.", - ) - group.add_argument( - "-u", - "--username", - help="Username for the" - " subscription-manager. If neither --username nor" - " --activation-key option is used, the user" - " is asked to enter the username.", - ) - group.add_argument( - "-p", - "--password", - help="Password for the" - " subscription-manager. If --password, --config-file or --activationkey are not" - " used, the user is asked to enter the password." - " We recommend using the --config-file option instead to prevent leaking the password" - " through a list of running processes.", - ) - group.add_argument( - "-k", - "--activationkey", - help="Activation key used" - " for the system registration by the" - " subscription-manager. It requires to have the --org" - " option specified." - " We recommend using the --config-file option instead to prevent leaking the activation key" - " through a list of running processes.", - ) - group.add_argument( - "-o", - "--org", - help="Organization with which the" - " system will be registered by the" - " subscription-manager. A list of available" - " organizations is possible to obtain by running" - " 'subscription-manager orgs'. From the listed pairs" - " Name:Key, use the Key here.", - ) - group.add_argument( - "-c", - "--config-file", - help="The configuration file is an optional way to safely pass either a user password or an activation key" - " to the subscription-manager to register the system. This is more secure than passing these values" - " through the --activationkey or --password option, which might leak the values" - " through a list of running processes." - " You can edit the pre-installed configuration file template at /etc/convert2rhel.ini or create a new" - " configuration file at ~/.convert2rhel.ini. The convert2rhel utility loads the configuration from either" - " of those locations, the latter having preference over the former. Alternatively, you can specify a path" - " to the configuration file using the --config-file option to override other configurations.", - ) - group.add_argument( - "-a", - "--auto-attach", - help="Automatically attach compatible subscriptions to the system.", - action="store_true", - ) - group.add_argument( - "--pool", - help="Subscription pool ID. A list of the available" - " subscriptions is possible to obtain by running" - " 'subscription-manager list --available'." - " If no pool ID is provided, the --auto option is used", - ) - group.add_argument( - "--serverurl", - help="Hostname of the subscription service to be used when registering the system with" - " subscription-manager. The default is the Customer Portal Subscription Management service" - " (subscription.rhsm.redhat.com). It is not to be used to specify a Satellite server. For that, read" - " the product documentation at https://access.redhat.com/.", - ) - - def _process_cli_options(self): - """Process command line options used with the tool.""" - _log_command_used() - - # algorithm function to properly organize all CLI args - argv = _add_default_command(sys.argv[1:]) - parsed_opts = self._parser.parse_args(argv) - - if parsed_opts.debug: - tool_opts.debug = True - - # Once we use a subcommand to set the activity that convert2rhel will perform - tool_opts.activity = _COMMAND_TO_ACTIVITY[parsed_opts.command] - - # Processing the configuration file - conf_file_opts = options_from_config_files(parsed_opts.config_file) - tool_opts.set_opts(conf_file_opts) - config_opts = copy.copy(tool_opts) - tool_opts.config_file = parsed_opts.config_file - # corner case: password on CLI and activation-key in the config file - # password from CLI has precedence and activation_key and org must be deleted (unused) - if config_opts.activation_key and parsed_opts.password: - tool_opts.activation_key = None - tool_opts.org = None - - if parsed_opts.no_rpm_va: - if tool_opts.activity == "analysis": - loggerinst.warning( - "We will proceed with ignoring the --no-rpm-va option as running rpm -Va" - " in the analysis mode is essential for a complete rollback to the original" - " system state at the end of the analysis." - ) - elif os.getenv("CONVERT2RHEL_INCOMPLETE_ROLLBACK", None): - tool_opts.no_rpm_va = True - else: - message = ( - "We need to run the 'rpm -Va' command to be able to perform a complete rollback of changes" - " done to the system during the pre-conversion analysis. If you accept the risk of an" - " incomplete rollback, set the CONVERT2RHEL_INCOMPLETE_ROLLBACK=1 environment" - " variable. Otherwise, remove the --no-rpm-va option." - ) - loggerinst.critical(message) - - if parsed_opts.username: - tool_opts.username = parsed_opts.username - - if parsed_opts.password: - tool_opts.password = parsed_opts.password - - if parsed_opts.enablerepo: - tool_opts.enablerepo = parsed_opts.enablerepo - if parsed_opts.disablerepo: - tool_opts.disablerepo = parsed_opts.disablerepo - - # Check if we have duplicate repositories specified - if parsed_opts.enablerepo or parsed_opts.disablerepo: - duplicate_repos = set(tool_opts.disablerepo) & set(tool_opts.enablerepo) - if duplicate_repos: - message = "Duplicate repositories were found across disablerepo and enablerepo options:" - for repo in duplicate_repos: - message += "\n%s" % repo - message += "\nThis ambiguity may have unintended consequences." - loggerinst.warning(message) - - if parsed_opts.no_rhsm: - tool_opts.no_rhsm = True - if not tool_opts.enablerepo: - loggerinst.critical("The --enablerepo option is required when --no-rhsm is used.") - - if parsed_opts.eus: - tool_opts.eus = True - - if parsed_opts.els: - tool_opts.els = True - - if not tool_opts.disablerepo: - # Default to disable every repo except: - # - the ones passed through --enablerepo - # - the ones enabled through subscription-manager based on convert2rhel config files - tool_opts.disablerepo = ["*"] - - if parsed_opts.pool: - tool_opts.pool = parsed_opts.pool - - if parsed_opts.activationkey: - tool_opts.activation_key = parsed_opts.activationkey - - if parsed_opts.org: - tool_opts.org = parsed_opts.org - - if parsed_opts.serverurl: - if tool_opts.no_rhsm: - loggerinst.warning("Ignoring the --serverurl option. It has no effect when --no-rhsm is used.") - # WARNING: We cannot use the following helper until after no_rhsm, - # username, password, activation_key, and organization have been set. - elif not _should_subscribe(tool_opts): - loggerinst.warning( - "Ignoring the --serverurl option. It has no effect when no credentials to subscribe the system were given." - ) - else: - # Parse the serverurl and save the components. - try: - url_parts = _parse_subscription_manager_serverurl(parsed_opts.serverurl) - url_parts = _validate_serverurl_parsing(url_parts) - except ValueError as e: - # If we fail to parse, fail the conversion. The reason for - # this harsh treatment is that we will be submitting - # credentials to the server parsed from the serverurl. If - # the user is specifying an internal subscription-manager - # server but typo the url, we would fallback to the - # public red hat subscription-manager server. That would - # mean the user thinks the credentials are being passed - # to their internal subscription-manager server but it - # would really be passed externally. That's not a good - # security practice. - loggerinst.critical( - "Failed to parse a valid subscription-manager server from the --serverurl option.\n" - "Please check for typos and run convert2rhel again with a corrected --serverurl.\n" - "Supplied serverurl: %s\nError: %s" % (parsed_opts.serverurl, e) - ) + for key, value in opts.items(): + if value and hasattr(BaseConfig, key): + setattr(BaseConfig, key, value) - tool_opts.rhsm_hostname = url_parts.hostname - - if url_parts.port: - # urllib.parse.urlsplit() converts this into an int but we - # always use it as a str - tool_opts.rhsm_port = str(url_parts.port) - - if url_parts.path: - tool_opts.rhsm_prefix = url_parts.path - tool_opts.autoaccept = parsed_opts.y - tool_opts.auto_attach = parsed_opts.auto_attach +class FileConfig(BaseConfig): + def __init__(self, config_files=("~/.convert2rhel.ini", "/etc/convert2rhel.ini")): + self._config_files = config_files - # conversion only options - if tool_opts.activity == "conversion": - tool_opts.restart = parsed_opts.restart + def run(self): + opts = self.options_from_config_files() + self.set_opts(opts) - # Security notice - if tool_opts.password or tool_opts.activation_key: - loggerinst.warning( - "Passing the RHSM password or activation key through the --activationkey or --password options is" - " insecure as it leaks the values through the list of running processes. We recommend using the safer" - " --config-file option instead." - ) - - # Checks of multiple authentication sources - if tool_opts.password and tool_opts.activation_key: - loggerinst.warning( - "Either a password or an activation key can be used for system registration." - " We're going to use the activation key." - ) - - # Config files matches - if config_opts.username and parsed_opts.username: - loggerinst.warning( - "You have passed the RHSM username through both the command line and the" - " configuration file. We're going to use the command line values." - ) - - if config_opts.org and parsed_opts.org: - loggerinst.warning( - "You have passed the RHSM org through both the command line and the" - " configuration file. We're going to use the command line values." - ) - - if (config_opts.activation_key or config_opts.password) and (parsed_opts.activationkey or parsed_opts.password): - loggerinst.warning( - "You have passed either the RHSM password or activation key through both the command line and the" - " configuration file. We're going to use the command line values." - ) - - if (tool_opts.org and not tool_opts.activation_key) or (not tool_opts.org and tool_opts.activation_key): - loggerinst.critical( - "Either the --org or the --activationkey option is missing. You can't use one without the other." - ) - - if (parsed_opts.password or config_opts.password) and not (parsed_opts.username or config_opts.username): - loggerinst.warning( - "You have passed the RHSM password without an associated username. Please provide a username together" - " with the password." - ) - - if (parsed_opts.username or config_opts.username) and not (parsed_opts.password or config_opts.password): - loggerinst.warning( - "You have passed the RHSM username without an associated password. Please provide a password together" - " with the username." - ) - - -def options_from_config_files(cfg_path): - """Parse the convert2rhel.ini configuration file. - - This function will try to parse the convert2rhel.ini configuration file and - return a dictionary containing the values found in the file. - - .. note:: - This function will parse the configuration file in the following way: - - 1) If the path provided by the user in cfg_path is set (Highest - priority), then we use only that. - - Otherwise, if cfg_path is `None`, we proceed to check the following - paths: - - 2) ~/.convert2rhel.ini (The 2nd highest priority). - 3) /etc/convert2rhel.ini (The lowest priority). - - In any case, they are parsed in reversed order, meaning that we will - start with the lowest priority and go until the highest. - - :param cfg_path: Path of a custom configuration file - :type cfg_path: str - - :return: Dict with the supported options alongside their values. - :rtype: dict[str, str] - """ - # Paths for the configuration files. In case we have cfg_path defined - # (meaning that the user entered something through the `-c` option), we - # will use only that, as it has a higher priority over the rest - config_paths = [cfg_path] if cfg_path else CONFIG_PATHS - paths = [os.path.expanduser(path) for path in config_paths if os.path.exists(os.path.expanduser(path))] - - if cfg_path and not paths: - raise FileNotFoundError("No such file or directory: %s" % ", ".join(paths)) - - found_opts = _parse_options_from_config(paths) - return found_opts - - -def _parse_options_from_config(paths): - """Parse the options from the given config files. - - .. note:: - If no configuration file is provided through the command line option - (`-c`), we will use the default paths and follow their priority. - - :param paths: List of paths to iterate through and gather the options from - them. - :type paths: list[str] - """ - config_file = configparser.ConfigParser() - found_opts = {} - - for path in reversed(paths): - loggerinst.debug("Checking configuration file at %s" % path) - # Check for correct permissions on file - if not oct(os.stat(path).st_mode)[-4:].endswith("00"): - loggerinst.critical("The %s file must only be accessible by the owner (0600)" % path) - - config_file.read(path) - - # Mapping of all supported options we can have in the config file - for supported_header, supported_opts in CONFIG_FILE_MAPPING_OPTIONS.items(): - loggerinst.debug("Checking for header '%s'" % supported_header) - if supported_header not in config_file.sections(): - loggerinst.warning("Couldn't find header '%s' in the configuration file %s." % (supported_header, path)) - continue + def options_from_config_files(self): + """Parse the convert2rhel.ini configuration file. - options = _get_options_value(config_file, supported_header, supported_opts) - found_opts.update(options) + This function will try to parse the convert2rhel.ini configuration file and + return a dictionary containing the values found in the file. - return found_opts + .. note:: + This function will parse the configuration file in the following way: + 1) If the path provided by the user in cfg_path is set (Highest + priority), then we use only that. -def _get_options_value(config_file, header, supported_opts): - """Helper function to iterate through the options in a config file. + Otherwise, if cfg_path is `None`, we proceed to check the following + paths: - :param config_file: An instance of `py:ConfigParser` after reading the file - to iterate through the options. - :type config_file: configparser.ConfigParser - :param header: The header name to get options from. - :type header: str - :param supported_opts: List of supported options that can be parsed from - the config file. - :type supported_opts: list[str] - """ - options = {} - conf_options = config_file.options(header) + 2) ~/.convert2rhel.ini (The 2nd highest priority). + 3) /etc/convert2rhel.ini (The lowest priority). - if len(conf_options) == 0: - loggerinst.debug("No options found for %s. It seems to be empty or commented." % header) - return options - - for option in conf_options: - if option.lower() not in supported_opts: - loggerinst.warning("Unsupported option '%s' in '%s'" % (option, header)) - continue + In any case, they are parsed in reversed order, meaning that we will + start with the lowest priority and go until the highest. - options[option] = config_file.get(header, option).strip('"') - loggerinst.debug("Found %s in %s" % (option, header)) + :param cfg_path: Path of a custom configuration file + :type cfg_path: str - return options + :return: Dict with the supported options alongside their values. + :rtype: dict[str, str] + """ + # Paths for the configuration files. In case we have cfg_path defined + # (meaning that the user entered something through the `-c` option), we + # will use only that, as it has a higher priority over the rest + paths = [os.path.expanduser(path) for path in self._config_files if os.path.exists(os.path.expanduser(path))] + if not paths: + raise FileNotFoundError("No such file or directory: %s" % ", ".join(paths)) -def _log_command_used(): - """We want to log the command used for convert2rhel to make it easier to know what command was used - when debugging the log files. Since we can't differentiate between the handlers we log to both stdout - and the logfile - """ - command = " ".join(utils.hide_secrets(sys.argv)) - loggerinst.info("convert2rhel command used:\n{0}".format(command)) + found_opts = self._parse_options_from_config(paths) + return found_opts + def _parse_options_from_config(self, paths): + """Parse the options from the given config files. -def _parse_subscription_manager_serverurl(serverurl): - """Parse a url string in a manner mostly compatible with subscription-manager --serverurl.""" - # This is an adaptation of what subscription-manager's cli enforces: - # https://github.com/candlepin/subscription-manager/blob/main/src/rhsm/utils.py#L112 + .. note:: + If no configuration file is provided through the command line option + (`-c`), we will use the default paths and follow their priority. - # Don't modify http:// and https:// as they are fine - if not re.match("https?://[^/]+", serverurl): - # Anthing that looks like a malformed scheme is immediately discarded - if re.match("^[^:]+:/.+", serverurl): - raise ValueError("Unable to parse --serverurl. Make sure it starts with http://HOST or https://HOST") + :param paths: List of paths to iterate through and gather the options from + them. + :type paths: list[str] + """ + config_file = configparser.ConfigParser() + found_opts = {} + + for path in reversed(paths): + loggerinst.debug("Checking configuration file at %s" % path) + # Check for correct permissions on file + if not oct(os.stat(path).st_mode)[-4:].endswith("00"): + loggerinst.critical("The %s file must only be accessible by the owner (0600)" % path) + + config_file.read(path) + + # Mapping of all supported options we can have in the config file + for supported_header, supported_opts in CONFIG_FILE_MAPPING_OPTIONS.items(): + loggerinst.debug("Checking for header '%s'" % supported_header) + if supported_header not in config_file.sections(): + loggerinst.warning( + "Couldn't find header '%s' in the configuration file %s." % (supported_header, path) + ) + continue - # If there isn't a scheme, add one now - serverurl = "https://%s" % serverurl + options = self._get_options_value(config_file, supported_header, supported_opts) + found_opts.update(options) - url_parts = urllib.parse.urlsplit(serverurl, allow_fragments=False) + return found_opts - return url_parts + def _get_options_value(self, config_file, header, supported_opts): + """Helper function to iterate through the options in a config file. + :param config_file: An instance of `py:ConfigParser` after reading the file + to iterate through the options. + :type config_file: configparser.ConfigParser + :param header: The header name to get options from. + :type header: str + :param supported_opts: List of supported options that can be parsed from + the config file. + :type supported_opts: list[str] + """ + options = {} + conf_options = config_file.options(header) -def _validate_serverurl_parsing(url_parts): - """ - Perform some tests that we parsed the subscription-manager serverurl successfully. + if len(conf_options) == 0: + loggerinst.debug("No options found for %s. It seems to be empty or commented." % header) + return options - :arg url_parts: The parsed serverurl as returned by urllib.parse.urlsplit() - :raises ValueError: If any of the checks fail. - :returns: url_parts If the check was successful. - """ - if url_parts.scheme not in ("https", "http"): - raise ValueError( - "Subscription manager must be accessed over http or https. %s is not valid" % url_parts.scheme - ) + for option in conf_options: + if option.lower() not in supported_opts: + loggerinst.warning("Unsupported option '%s' in '%s'" % (option, header)) + continue - if not url_parts.hostname: - raise ValueError("A hostname must be specified in a subscription-manager serverurl") + options[option] = config_file.get(header, option).strip('"') + loggerinst.debug("Found %s in %s" % (option, header)) - return url_parts + return options -def _add_default_command(argv): - """Add the default command when none is given""" - subcommand = _subcommand_used(argv) - args = argv - if not subcommand: - args.insert(0, "convert") +class CliConfig(BaseConfig): + def __init__(self, opts): + self._opts = opts - return args + def run(self): + parts = setup_rhsm_parts(self._opts) + opts = vars(self._opts) + opts.update(parts) -def _subcommand_used(args): - """Return what subcommand has been used by the user. Return None if no subcommand has been used.""" - for index, argument in enumerate(args): - if argument in ("convert", "analyze"): - return argument + opts = self._normalize_opts(opts) + self.set_opts(opts) - if not argument in PARENT_ARGS and args[index - 1] in ARGS_WITH_VALUES: - return None + def _normalize_opts(self, opts): + unparsed_opts = copy.copy(opts) + unparsed_opts["activity"] = _COMMAND_TO_ACTIVITY[opts.pop("command", "convert")] + unparsed_opts["restart"] = True if unparsed_opts["activity"] == "conversion" else False + unparsed_opts["disablerepo"] = opts.pop("disablerepo", ["*"]) + return unparsed_opts -def _should_subscribe(tool_opts): - """ - Whether we should subscribe the system with subscription-manager. - If there are no ways to authenticate to subscription-manager, then we will attempt to convert - without subscribing the system. The assumption is that the user has already subscribed the - system or that this machine does not need to subscribe to rhsm in order to get the RHEL rpm - packages. - """ - # No means to authenticate with rhsm. - if not (tool_opts.username and tool_opts.password) and not (tool_opts.activation_key and tool_opts.org): - return False +class ToolOpts(BaseConfig): + def __init__(self, config_sources): + super(ToolOpts, self).__init__() + for config in reversed(config_sources): + config.run() - # --no-rhsm means that there is no need to use any part of rhsm to - # convert this host. (Usually used when you configure - # your RHEL repos another way, like a local mirror and telling - # convert2rhel about it using --enablerepo) - if tool_opts.no_rhsm: - return False - return True +def initialize_toolopts(config_sources): + global tool_opts + return ToolOpts(config_sources=config_sources) -# Code to be executed upon module import -tool_opts = ToolOpts() # pylint: disable=C0103 +tool_opts = None diff --git a/convert2rhel/unit_tests/__init__.py b/convert2rhel/unit_tests/__init__.py index 095ef6bfaf..7c71084a30 100644 --- a/convert2rhel/unit_tests/__init__.py +++ b/convert2rhel/unit_tests/__init__.py @@ -30,6 +30,7 @@ from convert2rhel import ( backup, breadcrumbs, + cli, exceptions, grub, initialize, @@ -587,12 +588,12 @@ class ResolveSystemInfoMocked(MockFunctionObject): # -# toolopts mocks +# cli mocks # class CLIMocked(MockFunctionObject): - spec = toolopts.CLI + spec = cli.CLI # diff --git a/convert2rhel/unit_tests/actions/pre_ponr_changes/backup_system_test.py b/convert2rhel/unit_tests/actions/pre_ponr_changes/backup_system_test.py index 1d58f9e231..322b4921ee 100644 --- a/convert2rhel/unit_tests/actions/pre_ponr_changes/backup_system_test.py +++ b/convert2rhel/unit_tests/actions/pre_ponr_changes/backup_system_test.py @@ -22,11 +22,11 @@ import pytest import six -from convert2rhel import subscription, unit_tests +from convert2rhel import unit_tests from convert2rhel.actions.pre_ponr_changes import backup_system from convert2rhel.backup import files from convert2rhel.backup.files import RestorableFile -from convert2rhel.toolopts import PRE_RPM_VA_LOG_FILENAME +from convert2rhel.cli import PRE_RPM_VA_LOG_FILENAME from convert2rhel.unit_tests import CriticalErrorCallableObject from convert2rhel.unit_tests.conftest import all_systems, centos7, centos8 diff --git a/convert2rhel/unit_tests/actions/pre_ponr_changes/subscription_test.py b/convert2rhel/unit_tests/actions/pre_ponr_changes/subscription_test.py index 8792e15f77..7fa77eb414 100644 --- a/convert2rhel/unit_tests/actions/pre_ponr_changes/subscription_test.py +++ b/convert2rhel/unit_tests/actions/pre_ponr_changes/subscription_test.py @@ -24,7 +24,7 @@ import pytest import six -from convert2rhel import actions, pkghandler, repo, subscription, toolopts, unit_tests, utils +from convert2rhel import actions, cli, pkghandler, repo, subscription, toolopts, unit_tests, utils from convert2rhel.actions import STATUS_CODE from convert2rhel.actions.pre_ponr_changes import subscription as appc_subscription from convert2rhel.actions.pre_ponr_changes.subscription import PreSubscription, SubscribeSystem @@ -228,7 +228,7 @@ def test_subscribe_system_do_not_subscribe(self, global_tool_opts, subscribe_sys # partial saves the real copy of tool_opts to use with # _should_subscribe so we have to monkeypatch with the mocked version # of tool_opts. - monkeypatch.setattr(subscription, "should_subscribe", partial(toolopts._should_subscribe, global_tool_opts)) + monkeypatch.setattr(subscription, "should_subscribe", partial(cli._should_subscribe, global_tool_opts)) monkeypatch.setattr(RestorableSystemSubscription, "enable", mock.Mock()) monkeypatch.setattr(repo, "get_rhel_repoids", mock.Mock()) monkeypatch.setattr(RestorableDisableRepositories, "enable", mock.Mock()) @@ -249,7 +249,7 @@ def test_subscribe_system_no_rhsm_option_detected( # partial saves the real copy of tool_opts to use with # _should_subscribe so we have to monkeypatch with the mocked version # of tool_opts. - monkeypatch.setattr(subscription, "should_subscribe", partial(toolopts._should_subscribe, global_tool_opts)) + monkeypatch.setattr(subscription, "should_subscribe", partial(cli._should_subscribe, global_tool_opts)) expected = set( ( diff --git a/convert2rhel/unit_tests/cli_test.py b/convert2rhel/unit_tests/cli_test.py new file mode 100644 index 0000000000..b869d77f63 --- /dev/null +++ b/convert2rhel/unit_tests/cli_test.py @@ -0,0 +1,848 @@ +import os +import sys + +from collections import namedtuple + +import pytest +import six + +from convert2rhel import cli, toolopts, utils + + +six.add_move(six.MovedModule("mock", "mock", "unittest.mock")) +from six.moves import mock + + +def mock_cli_arguments(args): + """ + Return a list of cli arguments where the first one is always the name of + the executable, followed by 'args'. + """ + return sys.argv[0:1] + args + + +@pytest.fixture(autouse=True) +def apply_global_tool_opts(monkeypatch, global_tool_opts): + monkeypatch.setattr(cli, "tool_opts", global_tool_opts) + + +class TestTooloptsParseFromCLI: + def test_cmdline_interactive_username_without_passwd(self, monkeypatch): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--username", "uname"])) + cli.CLI() + assert cli.tool_opts.username == "uname" + + def test_cmdline_interactive_passwd_without_uname(self, monkeypatch): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--password", "passwd"])) + cli.CLI() + assert cli.tool_opts.password == "passwd" + + def test_cmdline_non_interactive_with_credentials(self, monkeypatch): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--username", "uname", "--password", "passwd"])) + cli.CLI() + assert cli.tool_opts.username == "uname" + assert cli.tool_opts.password == "passwd" + + def test_cmdline_disablerepo_defaults_to_asterisk(self, monkeypatch): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--enablerepo", "foo"])) + cli.CLI() + assert cli.tool_opts.enablerepo == ["foo"] + assert cli.tool_opts.disablerepo == ["*"] + + # Parsing of serverurl + + @pytest.mark.parametrize( + ("serverurl", "hostname", "port", "prefix"), + ( + ("https://rhsm.redhat.com:443/", "rhsm.redhat.com", "443", "/"), + ("https://localhost/rhsm/", "localhost", None, "/rhsm/"), + ("https://rhsm.redhat.com/", "rhsm.redhat.com", None, "/"), + ("https://rhsm.redhat.com", "rhsm.redhat.com", None, None), + ("https://rhsm.redhat.com:8443", "rhsm.redhat.com", "8443", None), + ("subscription.redhat.com", "subscription.redhat.com", None, None), + ), + ) + def test_custom_serverurl(self, monkeypatch, global_tool_opts, serverurl, hostname, port, prefix): + monkeypatch.setattr(cli, "tool_opts", global_tool_opts) + monkeypatch.setattr( + sys, + "argv", + mock_cli_arguments(["--serverurl", serverurl, "--username", "User1", "--password", "Password1"]), + ) + cli.CLI() + assert global_tool_opts.rhsm_hostname == hostname + assert global_tool_opts.rhsm_port == port + assert global_tool_opts.rhsm_prefix == prefix + + def test_no_serverurl(self, monkeypatch, global_tool_opts): + monkeypatch.setattr(sys, "argv", mock_cli_arguments([])) + cli.CLI() + assert global_tool_opts.rhsm_hostname is None + assert global_tool_opts.rhsm_port is None + assert global_tool_opts.rhsm_prefix is None + + @pytest.mark.parametrize( + "serverurl", + ( + "gopher://subscription.rhsm.redhat.com/", + "https:///", + "https://", + "/", + ), + ) + def test_bad_serverurl(self, caplog, monkeypatch, global_tool_opts, serverurl): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--serverurl", serverurl, "-o", "MyOrg", "-k", "012335"])) + + with pytest.raises(SystemExit): + cli.CLI() + + message = ( + "Failed to parse a valid subscription-manager server from the --serverurl option.\n" + "Please check for typos and run convert2rhel again with a corrected --serverurl.\n" + "Supplied serverurl: %s\nError: " % serverurl + ) + assert message in caplog.records[-1].message + assert caplog.records[-1].levelname == "CRITICAL" + + def test_serverurl_with_no_rhsm(self, caplog, monkeypatch, global_tool_opts): + monkeypatch.setattr( + sys, "argv", mock_cli_arguments(["--serverurl", "localhost", "--no-rhsm", "--enablerepo", "testrepo"]) + ) + + cli.CLI() + + message = "Ignoring the --serverurl option. It has no effect when --no-rhsm is used." + assert message in caplog.text + + def test_serverurl_with_no_rhsm_credentials(self, caplog, monkeypatch, global_tool_opts): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--serverurl", "localhost"])) + + cli.CLI() + + message = ( + "Ignoring the --serverurl option. It has no effect when no credentials to" + " subscribe the system were given." + ) + assert message in caplog.text + + +@pytest.mark.parametrize( + ("argv", "raise_exception", "no_rhsm_value"), + ( + (mock_cli_arguments(["--no-rhsm"]), True, True), + (mock_cli_arguments(["--no-rhsm", "--enablerepo", "test_repo"]), False, True), + ), +) +@mock.patch("toolopts.tool_opts.no_rhsm", False) +@mock.patch("toolopts.tool_opts.enablerepo", []) +def test_no_rhsm_option_work(argv, raise_exception, no_rhsm_value, monkeypatch, caplog, global_tool_opts): + monkeypatch.setattr(cli, "tool_opts", global_tool_opts) + monkeypatch.setattr(sys, "argv", argv) + + if raise_exception: + with pytest.raises(SystemExit): + cli.CLI() + assert "The --enablerepo option is required when --no-rhsm is used." in caplog.text + else: + cli.CLI() + + assert toolopts.tool_opts.no_rhsm == no_rhsm_value + + +@pytest.mark.parametrize( + ("argv", "content", "output", "message"), + ( + # pytest.param( + # mock_cli_arguments([]), + # """ + # [subscription_manager] + # username=conf_user + # password=conf_pass + # activation_key=conf_key + # org=conf_org + # """, + # {"username": "conf_user", "password": "conf_pass", "activation_key": "conf_key", "org": "conf_org"}, + # None, + # id="All values set in config", + # ), + # ( + # mock_cli_arguments([]), + # """ + # [subscription_manager] + # password=conf_pass + # """, + # {"password": "conf_pass"}, + # None, + # ), + ( + mock_cli_arguments([]), + """ + [subscription_manager] + password = conf_pass + [settings] + incomplete_rollback = 1 + """, + {"password": "conf_pass", "settings": "1"}, + None, + ), + # ( + # mock_cli_arguments(["-p", "password"]), + # """ + # [subscription_manager] + # activation_key=conf_key + # """, + # {"password": "password"}, + # "You have passed either the RHSM password or activation key through both the command line and" + # " the configuration file. We're going to use the command line values.", + # ), + # ( + # mock_cli_arguments(["-k", "activation_key", "-o", "org"]), + # """ + # [subscription_manager] + # activation_key=conf_key + # """, + # {"activation_key": "activation_key"}, + # "You have passed either the RHSM password or activation key through both the command line and" + # " the configuration file. We're going to use the command line values.", + # ), + # ( + # mock_cli_arguments(["-k", "activation_key", "-o", "org"]), + # """ + # [subscription_manager] + # password=conf_pass + # """, + # {"password": "conf_pass", "activation_key": "activation_key"}, + # "You have passed either the RHSM password or activation key through both the command line and" + # " the configuration file. We're going to use the command line values.", + # ), + # ( + # mock_cli_arguments(["-k", "activation_key", "-p", "password", "-o", "org"]), + # """ + # [subscription_manager] + # password=conf_pass + # activation_key=conf_key + # """, + # {"password": "password", "activation_key": "activation_key"}, + # "You have passed either the RHSM password or activation key through both the command line and" + # " the configuration file. We're going to use the command line values.", + # ), + # ( + # mock_cli_arguments(["-o", "org"]), + # """ + # [subscription_manager] + # password=conf_pass + # activation_key=conf_key + # """, + # {"password": "conf_pass", "activation_key": "conf_key"}, + # "Either a password or an activation key can be used for system registration. We're going to use the" + # " activation key.", + # ), + # ( + # mock_cli_arguments(["-u", "McLOVIN"]), + # """ + # [subscription_manager] + # username=NotMcLOVIN + # """, + # {"username": "McLOVIN"}, + # "You have passed the RHSM username through both the command line and" + # " the configuration file. We're going to use the command line values.", + # ), + # ( + # mock_cli_arguments(["-o", "some-org"]), + # """ + # [subscription_manager] + # org=a-different-org + # activation_key=conf_key + # """, + # {"org": "some-org"}, + # "You have passed the RHSM org through both the command line and" + # " the configuration file. We're going to use the command line values.", + # ), + ), +) +def test_config_file(argv, content, output, message, monkeypatch, tmpdir, caplog): + # After each test there were left data from previous + # Re-init needed delete the set data + path = os.path.join(str(tmpdir), "convert2rhel.ini") + with open(path, "w") as file: + file.write(content) + os.chmod(path, 0o600) + + monkeypatch.setattr(sys, "argv", argv) + monkeypatch.setattr(cli, "CONFIG_PATHS", value=[path]) + cli.CLI() + + if "activation_key" in output: + assert cli.tool_opts.activation_key == output["activation_key"] + + if "password" in output: + assert cli.tool_opts.password == output["password"] + + if "username" in output: + assert cli.tool_opts.username == output["username"] + + if "org" in output: + assert cli.tool_opts.org == output["org"] + + if message: + assert message in caplog.text + + +@pytest.mark.parametrize( + ("argv", "content", "message", "output"), + ( + ( + mock_cli_arguments(["--password", "pass", "--config-file"]), + "[subscription_manager]\nactivation_key = key_cnf_file", + "You have passed either the RHSM password or activation key through both the command line and" + " the configuration file. We're going to use the command line values.", + {"password": "pass", "activation_key": None}, + ), + ), +) +def test_multiple_auth_src_combined(argv, content, message, output, caplog, monkeypatch, tmpdir, global_tool_opts): + """Test combination of password file or configuration file and CLI arguments.""" + path = os.path.join(str(tmpdir), "convert2rhel.file") + with open(path, "w") as file: + file.write(content) + os.chmod(path, 0o600) + # The path for file is the last argument + argv.append(path) + + monkeypatch.setattr(sys, "argv", argv) + monkeypatch.setattr(cli, "CONFIG_PATHS", value=[""]) + cli.CLI() + + assert message in caplog.text + assert cli.tool_opts.activation_key == output["activation_key"] + assert cli.tool_opts.password == output["password"] + + +@pytest.mark.parametrize( + ("argv", "message", "output"), + ( + ( + mock_cli_arguments(["--password", "pass", "--activationkey", "key", "-o", "org"]), + "Either a password or an activation key can be used for system registration." + " We're going to use the activation key.", + {"password": "pass", "activation_key": "key"}, + ), + ), +) +def test_multiple_auth_src_cli(argv, message, output, caplog, monkeypatch, global_tool_opts): + """Test both auth methods in CLI.""" + monkeypatch.setattr(sys, "argv", argv) + monkeypatch.setattr(cli, "CONFIG_PATHS", value=[""]) + cli.CLI() + + assert message in caplog.text + assert cli.tool_opts.activation_key == output["activation_key"] + assert cli.tool_opts.password == output["password"] + + +@pytest.mark.parametrize( + ("content", "expected_message"), + ( + ( + """ + [subscription_manager] + incorect_option = yes + """, + "Unsupported option", + ), + ( + """ + [invalid_header] + username = correct_username + """, + "Couldn't find header", + ), + ), +) +def test_options_from_config_files_invalid_head_and_options(content, expected_message, tmpdir, caplog): + path = os.path.join(str(tmpdir), "convert2rhel.ini") + + with open(path, "w") as file: + file.write(content) + os.chmod(path, 0o600) + + opts = cli.options_from_config_files(path) + + assert expected_message in caplog.text + + +@pytest.mark.parametrize( + ("content", "expected_message"), + ( + ( + """ + [subscription_manager] + """, + "No options found for subscription_manager. It seems to be empty or commented.", + ), + ), +) +def test_options_from_config_files_commented_out_options(content, expected_message, tmpdir, caplog): + path = os.path.join(str(tmpdir), "convert2rhel.ini") + + with open(path, "w") as file: + file.write(content) + os.chmod(path, 0o600) + + cli.options_from_config_files(path) + assert expected_message in caplog.text + + +@pytest.mark.parametrize( + ("content", "output"), + ( + ( + """ + [subscription_manager] + username = correct_username + """, + {"username": "correct_username"}, + ), + # Test if we will unquote this correctly + ( + """ + [subscription_manager] + username = "correct_username" + """, + {"username": "correct_username"}, + ), + ( + """ + [subscription_manager] + password = correct_password + """, + {"password": "correct_password"}, + ), + ( + """ + [subscription_manager] + activation_key = correct_key + password = correct_password + username = correct_username + org = correct_org + """, + { + "username": "correct_username", + "password": "correct_password", + "activation_key": "correct_key", + "org": "correct_org", + }, + ), + ( + """ + [subscription_manager] + org = correct_org + """, + {"org": "correct_org"}, + ), + ( + """ + [settings] + incomplete_rollback = 1 + """, + {"incomplete_rollback": "1"}, + ), + ( + """ + [subscription_manager] + org = correct_org + + [settings] + incomplete_rollback = 1 + """, + {"org": "correct_org", "incomplete_rollback": "1"}, + ), + ( + """ + [settings] + incomplete_rollback = 1 + tainted_kernel_module_check_skip = 1 + outdated_package_check_skip = 1 + allow_older_version = 1 + allow_unavailable_kmods = 1 + configure_host_metering = 1 + skip_kernel_currency_check = 1 + """, + { + "incomplete_rollback": "1", + "tainted_kernel_module_check_skip": "1", + "outdated_package_check_skip": "1", + "allow_older_version": "1", + "allow_unavailable_kmods": "1", + "configure_host_metering": "1", + "skip_kernel_currency_check": "1", + }, + ), + ), +) +def test_options_from_config_files_default(content, output, monkeypatch, tmpdir): + """Test config files in default path.""" + path = os.path.join(str(tmpdir), "convert2rhel.ini") + + with open(path, "w") as file: + file.write(content) + os.chmod(path, 0o600) + + paths = ["/nonexisting/path", path] + monkeypatch.setattr(cli, "CONFIG_PATHS", value=paths) + opts = cli.options_from_config_files(None) + + for key in ["username", "password", "activation_key", "org"]: + if key in opts: + assert opts[key] == output[key] + + +@pytest.mark.parametrize( + ("content", "output", "content_lower_priority"), + ( + ( + """ + [subscription_manager] + username = correct_username + activation_key = correct_key + """, + {"username": "correct_username", "password": None, "activation_key": "correct_key", "org": None}, + """ + [subscription_manager] + username = low_prior_username + """, + ), + ( + """ + [subscription_manager] + username = correct_username + activation_key = correct_key + """, + {"username": "correct_username", "password": None, "activation_key": "correct_key", "org": None}, + """ + [subscription_manager] + activation_key = low_prior_key + """, + ), + ( + """ + [subscription_manager] + activation_key = correct_key + org = correct_org""", + {"username": None, "password": None, "activation_key": "correct_key", "org": "correct_org"}, + """ + [subscription_manager] + org = low_prior_org + """, + ), + ( + """ + [subscription_manager] + activation_key = correct_key + Password = correct_password + """, + {"username": None, "password": "correct_password", "activation_key": "correct_key", "org": None}, + """ + [subscription_manager] + password = low_prior_pass + """, + ), + ( + """ + [subscription_manager] + activation_key = correct_key + Password = correct_password + """, + {"username": None, "password": "correct_password", "activation_key": "correct_key", "org": None}, + """ + [INVALID_HEADER] + password = low_prior_pass + """, + ), + ( + """ + [subscription_manager] + activation_key = correct_key + Password = correct_password + """, + {"username": None, "password": "correct_password", "activation_key": "correct_key", "org": None}, + """ + [subscription_manager] + incorrect_option = incorrect_option + """, + ), + ), +) +def test_options_from_config_files_specified(content, output, content_lower_priority, monkeypatch, tmpdir, caplog): + """Test user specified path for config file.""" + path_higher_priority = os.path.join(str(tmpdir), "convert2rhel.ini") + with open(path_higher_priority, "w") as file: + file.write(content) + os.chmod(path_higher_priority, 0o600) + + path_lower_priority = os.path.join(str(tmpdir), "convert2rhel_lower.ini") + with open(path_lower_priority, "w") as file: + file.write(content_lower_priority) + os.chmod(path_lower_priority, 0o600) + + paths = [path_higher_priority, path_lower_priority] + monkeypatch.setattr(cli, "CONFIG_PATHS", value=paths) + + opts = cli.options_from_config_files(None) + + for key in ["username", "password", "activation_key", "org"]: + if key in opts: + assert opts[key] == output[key] + + +UrlParts = namedtuple("UrlParts", ("scheme", "hostname", "port")) + + +@pytest.mark.parametrize( + ("url_parts", "message"), + ( + ( + UrlParts("gopher", "localhost", None), + "Subscription manager must be accessed over http or https. gopher is not valid", + ), + (UrlParts("http", None, None), "A hostname must be specified in a subscription-manager serverurl"), + (UrlParts("http", "", None), "A hostname must be specified in a subscription-manager serverurl"), + ), +) +def test_validate_serverurl_parsing(url_parts, message): + with pytest.raises(ValueError, match=message): + cli._validate_serverurl_parsing(url_parts) + + +def test_log_command_used(caplog, monkeypatch): + obfuscation_string = "*" * 5 + input_command = mock_cli_arguments( + ["--username", "uname", "--password", "123", "--activationkey", "456", "--org", "789"] + ) + expected_command = mock_cli_arguments( + [ + "--username", + obfuscation_string, + "--password", + obfuscation_string, + "--activationkey", + obfuscation_string, + "--org", + obfuscation_string, + ] + ) + monkeypatch.setattr(sys, "argv", input_command) + cli._log_command_used() + + assert " ".join(expected_command) in caplog.records[-1].message + + +@pytest.mark.parametrize( + ("argv", "message"), + ( + # The message is a log of used command + (mock_cli_arguments(["-o", "org", "-k", "key"]), "-o ***** -k *****"), + ( + mock_cli_arguments(["-o", "org"]), + "Either the --org or the --activationkey option is missing. You can't use one without the other.", + ), + ( + mock_cli_arguments(["-k", "key"]), + "Either the --org or the --activationkey option is missing. You can't use one without the other.", + ), + ), +) +def test_org_activation_key_specified(argv, message, monkeypatch, caplog): + monkeypatch.setattr(sys, "argv", argv) + + try: + cli.CLI() + except SystemExit: + # Don't care about the exception, focus on output message + pass + + assert message in caplog.text + + +@pytest.mark.parametrize( + ("argv", "expected"), + ( + (mock_cli_arguments(["convert"]), "conversion"), + (mock_cli_arguments(["analyze"]), "analysis"), + (mock_cli_arguments([]), "conversion"), + ), +) +def test_pre_assessment_set(argv, expected, monkeypatch): + monkeypatch.setattr(sys, "argv", argv) + + cli.CLI() + + assert cli.tool_opts.activity == expected + + +@pytest.mark.parametrize( + ("argv", "expected"), + ( + ( + mock_cli_arguments(["--disablerepo", "*", "--enablerepo", "*"]), + "Duplicate repositories were found across disablerepo and enablerepo options", + ), + ( + mock_cli_arguments( + ["--disablerepo", "*", "--disablerepo", "rhel-7-extras-rpm", "--enablerepo", "rhel-7-extras-rpm"] + ), + "Duplicate repositories were found across disablerepo and enablerepo options", + ), + ( + mock_cli_arguments(["--disablerepo", "test", "--enablerepo", "test"]), + "Duplicate repositories were found across disablerepo and enablerepo options", + ), + ), +) +def test_disable_and_enable_repos_has_same_repo(argv, expected, monkeypatch, caplog): + monkeypatch.setattr(sys, "argv", argv) + cli.CLI() + + assert expected in caplog.records[-1].message + + +@pytest.mark.parametrize( + ("argv", "expected"), + ( + ( + mock_cli_arguments(["--disablerepo", "*", "--enablerepo", "test"]), + "Duplicate repositories were found across disablerepo and enablerepo options", + ), + ( + mock_cli_arguments(["--disablerepo", "test", "--enablerepo", "test1"]), + "Duplicate repositories were found across disablerepo and enablerepo options", + ), + ), +) +def test_disable_and_enable_repos_with_different_repos(argv, expected, monkeypatch, caplog): + monkeypatch.setattr(sys, "argv", argv) + cli.CLI() + + assert expected not in caplog.records[-1].message + + +@pytest.mark.parametrize( + ("argv", "expected"), + ( + ([], ["convert"]), + (["--debug"], ["convert", "--debug"]), + (["analyze", "--debug"], ["analyze", "--debug"]), + (["--password=convert", "--debug"], ["convert", "--password=convert", "--debug"]), + ), +) +def test_add_default_command(argv, expected, monkeypatch): + monkeypatch.setattr(sys, "argv", argv) + assert cli._add_default_command(argv) == expected + + +@pytest.mark.parametrize( + ("username", "password", "organization", "activation_key", "no_rhsm", "expected"), + ( + ("User1", "Password1", None, None, False, True), + (None, None, "My Org", "12345ABC", False, True), + ("User1", "Password1", "My Org", "12345ABC", False, True), + (None, None, None, None, True, False), + ("User1", None, None, "12345ABC", False, False), + (None, None, None, None, False, False), + ("User1", "Password1", None, None, True, False), + ), +) +def test_should_subscribe(username, password, organization, activation_key, no_rhsm, expected): + t_opts = toolopts.ToolOpts() + t_opts.username = username + t_opts.password = password + t_opts.org = organization + t_opts.activation_key = activation_key + t_opts.no_rhsm = no_rhsm + + assert cli._should_subscribe(t_opts) is expected + + +@pytest.mark.parametrize( + ("argv", "env_var", "expected", "message"), + ( + ( + ["analyze", "--no-rpm-va"], + False, + False, + "We will proceed with ignoring the --no-rpm-va option as running rpm -Va in the analysis mode is essential for a complete rollback to the original system state at the end of the analysis.", + ), + ( + ["analyze", "--no-rpm-va"], + True, + False, + "We will proceed with ignoring the --no-rpm-va option as running rpm -Va in the analysis mode is essential for a complete rollback to the original system state at the end of the analysis.", + ), + ( + ["--no-rpm-va"], + False, + False, + "We need to run the 'rpm -Va' command to be able to perform a complete rollback of changes done to the system during the pre-conversion analysis. If you accept the risk of an incomplete rollback, set the CONVERT2RHEL_INCOMPLETE_ROLLBACK=1 environment variable. Otherwise, remove the --no-rpm-va option.", + ), + (["--no-rpm-va"], True, True, ""), + ), +) +def test_setting_no_rpm_va(argv, env_var, expected, message, monkeypatch, caplog): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(argv)) + if env_var: + monkeypatch.setenv("CONVERT2RHEL_INCOMPLETE_ROLLBACK", "1") + + try: + cli.CLI() + except SystemExit: + pass + + assert cli.tool_opts.no_rpm_va == expected + if message: + assert caplog.records[-1].message == message + + +@pytest.mark.parametrize( + ("argv", "message"), + ( + # The message is a log of used command + (mock_cli_arguments(["-u", "user", "-p", "pass"]), "-u ***** -p *****"), + ( + mock_cli_arguments(["-p", "pass"]), + "You have passed the RHSM password without an associated username. Please provide a username together with the password", + ), + ( + mock_cli_arguments(["-u", "user"]), + "You have passed the RHSM username without an associated password. Please provide a password together with the username", + ), + ), +) +def test_cli_userpass_specified(argv, message, monkeypatch, caplog): + monkeypatch.setattr(sys, "argv", argv) + + try: + cli.CLI() + except SystemExit: + # Don't care about the exception, focus on output message + pass + assert message in caplog.text + + +@pytest.mark.parametrize( + ("activation_key", "organization", "argv"), + ( + ("activation_key", "org", []), + ("activation_key", "org", ["analyze", "-u name", "-p pass"]), + (None, None, ["analyze", "-u name", "-p pass"]), + ), +) +def test_cli_args_config_file_cornercase(activation_key, organization, argv, monkeypatch): + monkeypatch.setattr(sys, "argv", mock_cli_arguments(argv)) + t_opts = toolopts.ToolOpts() + t_opts.org = organization + t_opts.activation_key = activation_key + t_opts.no_rhsm = True + monkeypatch.setattr(toolopts, "tool_opts", t_opts) + + # Make sure it doesn't raise an exception + cli.CLI() diff --git a/convert2rhel/unit_tests/main_test.py b/convert2rhel/unit_tests/main_test.py index ea49fc4a68..a35c7bb66a 100644 --- a/convert2rhel/unit_tests/main_test.py +++ b/convert2rhel/unit_tests/main_test.py @@ -27,7 +27,7 @@ six.add_move(six.MovedModule("mock", "mock", "unittest.mock")) from six.moves import mock -from convert2rhel import actions, applock, backup, checks, exceptions, grub, hostmetering +from convert2rhel import actions, applock, backup, checks, cli, exceptions, grub, hostmetering from convert2rhel import logger as logger_module from convert2rhel import main, pkghandler, pkgmanager, redhatrelease, subscription, toolopts, utils from convert2rhel.actions import report @@ -250,7 +250,7 @@ def test_help_exit(monkeypatch, tmp_path): def test_main(monkeypatch, tmp_path): require_root_mock = mock.Mock() initialize_file_logging_mock = mock.Mock() - toolopts_cli_mock = mock.Mock() + cli_cli_mock = mock.Mock() show_eula_mock = mock.Mock() print_data_collection_mock = mock.Mock() resolve_system_info_mock = mock.Mock() @@ -278,7 +278,7 @@ def test_main(monkeypatch, tmp_path): monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmp_path)) monkeypatch.setattr(utils, "require_root", require_root_mock) monkeypatch.setattr(main, "initialize_file_logging", initialize_file_logging_mock) - monkeypatch.setattr(toolopts, "CLI", toolopts_cli_mock) + monkeypatch.setattr(cli, "CLI", cli_cli_mock) monkeypatch.setattr(main, "show_eula", show_eula_mock) monkeypatch.setattr(breadcrumbs, "print_data_collection", print_data_collection_mock) monkeypatch.setattr(system_info, "resolve_system_info", resolve_system_info_mock) @@ -306,7 +306,7 @@ def test_main(monkeypatch, tmp_path): assert main.main() == 0 assert require_root_mock.call_count == 1 assert initialize_file_logging_mock.call_count == 1 - assert toolopts_cli_mock.call_count == 1 + assert cli_cli_mock.call_count == 1 assert show_eula_mock.call_count == 1 assert print_data_collection_mock.call_count == 1 assert resolve_system_info_mock.call_count == 1 @@ -333,21 +333,21 @@ class TestRollbackFromMain: def test_main_rollback_post_cli_phase(self, monkeypatch, caplog, tmp_path): require_root_mock = mock.Mock() initialize_file_logging_mock = mock.Mock() - toolopts_cli_mock = mock.Mock() + cli_cli_mock = mock.Mock() show_eula_mock = mock.Mock(side_effect=Exception) finish_collection_mock = mock.Mock() monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmp_path)) monkeypatch.setattr(utils, "require_root", require_root_mock) monkeypatch.setattr(main, "initialize_file_logging", initialize_file_logging_mock) - monkeypatch.setattr(toolopts, "CLI", toolopts_cli_mock) + monkeypatch.setattr(cli, "CLI", cli_cli_mock) monkeypatch.setattr(main, "show_eula", show_eula_mock) monkeypatch.setattr(breadcrumbs, "finish_collection", finish_collection_mock) assert main.main() == 1 assert require_root_mock.call_count == 1 assert initialize_file_logging_mock.call_count == 1 - assert toolopts_cli_mock.call_count == 1 + assert cli_cli_mock.call_count == 1 assert show_eula_mock.call_count == 1 assert finish_collection_mock.call_count == 1 assert "No changes were made to the system." in caplog.records[-2].message @@ -356,7 +356,7 @@ def test_main_traceback_in_clear_versionlock(self, caplog, monkeypatch, tmp_path monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmp_path)) monkeypatch.setattr(utils, "require_root", RequireRootMocked()) monkeypatch.setattr(main, "initialize_file_logging", InitializeFileLoggingMocked()) - monkeypatch.setattr(toolopts, "CLI", CLIMocked()) + monkeypatch.setattr(cli, "CLI", CLIMocked()) monkeypatch.setattr(main, "show_eula", ShowEulaMocked()) monkeypatch.setattr(breadcrumbs, "print_data_collection", PrintDataCollectionMocked()) monkeypatch.setattr(system_info, "resolve_system_info", ResolveSystemInfoMocked()) @@ -381,7 +381,7 @@ def test_main_traceback_in_clear_versionlock(self, caplog, monkeypatch, tmp_path assert main.main() == 1 assert utils.require_root.call_count == 1 - assert toolopts.CLI.call_count == 1 + assert cli.CLI.call_count == 1 assert main.show_eula.call_count == 1 assert breadcrumbs.print_data_collection.call_count == 1 assert system_info.resolve_system_info.call_count == 1 @@ -401,7 +401,7 @@ def test_main_traceback_in_clear_versionlock(self, caplog, monkeypatch, tmp_path def test_main_traceback_before_action_completion(self, monkeypatch, caplog, tmp_path): require_root_mock = mock.Mock() initialize_file_logging_mock = mock.Mock() - toolopts_cli_mock = mock.Mock() + cli_cli_mock = mock.Mock() show_eula_mock = mock.Mock() print_data_collection_mock = mock.Mock() resolve_system_info_mock = mock.Mock() @@ -422,7 +422,7 @@ def test_main_traceback_before_action_completion(self, monkeypatch, caplog, tmp_ monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmp_path)) monkeypatch.setattr(utils, "require_root", require_root_mock) monkeypatch.setattr(main, "initialize_file_logging", initialize_file_logging_mock) - monkeypatch.setattr(toolopts, "CLI", toolopts_cli_mock) + monkeypatch.setattr(cli, "CLI", cli_cli_mock) monkeypatch.setattr(main, "show_eula", show_eula_mock) monkeypatch.setattr(breadcrumbs, "print_data_collection", print_data_collection_mock) monkeypatch.setattr(system_info, "resolve_system_info", resolve_system_info_mock) @@ -441,7 +441,7 @@ def test_main_traceback_before_action_completion(self, monkeypatch, caplog, tmp_ assert main.main() == 1 assert require_root_mock.call_count == 1 assert initialize_file_logging_mock.call_count == 1 - assert toolopts_cli_mock.call_count == 1 + assert cli_cli_mock.call_count == 1 assert show_eula_mock.call_count == 1 assert print_data_collection_mock.call_count == 1 assert resolve_system_info_mock.call_count == 1 @@ -464,7 +464,7 @@ def test_main_traceback_before_action_completion(self, monkeypatch, caplog, tmp_ def test_main_rollback_pre_ponr_changes_phase(self, monkeypatch, caplog, tmp_path): require_root_mock = mock.Mock() initialize_file_logging_mock = mock.Mock() - toolopts_cli_mock = mock.Mock() + cli_cli_mock = mock.Mock() show_eula_mock = mock.Mock() print_data_collection_mock = mock.Mock() resolve_system_info_mock = mock.Mock() @@ -487,7 +487,7 @@ def test_main_rollback_pre_ponr_changes_phase(self, monkeypatch, caplog, tmp_pat monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmp_path)) monkeypatch.setattr(utils, "require_root", require_root_mock) monkeypatch.setattr(main, "initialize_file_logging", initialize_file_logging_mock) - monkeypatch.setattr(toolopts, "CLI", toolopts_cli_mock) + monkeypatch.setattr(cli, "CLI", cli_cli_mock) monkeypatch.setattr(main, "show_eula", show_eula_mock) monkeypatch.setattr(breadcrumbs, "print_data_collection", print_data_collection_mock) monkeypatch.setattr(system_info, "resolve_system_info", resolve_system_info_mock) @@ -508,7 +508,7 @@ def test_main_rollback_pre_ponr_changes_phase(self, monkeypatch, caplog, tmp_pat assert main.main() == 2 assert require_root_mock.call_count == 1 assert initialize_file_logging_mock.call_count == 1 - assert toolopts_cli_mock.call_count == 1 + assert cli_cli_mock.call_count == 1 assert show_eula_mock.call_count == 1 assert print_data_collection_mock.call_count == 1 assert resolve_system_info_mock.call_count == 1 @@ -538,7 +538,7 @@ def test_main_rollback_analyze_exit_phase_without_subman(self, global_tool_opts, (applock, "_DEFAULT_LOCK_DIR", str(tmp_path)), (utils, "require_root", mock.Mock()), (main, "initialize_file_logging", mock.Mock()), - (toolopts, "CLI", mock.Mock()), + (cli, "CLI", mock.Mock()), (main, "show_eula", mock.Mock()), (breadcrumbs, "print_data_collection", mock.Mock()), (system_info, "resolve_system_info", mock.Mock()), @@ -561,7 +561,7 @@ def test_main_rollback_analyze_exit_phase_without_subman(self, global_tool_opts, assert main.main() == 0 assert utils.require_root.call_count == 1 - assert toolopts.CLI.call_count == 1 + assert cli.CLI.call_count == 1 assert main.show_eula.call_count == 1 assert breadcrumbs.print_data_collection.call_count == 1 assert system_info.resolve_system_info.call_count == 1 @@ -581,7 +581,7 @@ def test_main_rollback_analyze_exit_phase_without_subman(self, global_tool_opts, def test_main_rollback_analyze_exit_phase(self, global_tool_opts, monkeypatch, tmp_path): require_root_mock = mock.Mock() initialize_file_logging_mock = mock.Mock() - toolopts_cli_mock = mock.Mock() + cli_cli_mock = mock.Mock() show_eula_mock = mock.Mock() print_data_collection_mock = mock.Mock() resolve_system_info_mock = mock.Mock() @@ -603,7 +603,7 @@ def test_main_rollback_analyze_exit_phase(self, global_tool_opts, monkeypatch, t monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmp_path)) monkeypatch.setattr(utils, "require_root", require_root_mock) monkeypatch.setattr(main, "initialize_file_logging", initialize_file_logging_mock) - monkeypatch.setattr(toolopts, "CLI", toolopts_cli_mock) + monkeypatch.setattr(cli, "CLI", cli_cli_mock) monkeypatch.setattr(main, "show_eula", show_eula_mock) monkeypatch.setattr(breadcrumbs, "print_data_collection", print_data_collection_mock) monkeypatch.setattr(system_info, "resolve_system_info", resolve_system_info_mock) @@ -624,7 +624,7 @@ def test_main_rollback_analyze_exit_phase(self, global_tool_opts, monkeypatch, t assert main.main() == 0 assert require_root_mock.call_count == 1 assert initialize_file_logging_mock.call_count == 1 - assert toolopts_cli_mock.call_count == 1 + assert cli_cli_mock.call_count == 1 assert show_eula_mock.call_count == 1 assert print_data_collection_mock.call_count == 1 assert resolve_system_info_mock.call_count == 1 @@ -643,7 +643,7 @@ def test_main_rollback_analyze_exit_phase(self, global_tool_opts, monkeypatch, t def test_main_rollback_post_ponr_changes_phase(self, monkeypatch, caplog, tmp_path): require_root_mock = mock.Mock() initialize_file_logging_mock = mock.Mock() - toolopts_cli_mock = mock.Mock() + cli_cli_mock = mock.Mock() show_eula_mock = mock.Mock() print_data_collection_mock = mock.Mock() resolve_system_info_mock = mock.Mock() @@ -667,7 +667,7 @@ def test_main_rollback_post_ponr_changes_phase(self, monkeypatch, caplog, tmp_pa monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmp_path)) monkeypatch.setattr(utils, "require_root", require_root_mock) monkeypatch.setattr(main, "initialize_file_logging", initialize_file_logging_mock) - monkeypatch.setattr(toolopts, "CLI", toolopts_cli_mock) + monkeypatch.setattr(cli, "CLI", cli_cli_mock) monkeypatch.setattr(main, "show_eula", show_eula_mock) monkeypatch.setattr(breadcrumbs, "print_data_collection", print_data_collection_mock) monkeypatch.setattr(system_info, "resolve_system_info", resolve_system_info_mock) @@ -689,7 +689,7 @@ def test_main_rollback_post_ponr_changes_phase(self, monkeypatch, caplog, tmp_pa assert main.main() == 1 assert require_root_mock.call_count == 1 assert initialize_file_logging_mock.call_count == 1 - assert toolopts_cli_mock.call_count == 1 + assert cli_cli_mock.call_count == 1 assert show_eula_mock.call_count == 1 assert print_data_collection_mock.call_count == 1 assert resolve_system_info_mock.call_count == 1 @@ -791,7 +791,7 @@ def test_raise_for_skipped_failures(data, exception, match, activity, global_too def test_main_already_running_conversion(monkeypatch, caplog, tmpdir): - monkeypatch.setattr(toolopts, "CLI", mock.Mock()) + monkeypatch.setattr(cli, "CLI", mock.Mock()) monkeypatch.setattr(utils, "require_root", mock.Mock()) monkeypatch.setattr(applock, "_DEFAULT_LOCK_DIR", str(tmpdir)) monkeypatch.setattr(main, "main_locked", mock.Mock(side_effect=applock.ApplicationLockedError("failed"))) diff --git a/convert2rhel/unit_tests/toolopts_test.py b/convert2rhel/unit_tests/toolopts_test.py index c40a459f68..c310d5dbea 100644 --- a/convert2rhel/unit_tests/toolopts_test.py +++ b/convert2rhel/unit_tests/toolopts_test.py @@ -16,555 +16,9 @@ # along with this program. If not, see . __metaclass__ = type -import os -import sys - -from collections import namedtuple - import pytest -import six - -import convert2rhel.toolopts -import convert2rhel.utils - - -six.add_move(six.MovedModule("mock", "mock", "unittest.mock")) -from six.moves import mock - - -def mock_cli_arguments(args): - """ - Return a list of cli arguments where the first one is always the name of - the executable, followed by 'args'. - """ - return sys.argv[0:1] + args - - -class TestTooloptsParseFromCLI: - def test_cmdline_interactive_username_without_passwd(self, monkeypatch, global_tool_opts): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--username", "uname"])) - convert2rhel.toolopts.CLI() - assert global_tool_opts.username == "uname" - - def test_cmdline_interactive_passwd_without_uname(self, monkeypatch, global_tool_opts): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--password", "passwd"])) - convert2rhel.toolopts.CLI() - assert global_tool_opts.password == "passwd" - - def test_cmdline_non_interactive_with_credentials(self, monkeypatch, global_tool_opts): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--username", "uname", "--password", "passwd"])) - convert2rhel.toolopts.CLI() - assert global_tool_opts.username == "uname" - assert global_tool_opts.password == "passwd" - - def test_cmdline_disablerepo_defaults_to_asterisk(self, monkeypatch, global_tool_opts): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--enablerepo", "foo"])) - convert2rhel.toolopts.CLI() - assert global_tool_opts.enablerepo == ["foo"] - assert global_tool_opts.disablerepo == ["*"] - - # Parsing of serverurl - - @pytest.mark.parametrize( - ("serverurl", "hostname", "port", "prefix"), - ( - ("https://rhsm.redhat.com:443/", "rhsm.redhat.com", "443", "/"), - ("https://localhost/rhsm/", "localhost", None, "/rhsm/"), - ("https://rhsm.redhat.com/", "rhsm.redhat.com", None, "/"), - ("https://rhsm.redhat.com", "rhsm.redhat.com", None, None), - ("https://rhsm.redhat.com:8443", "rhsm.redhat.com", "8443", None), - ("subscription.redhat.com", "subscription.redhat.com", None, None), - ), - ) - def test_custom_serverurl(self, monkeypatch, global_tool_opts, serverurl, hostname, port, prefix): - monkeypatch.setattr( - sys, - "argv", - mock_cli_arguments(["--serverurl", serverurl, "--username", "User1", "--password", "Password1"]), - ) - convert2rhel.toolopts.CLI() - assert global_tool_opts.rhsm_hostname == hostname - assert global_tool_opts.rhsm_port == port - assert global_tool_opts.rhsm_prefix == prefix - - def test_no_serverurl(self, monkeypatch, global_tool_opts): - monkeypatch.setattr(sys, "argv", mock_cli_arguments([])) - convert2rhel.toolopts.CLI() - assert global_tool_opts.rhsm_hostname is None - assert global_tool_opts.rhsm_port is None - assert global_tool_opts.rhsm_prefix is None - - @pytest.mark.parametrize( - "serverurl", - ( - "gopher://subscription.rhsm.redhat.com/", - "https:///", - "https://", - "/", - ), - ) - def test_bad_serverurl(self, caplog, monkeypatch, global_tool_opts, serverurl): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--serverurl", serverurl, "-o", "MyOrg", "-k", "012335"])) - - with pytest.raises(SystemExit): - convert2rhel.toolopts.CLI() - - message = ( - "Failed to parse a valid subscription-manager server from the --serverurl option.\n" - "Please check for typos and run convert2rhel again with a corrected --serverurl.\n" - "Supplied serverurl: %s\nError: " % serverurl - ) - assert message in caplog.records[-1].message - assert caplog.records[-1].levelname == "CRITICAL" - - def test_serverurl_with_no_rhsm(self, caplog, monkeypatch, global_tool_opts): - monkeypatch.setattr( - sys, "argv", mock_cli_arguments(["--serverurl", "localhost", "--no-rhsm", "--enablerepo", "testrepo"]) - ) - - convert2rhel.toolopts.CLI() - - message = "Ignoring the --serverurl option. It has no effect when --no-rhsm is used." - assert message in caplog.text - - def test_serverurl_with_no_rhsm_credentials(self, caplog, monkeypatch, global_tool_opts): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(["--serverurl", "localhost"])) - - convert2rhel.toolopts.CLI() - - message = ( - "Ignoring the --serverurl option. It has no effect when no credentials to" - " subscribe the system were given." - ) - assert message in caplog.text - - -@pytest.mark.parametrize( - ("argv", "raise_exception", "no_rhsm_value"), - ( - (mock_cli_arguments(["--no-rhsm"]), True, True), - (mock_cli_arguments(["--no-rhsm", "--enablerepo", "test_repo"]), False, True), - ), -) -@mock.patch("convert2rhel.toolopts.tool_opts.no_rhsm", False) -@mock.patch("convert2rhel.toolopts.tool_opts.enablerepo", []) -def test_no_rhsm_option_work(argv, raise_exception, no_rhsm_value, monkeypatch, caplog, global_tool_opts): - monkeypatch.setattr(sys, "argv", argv) - - if raise_exception: - with pytest.raises(SystemExit): - convert2rhel.toolopts.CLI() - assert "The --enablerepo option is required when --no-rhsm is used." in caplog.text - else: - convert2rhel.toolopts.CLI() - - assert convert2rhel.toolopts.tool_opts.no_rhsm == no_rhsm_value - - -@pytest.mark.parametrize( - ("argv", "content", "output", "message"), - ( - pytest.param( - mock_cli_arguments([]), - "[subscription_manager]\nusername=conf_user\npassword=conf_pass\nactivation_key=conf_key\norg=conf_org", - {"username": "conf_user", "password": "conf_pass", "activation_key": "conf_key", "org": "conf_org"}, - None, - id="All values set in config", - ), - ( - mock_cli_arguments([]), - "[subscription_manager]\npassword=conf_pass", - {"password": "conf_pass", "activation_key": None}, - None, - ), - ( - mock_cli_arguments(["-p", "password"]), - "[subscription_manager]\nactivation_key=conf_key", - {"password": "password", "activation_key": None}, - "You have passed either the RHSM password or activation key through both the command line and" - " the configuration file. We're going to use the command line values.", - ), - ( - mock_cli_arguments(["-k", "activation_key", "-o", "org"]), - "[subscription_manager]\nactivation_key=conf_key", - {"password": None, "activation_key": "activation_key"}, - "You have passed either the RHSM password or activation key through both the command line and" - " the configuration file. We're going to use the command line values.", - ), - ( - mock_cli_arguments(["-k", "activation_key", "-o", "org"]), - "[subscription_manager]\npassword=conf_pass", - {"password": "conf_pass", "activation_key": "activation_key"}, - "You have passed either the RHSM password or activation key through both the command line and" - " the configuration file. We're going to use the command line values.", - ), - ( - mock_cli_arguments(["-k", "activation_key", "-p", "password", "-o", "org"]), - "[subscription_manager]\npassword=conf_pass\nactivation_key=conf_key", - {"password": "password", "activation_key": "activation_key"}, - "You have passed either the RHSM password or activation key through both the command line and" - " the configuration file. We're going to use the command line values.", - ), - ( - mock_cli_arguments(["-o", "org"]), - "[subscription_manager]\npassword=conf_pass\nactivation_key=conf_key", - {"password": "conf_pass", "activation_key": "conf_key"}, - "Either a password or an activation key can be used for system registration. We're going to use the" - " activation key.", - ), - ( - mock_cli_arguments(["-u", "McLOVIN"]), - "[subscription_manager]\nusername=NotMcLOVIN\n", - {"username": "McLOVIN"}, - "You have passed the RHSM username through both the command line and" - " the configuration file. We're going to use the command line values.", - ), - ( - mock_cli_arguments(["-o", "some-org"]), - "[subscription_manager]\norg=a-different-org\nactivation_key=conf_key", - {"org": "some-org"}, - "You have passed the RHSM org through both the command line and" - " the configuration file. We're going to use the command line values.", - ), - ), -) -def test_config_file(argv, content, output, message, monkeypatch, tmpdir, caplog, global_tool_opts): - # After each test there were left data from previous - # Re-init needed delete the set data - path = os.path.join(str(tmpdir), "convert2rhel.ini") - with open(path, "w") as file: - file.write(content) - os.chmod(path, 0o600) - - monkeypatch.setattr(sys, "argv", argv) - monkeypatch.setattr(convert2rhel.toolopts, "CONFIG_PATHS", value=[path]) - convert2rhel.toolopts.CLI() - - if "activation_key" in output: - assert convert2rhel.toolopts.tool_opts.activation_key == output["activation_key"] - - if "password" in output: - assert convert2rhel.toolopts.tool_opts.password == output["password"] - - if "username" in output: - assert convert2rhel.toolopts.tool_opts.username == output["username"] - - if "org" in output: - assert convert2rhel.toolopts.tool_opts.org == output["org"] - - if message: - print(caplog.text) - assert message in caplog.text - - -@pytest.mark.parametrize( - ("argv", "content", "message", "output"), - ( - ( - mock_cli_arguments(["--password", "pass", "--config-file"]), - "[subscription_manager]\nactivation_key = key_cnf_file", - "You have passed either the RHSM password or activation key through both the command line and" - " the configuration file. We're going to use the command line values.", - {"password": "pass", "activation_key": None}, - ), - ), -) -def test_multiple_auth_src_combined(argv, content, message, output, caplog, monkeypatch, tmpdir, global_tool_opts): - """Test combination of password file or configuration file and CLI arguments.""" - path = os.path.join(str(tmpdir), "convert2rhel.file") - with open(path, "w") as file: - file.write(content) - os.chmod(path, 0o600) - # The path for file is the last argument - argv.append(path) - - monkeypatch.setattr(sys, "argv", argv) - monkeypatch.setattr(convert2rhel.toolopts, "CONFIG_PATHS", value=[""]) - convert2rhel.toolopts.CLI() - - assert message in caplog.text - assert convert2rhel.toolopts.tool_opts.activation_key == output["activation_key"] - assert convert2rhel.toolopts.tool_opts.password == output["password"] - - -@pytest.mark.parametrize( - ("argv", "message", "output"), - ( - ( - mock_cli_arguments(["--password", "pass", "--activationkey", "key", "-o", "org"]), - "Either a password or an activation key can be used for system registration." - " We're going to use the activation key.", - {"password": "pass", "activation_key": "key"}, - ), - ), -) -def test_multiple_auth_src_cli(argv, message, output, caplog, monkeypatch, global_tool_opts): - """Test both auth methods in CLI.""" - monkeypatch.setattr(sys, "argv", argv) - monkeypatch.setattr(convert2rhel.toolopts, "CONFIG_PATHS", value=[""]) - convert2rhel.toolopts.CLI() - - assert message in caplog.text - assert convert2rhel.toolopts.tool_opts.activation_key == output["activation_key"] - assert convert2rhel.toolopts.tool_opts.password == output["password"] - - -@pytest.mark.parametrize( - ("content", "expected_message"), - ( - ( - """ - [subscription_manager] - incorect_option = yes - """, - "Unsupported option", - ), - ( - """ - [invalid_header] - username = correct_username - """, - "Couldn't find header", - ), - ), -) -def test_options_from_config_files_invalid_head_and_options(content, expected_message, tmpdir, caplog): - path = os.path.join(str(tmpdir), "convert2rhel.ini") - - with open(path, "w") as file: - file.write(content) - os.chmod(path, 0o600) - - opts = convert2rhel.toolopts.options_from_config_files(path) - - assert not opts - assert expected_message in caplog.text - - -@pytest.mark.parametrize( - ("content", "expected_message"), - ( - ( - """ - [subscription_manager] - """, - "No options found for subscription_manager. It seems to be empty or commented.", - ), - ), -) -def test_options_from_config_files_commented_out_options(content, expected_message, tmpdir, caplog): - path = os.path.join(str(tmpdir), "convert2rhel.ini") - - with open(path, "w") as file: - file.write(content) - os.chmod(path, 0o600) - - opts = convert2rhel.toolopts.options_from_config_files(path) - - assert not opts - assert expected_message in caplog.text - - -@pytest.mark.parametrize( - ("content", "output"), - ( - ( - """ - [subscription_manager] - username = correct_username - """, - {"username": "correct_username"}, - ), - # Test if we will unquote this correctly - ( - """ - [subscription_manager] - username = "correct_username" - """, - {"username": "correct_username"}, - ), - ( - """ - [subscription_manager] - password = correct_password - """, - {"password": "correct_password"}, - ), - ( - """ - [subscription_manager] - activation_key = correct_key - password = correct_password - username = correct_username - org = correct_org - """, - { - "username": "correct_username", - "password": "correct_password", - "activation_key": "correct_key", - "org": "correct_org", - }, - ), - ( - """ - [subscription_manager] - org = correct_org - """, - {"org": "correct_org"}, - ), - ( - """ - [settings] - incomplete_rollback = 1 - """, - {"incomplete_rollback": "1"}, - ), - ( - """ - [subscription_manager] - org = correct_org - - [settings] - incomplete_rollback = 1 - """, - {"org": "correct_org", "incomplete_rollback": "1"}, - ), - ( - """ - [settings] - incomplete_rollback = 1 - tainted_kernel_module_check_skip = 1 - outdated_package_check_skip = 1 - allow_older_version = 1 - allow_unavailable_kmods = 1 - configure_host_metering = 1 - skip_kernel_currency_check = 1 - """, - { - "incomplete_rollback": "1", - "tainted_kernel_module_check_skip": "1", - "outdated_package_check_skip": "1", - "allow_older_version": "1", - "allow_unavailable_kmods": "1", - "configure_host_metering": "1", - "skip_kernel_currency_check": "1", - }, - ), - ), -) -def test_options_from_config_files_default(content, output, monkeypatch, tmpdir): - """Test config files in default path.""" - path = os.path.join(str(tmpdir), "convert2rhel.ini") - - with open(path, "w") as file: - file.write(content) - os.chmod(path, 0o600) - - paths = ["/nonexisting/path", path] - monkeypatch.setattr(convert2rhel.toolopts, "CONFIG_PATHS", value=paths) - opts = convert2rhel.toolopts.options_from_config_files(None) - for key in ["username", "password", "activation_key", "org"]: - if key in opts: - assert opts[key] == output[key] - - -@pytest.mark.parametrize( - ("content", "output", "content_lower_priority"), - ( - ( - """ - [subscription_manager] - username = correct_username - activation_key = correct_key - """, - {"username": "correct_username", "password": None, "activation_key": "correct_key", "org": None}, - """ - [subscription_manager] - username = low_prior_username - """, - ), - ( - """ - [subscription_manager] - username = correct_username - activation_key = correct_key - """, - {"username": "correct_username", "password": None, "activation_key": "correct_key", "org": None}, - """ - [subscription_manager] - activation_key = low_prior_key - """, - ), - ( - """ - [subscription_manager] - activation_key = correct_key - org = correct_org""", - {"username": None, "password": None, "activation_key": "correct_key", "org": "correct_org"}, - """ - [subscription_manager] - org = low_prior_org - """, - ), - ( - """ - [subscription_manager] - activation_key = correct_key - Password = correct_password - """, - {"username": None, "password": "correct_password", "activation_key": "correct_key", "org": None}, - """ - [subscription_manager] - password = low_prior_pass - """, - ), - ( - """ - [subscription_manager] - activation_key = correct_key - Password = correct_password - """, - {"username": None, "password": "correct_password", "activation_key": "correct_key", "org": None}, - """ - [INVALID_HEADER] - password = low_prior_pass - """, - ), - ( - """ - [subscription_manager] - activation_key = correct_key - Password = correct_password - """, - {"username": None, "password": "correct_password", "activation_key": "correct_key", "org": None}, - """ - [subscription_manager] - incorrect_option = incorrect_option - """, - ), - ), -) -def test_options_from_config_files_specified(content, output, content_lower_priority, monkeypatch, tmpdir, caplog): - """Test user specified path for config file.""" - path_higher_priority = os.path.join(str(tmpdir), "convert2rhel.ini") - with open(path_higher_priority, "w") as file: - file.write(content) - os.chmod(path_higher_priority, 0o600) - - path_lower_priority = os.path.join(str(tmpdir), "convert2rhel_lower.ini") - with open(path_lower_priority, "w") as file: - file.write(content_lower_priority) - os.chmod(path_lower_priority, 0o600) - - paths = [path_higher_priority, path_lower_priority] - monkeypatch.setattr(convert2rhel.toolopts, "CONFIG_PATHS", value=paths) - - opts = convert2rhel.toolopts.options_from_config_files(None) - - for key in ["username", "password", "activation_key", "org"]: - if key in opts: - assert opts[key] == output[key] +from convert2rhel import toolopts @pytest.mark.parametrize( @@ -586,262 +40,10 @@ def test_options_from_config_files_specified(content, output, content_lower_prio ), ) def test_set_opts(supported_opts, global_tool_opts): - convert2rhel.toolopts.ToolOpts.set_opts(global_tool_opts, supported_opts) + toolopts.ToolOpts.set_opts(global_tool_opts, supported_opts) assert global_tool_opts.username == supported_opts["username"] assert global_tool_opts.password == supported_opts["password"] assert global_tool_opts.activation_key == supported_opts["activation_key"] assert global_tool_opts.org == supported_opts["org"] assert not hasattr(global_tool_opts, "invalid_key") - - -UrlParts = namedtuple("UrlParts", ("scheme", "hostname", "port")) - - -@pytest.mark.parametrize( - ("url_parts", "message"), - ( - ( - UrlParts("gopher", "localhost", None), - "Subscription manager must be accessed over http or https. gopher is not valid", - ), - (UrlParts("http", None, None), "A hostname must be specified in a subscription-manager serverurl"), - (UrlParts("http", "", None), "A hostname must be specified in a subscription-manager serverurl"), - ), -) -def test_validate_serverurl_parsing(url_parts, message): - with pytest.raises(ValueError, match=message): - convert2rhel.toolopts._validate_serverurl_parsing(url_parts) - - -def test_log_command_used(caplog, monkeypatch): - obfuscation_string = "*" * 5 - input_command = mock_cli_arguments( - ["--username", "uname", "--password", "123", "--activationkey", "456", "--org", "789"] - ) - expected_command = mock_cli_arguments( - [ - "--username", - obfuscation_string, - "--password", - obfuscation_string, - "--activationkey", - obfuscation_string, - "--org", - obfuscation_string, - ] - ) - monkeypatch.setattr(sys, "argv", input_command) - convert2rhel.toolopts._log_command_used() - - assert " ".join(expected_command) in caplog.records[-1].message - - -@pytest.mark.parametrize( - ("argv", "message"), - ( - # The message is a log of used command - (mock_cli_arguments(["-o", "org", "-k", "key"]), "-o ***** -k *****"), - ( - mock_cli_arguments(["-o", "org"]), - "Either the --org or the --activationkey option is missing. You can't use one without the other.", - ), - ( - mock_cli_arguments(["-k", "key"]), - "Either the --org or the --activationkey option is missing. You can't use one without the other.", - ), - ), -) -def test_org_activation_key_specified(argv, message, monkeypatch, caplog, global_tool_opts): - monkeypatch.setattr(sys, "argv", argv) - - try: - convert2rhel.toolopts.CLI() - except SystemExit: - # Don't care about the exception, focus on output message - pass - assert message in caplog.text - - -@pytest.mark.parametrize( - ("argv", "expected"), - ( - (mock_cli_arguments(["convert"]), "conversion"), - (mock_cli_arguments(["analyze"]), "analysis"), - (mock_cli_arguments([]), "conversion"), - ), -) -def test_pre_assessment_set(argv, expected, monkeypatch, global_tool_opts): - monkeypatch.setattr(sys, "argv", argv) - - convert2rhel.toolopts.CLI() - - assert global_tool_opts.activity == expected - - -@pytest.mark.parametrize( - ("argv", "expected"), - ( - ( - mock_cli_arguments(["--disablerepo", "*", "--enablerepo", "*"]), - "Duplicate repositories were found across disablerepo and enablerepo options", - ), - ( - mock_cli_arguments( - ["--disablerepo", "*", "--disablerepo", "rhel-7-extras-rpm", "--enablerepo", "rhel-7-extras-rpm"] - ), - "Duplicate repositories were found across disablerepo and enablerepo options", - ), - ( - mock_cli_arguments(["--disablerepo", "test", "--enablerepo", "test"]), - "Duplicate repositories were found across disablerepo and enablerepo options", - ), - ), -) -def test_disable_and_enable_repos_has_same_repo(argv, expected, monkeypatch, caplog, global_tool_opts): - monkeypatch.setattr(sys, "argv", argv) - convert2rhel.toolopts.CLI() - - assert expected in caplog.records[-1].message - - -@pytest.mark.parametrize( - ("argv", "expected"), - ( - ( - mock_cli_arguments(["--disablerepo", "*", "--enablerepo", "test"]), - "Duplicate repositories were found across disablerepo and enablerepo options", - ), - ( - mock_cli_arguments(["--disablerepo", "test", "--enablerepo", "test1"]), - "Duplicate repositories were found across disablerepo and enablerepo options", - ), - ), -) -def test_disable_and_enable_repos_with_different_repos(argv, expected, monkeypatch, caplog, global_tool_opts): - monkeypatch.setattr(sys, "argv", argv) - convert2rhel.toolopts.CLI() - - assert expected not in caplog.records[-1].message - - -@pytest.mark.parametrize( - ("argv", "expected"), - ( - ([], ["convert"]), - (["--debug"], ["convert", "--debug"]), - (["analyze", "--debug"], ["analyze", "--debug"]), - (["--password=convert", "--debug"], ["convert", "--password=convert", "--debug"]), - ), -) -def test_add_default_command(argv, expected, monkeypatch): - monkeypatch.setattr(sys, "argv", argv) - assert convert2rhel.toolopts._add_default_command(argv) == expected - - -@pytest.mark.parametrize( - ("username", "password", "organization", "activation_key", "no_rhsm", "expected"), - ( - ("User1", "Password1", None, None, False, True), - (None, None, "My Org", "12345ABC", False, True), - ("User1", "Password1", "My Org", "12345ABC", False, True), - (None, None, None, None, True, False), - ("User1", None, None, "12345ABC", False, False), - (None, None, None, None, False, False), - ("User1", "Password1", None, None, True, False), - ), -) -def test_should_subscribe(username, password, organization, activation_key, no_rhsm, expected): - t_opts = convert2rhel.toolopts.ToolOpts() - t_opts.username = username - t_opts.password = password - t_opts.org = organization - t_opts.activation_key = activation_key - t_opts.no_rhsm = no_rhsm - - assert convert2rhel.toolopts._should_subscribe(t_opts) is expected - - -@pytest.mark.parametrize( - ("argv", "env_var", "expected", "message"), - ( - ( - ["analyze", "--no-rpm-va"], - False, - False, - "We will proceed with ignoring the --no-rpm-va option as running rpm -Va in the analysis mode is essential for a complete rollback to the original system state at the end of the analysis.", - ), - ( - ["analyze", "--no-rpm-va"], - True, - False, - "We will proceed with ignoring the --no-rpm-va option as running rpm -Va in the analysis mode is essential for a complete rollback to the original system state at the end of the analysis.", - ), - ( - ["--no-rpm-va"], - False, - False, - "We need to run the 'rpm -Va' command to be able to perform a complete rollback of changes done to the system during the pre-conversion analysis. If you accept the risk of an incomplete rollback, set the CONVERT2RHEL_INCOMPLETE_ROLLBACK=1 environment variable. Otherwise, remove the --no-rpm-va option.", - ), - (["--no-rpm-va"], True, True, ""), - ), -) -def test_setting_no_rpm_va(argv, env_var, expected, message, monkeypatch, global_tool_opts, caplog): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(argv)) - if env_var: - monkeypatch.setenv("CONVERT2RHEL_INCOMPLETE_ROLLBACK", "1") - - try: - convert2rhel.toolopts.CLI() - except SystemExit: - pass - - assert global_tool_opts.no_rpm_va == expected - if message: - assert caplog.records[-1].message == message - - -@pytest.mark.parametrize( - ("argv", "message"), - ( - # The message is a log of used command - (mock_cli_arguments(["-u", "user", "-p", "pass"]), "-u ***** -p *****"), - ( - mock_cli_arguments(["-p", "pass"]), - "You have passed the RHSM password without an associated username. Please provide a username together with the password", - ), - ( - mock_cli_arguments(["-u", "user"]), - "You have passed the RHSM username without an associated password. Please provide a password together with the username", - ), - ), -) -def test_cli_userpass_specified(argv, message, monkeypatch, caplog, global_tool_opts): - monkeypatch.setattr(sys, "argv", argv) - - try: - convert2rhel.toolopts.CLI() - except SystemExit: - # Don't care about the exception, focus on output message - pass - assert message in caplog.text - - -@pytest.mark.parametrize( - ("activation_key", "organization", "argv"), - ( - ("activation_key", "org", []), - ("activation_key", "org", ["analyze", "-u name", "-p pass"]), - (None, None, ["analyze", "-u name", "-p pass"]), - ), -) -def test_cli_args_config_file_cornercase(activation_key, organization, argv, monkeypatch): - monkeypatch.setattr(sys, "argv", mock_cli_arguments(argv)) - t_opts = convert2rhel.toolopts.ToolOpts() - t_opts.org = organization - t_opts.activation_key = activation_key - t_opts.no_rhsm = True - monkeypatch.setattr(convert2rhel.toolopts, "tool_opts", t_opts) - - # Make sure it doesn't raise an exception - convert2rhel.toolopts.CLI() diff --git a/convert2rhel/utils/subscription.py b/convert2rhel/utils/subscription.py new file mode 100644 index 0000000000..cf44226074 --- /dev/null +++ b/convert2rhel/utils/subscription.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# Copyright(C) 2024 Red Hat, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__metaclass__ = type + +import logging +import re + +from six.moves import urllib + + +loggerinst = logging.getLogger(__name__) + + +def setup_rhsm_parts(opts): + rhsm_parts = {} + + if opts.serverurl: + if opts["no_rhsm"]: + loggerinst.warning("Ignoring the --serverurl option. It has no effect when --no-rhsm is used.") + + # WARNING: We cannot use the following helper until after no_rhsm, + # username, password, activation_key, and organization have been + # set. + elif not _should_subscribe(opts): + loggerinst.warning( + "Ignoring the --serverurl option. It has no effect when no credentials to subscribe the system were given." + ) + else: + # Parse the serverurl and save the components. + try: + url_parts = _parse_subscription_manager_serverurl(opts.serverurl) + url_parts = _validate_serverurl_parsing(url_parts) + except ValueError as e: + # If we fail to parse, fail the conversion. The reason for + # this harsh treatment is that we will be submitting + # credentials to the server parsed from the serverurl. If + # the user is specifying an internal subscription-manager + # server but typo the url, we would fallback to the + # public red hat subscription-manager server. That would + # mean the user thinks the credentials are being passed + # to their internal subscription-manager server but it + # would really be passed externally. That's not a good + # security practice. + loggerinst.critical( + "Failed to parse a valid subscription-manager server from the --serverurl option.\n" + "Please check for typos and run convert2rhel again with a corrected --serverurl.\n" + "Supplied serverurl: %s\nError: %s" % (opts["serverurl"], e) + ) + + rhsm_parts["rhsm_hostname"] = url_parts.hostname + + if url_parts.port: + # urllib.parse.urlsplit() converts this into an int but we + # always use it as a str + rhsm_parts["rhsm_port"] = str(url_parts.port) + + if url_parts.path: + rhsm_parts["rhsm_prefix"] = url_parts.path + + return rhsm_parts + + +def _parse_subscription_manager_serverurl(serverurl): + """Parse a url string in a manner mostly compatible with subscription-manager --serverurl.""" + # This is an adaptation of what subscription-manager's cli enforces: + # https://github.com/candlepin/subscription-manager/blob/main/src/rhsm/utils.py#L112 + + # Don't modify http:// and https:// as they are fine + if not re.match("https?://[^/]+", serverurl): + # Anthing that looks like a malformed scheme is immediately discarded + if re.match("^[^:]+:/.+", serverurl): + raise ValueError("Unable to parse --serverurl. Make sure it starts with http://HOST or https://HOST") + + # If there isn't a scheme, add one now + serverurl = "https://%s" % serverurl + + url_parts = urllib.parse.urlsplit(serverurl, allow_fragments=False) + + return url_parts + + +def _validate_serverurl_parsing(url_parts): + """ + Perform some tests that we parsed the subscription-manager serverurl successfully. + + :arg url_parts: The parsed serverurl as returned by urllib.parse.urlsplit() + :raises ValueError: If any of the checks fail. + :returns: url_parts If the check was successful. + """ + if url_parts.scheme not in ("https", "http"): + raise ValueError( + "Subscription manager must be accessed over http or https. %s is not valid" % url_parts.scheme + ) + + if not url_parts.hostname: + raise ValueError("A hostname must be specified in a subscription-manager serverurl") + + return url_parts + + +def _should_subscribe(opts): + """ + Whether we should subscribe the system with subscription-manager. + + If there are no ways to authenticate to subscription-manager, then we will + attempt to convert without subscribing the system. The assumption is that + the user has already subscribed the system or that this machine does not + need to subscribe to rhsm in order to get the RHEL rpm packages. + """ + # No means to authenticate with rhsm. + if not (opts.username and opts.password) and not (opts.activation_key and opts.org): + return False + + # --no-rhsm means that there is no need to use any part of rhsm to + # convert this host. (Usually used when you configure + # your RHEL repos another way, like a local mirror and telling + # convert2rhel about it using --enablerepo) + if opts.no_rhsm: + return False + + return True diff --git a/man/__init__.py b/man/__init__.py index 6c229de033..69e9a7ac2f 100644 --- a/man/__init__.py +++ b/man/__init__.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from convert2rhel import toolopts +from convert2rhel import cli def get_parser(): @@ -27,7 +27,7 @@ def get_parser(): $ python -c 'from convert2rhel import toolopts; print("[synopsis]\n."+toolopts.CLI.usage())' > man/synopsis $ PYTHONPATH=. argparse-manpage --pyfile man/__init__.py --function get_parser --manual-title="General Commands Manual" --description="Automates the conversion of Red Hat Enterprise Linux derivative distributions to Red Hat Enterprise Linux." --project-name "convert2rhel " --prog="convert2rhel" --include man/distribution --include man/synopsis > man/convert2rhel.8 """ - parser = toolopts.CLI()._parser + parser = cli.CLI()._parser # Description taken out of our Confluence page. parser.description = (