diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 1a463066..434441ac 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} diff --git a/mlx/coverity/coverity_services.py b/mlx/coverity/coverity_services.py index 3974ee67..498dbad7 100644 --- a/mlx/coverity/coverity_services.py +++ b/mlx/coverity/coverity_services.py @@ -2,17 +2,14 @@ """Services and other utilities for Coverity scripting""" -# General -from collections import namedtuple import csv import logging import re +from collections import namedtuple from urllib.parse import urlencode -import requests.structures -from sphinx.util.logging import getLogger -# For Coverity - REST API import requests +from sphinx.util.logging import getLogger # Coverity built in Impact statuses IMPACT_LIST = ["High", "Medium", "Low"] @@ -132,7 +129,7 @@ def retrieve_issues(self, filters): """Retrieve issues from the server (Coverity Connect). Args: - filters (json): The filters as json + filters (dict): The filters for the query Returns: dict: The response @@ -206,36 +203,6 @@ def _request(self, url, data=None): self.logger.warning(err_msg) return response.raise_for_status() - @staticmethod - def validate_filter_option(name, req_csv, valid_attributes, allow_regex=False): - """Add filter when the attribute is valid. If `valid_attributes` is empty or falsy, - all attributes of the CSV list are valid. - The CSV list can allow regular expressions when `allow_regex` is set to True. - - Args: - name (str): String representation of the attribute. - req_csv (str): A CSV list of attribute values to query. - valid_attributes (list/dict): The valid attributes. - allow_regex (bool): True to treat filter values as regular expressions, False to require exact matches - - Returns: - list[str]: The list of valid attributes - """ - logging.info("Validate required %s [%s]", name, req_csv) - filter_values = [] - for field in req_csv.split(","): - if not valid_attributes or field in valid_attributes: - logging.info("Classification [%s] is valid", field) - filter_values.append(field) - elif allow_regex: - pattern = re.compile(field) - for element in valid_attributes: - if pattern.search(element) and element not in filter_values: - filter_values.append(element) - else: - logging.error("Invalid %s filter: %s", name, field) - return filter_values - def assemble_query_filter(self, column_name, filter_values, matcher_type): """Assemble a filter for a specific column @@ -259,6 +226,10 @@ def assemble_query_filter(self, column_name, filter_values, matcher_type): else: matcher["key"] = filter_ matchers.append(matcher) + + if column_name not in self.columns: + self.logger.warning(f"Invalid column name {column_name!r}; Retrieve column keys first.") + return { "columnKey": self.columns[column_name], "matchMode": "oneOrMoreMatch", @@ -300,7 +271,7 @@ def get_defects(self, stream, filters, column_names): } ] - Filter = namedtuple("Filter", "name matcher_type list allow_regex", defaults=[None, False]) + Filter = namedtuple("Filter", "name matcher_type values allow_regex", defaults=[[], False]) filter_options = { "checker": Filter("Checker", "keyMatcher", self.checkers, True), "impact": Filter("Impact", "keyMatcher", IMPACT_LIST), @@ -312,17 +283,13 @@ def get_defects(self, stream, filters, column_names): } for option, filter in filter_options.items(): - if filters[option]: - filter_values = self.handle_attribute_filter( - filters[option], filter.name, filter.list, filter.allow_regex - ) + if (filter_option := filters[option]) and (filter_values := self.handle_attribute_filter( + filter_option, filter.name, filter.values, filter.allow_regex)): if filter_values: query_filters.append(self.assemble_query_filter(filter.name, filter_values, filter.matcher_type)) - if filters["component"]: - filter_values = self.handle_component_filter(filters["component"]) - if filter_values: - query_filters.append(self.assemble_query_filter("Component", filter_values, "nameMatcher")) + if (filter := filters["component"]) and (filter_values := self.handle_component_filter(filter)): + query_filters.append(self.assemble_query_filter("Component", filter_values, "nameMatcher")) data = { "filters": query_filters, @@ -342,18 +309,32 @@ def get_defects(self, stream, filters, column_names): logging.info("Running Coverity query...") return self.retrieve_issues(data) - def handle_attribute_filter(self, attribute_values, name, *args, **kwargs): - """Applies any filter on an attribute's values. + def handle_attribute_filter(self, attribute_values, name, valid_attributes, allow_regex=False): + """Process the given CSV list of attribute values by filtering out the invalid ones while logging an error. + The CSV list can allow regular expressions when `allow_regex` is set to True. Args: attribute_values (str): A CSV list of attribute values to query. name (str): String representation of the attribute. + valid_attributes (list/dict): All valid/possible attribute values. + allow_regex (bool): True to treat filter values as regular expressions, False to require exact matches Returns: - list[str]: The list of valid attributes + set[str]: The attributes values to query with """ logging.info("Using %s filter [%s]", name, attribute_values) - filter_values = self.validate_filter_option(name, attribute_values, *args, **kwargs) + filter_values = set() + for field in attribute_values.split(","): + if not valid_attributes or field in valid_attributes: + logging.info("Classification [%s] is valid", field) + filter_values.add(field) + elif allow_regex: + pattern = re.compile(field) + for element in valid_attributes: + if pattern.search(element): + filter_values.add(element) + else: + logging.error("Invalid %s filter: %s", name, field) return filter_values def handle_component_filter(self, attribute_values): diff --git a/setup.py b/setup.py index 41942990..17ba81dc 100644 --- a/setup.py +++ b/setup.py @@ -25,28 +25,30 @@ long_description_content_type="text/x-rst", zip_safe=False, classifiers=[ - "Development Status :: 5 - Production/Stable", - "Environment :: Console", - "Environment :: Web Environment", - "Framework :: Sphinx :: Extension", - "Intended Audience :: Developers", - "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", - "Operating System :: OS Independent", - "Programming Language :: Python", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Topic :: Documentation", - "Topic :: Documentation :: Sphinx", - "Topic :: Utilities", + 'Development Status :: 5 - Production/Stable', + 'Environment :: Console', + 'Environment :: Web Environment', + 'Framework :: Sphinx :: Extension', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', + 'Topic :: Documentation', + 'Topic :: Documentation :: Sphinx', + 'Topic :: Utilities', ], platforms="any", packages=find_namespace_packages(where="."), package_dir={"": "."}, include_package_data=True, install_requires=requires, + python_requires='>=3.8', + namespace_packages=['mlx'], keywords=[ "coverity", "reporting", diff --git a/tests/filters.py b/tests/filters.py new file mode 100644 index 00000000..95711ea3 --- /dev/null +++ b/tests/filters.py @@ -0,0 +1,142 @@ +# filters, columns_names, response data in apart python bestand + +from collections import namedtuple + +Filter = namedtuple("Filter", "filters column_names request_data") + +# Test with no filters and no column names +test_defect_filter_0 = Filter( + { + "checker": None, + "impact": None, + "kind": None, + "classification": None, + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + [], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + } + ], + "columns": ["cid"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) + +test_defect_filter_1 = Filter( + { + "checker": "MISRA", + "impact": None, + "kind": None, + "classification": "Intentional,Bug,Pending,Unclassified", + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + ["CID", "Classification", "Checker", "Comment"], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + }, + { + "columnKey": "checker", + "matchMode": "oneOrMoreMatch", + "matchers": [ + {"type": "keyMatcher", "key": "MISRA 2 KEY"}, + {"type": "keyMatcher", "key": "MISRA 1"}, + {"type": "keyMatcher", "key": "MISRA 3"}, + ], + }, + { + "columnKey": "classification", + "matchMode": "oneOrMoreMatch", + "matchers": [ + {"type": "keyMatcher", "key": "Bug"}, + {"type": "keyMatcher", "key": "Pending"}, + {"type": "keyMatcher", "key": "Unclassified"}, + {"type": "keyMatcher", "key": "Intentional"}, + ], + }, + ], + "columns": ["cid", "checker", "lastTriageComment", "classification"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) + +test_defect_filter_2 = Filter( + { + "checker": None, + "impact": None, + "kind": None, + "classification": None, + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + ["CID", "Checker", "Status", "Comment"], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + } + ], + "columns": ["status", "cid", "checker", "lastTriageComment"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) + +test_defect_filter_3 = Filter( + { + "checker": None, + "impact": None, + "kind": None, + "classification": "Unclassified", + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + ["CID", "Classification", "Action"], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + }, + { + "columnKey": "classification", + "matchMode": "oneOrMoreMatch", + "matchers": [{"type": "keyMatcher", "key": "Unclassified"}], + }, + ], + "columns": ["cid", "classification", "action"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) diff --git a/tests/test_coverity.py b/tests/test_coverity.py index 3924ec48..7b0b9397 100644 --- a/tests/test_coverity.py +++ b/tests/test_coverity.py @@ -1,81 +1,56 @@ from unittest import TestCase - -try: - from unittest.mock import MagicMock, patch -except ImportError: - from mock import MagicMock, patch +from unittest.mock import MagicMock, patch import json import requests import requests_mock from urllib.parse import urlencode +from pathlib import Path +from parameterized import parameterized -import mlx.coverity.coverity -import mlx.coverity.coverity_services +from mlx.coverity import SphinxCoverityConnector +from mlx.coverity_services import CoverityDefectService +from .filters import test_defect_filter_0, test_defect_filter_1, test_defect_filter_2, test_defect_filter_3 -class TestCoverity(TestCase): - def setUp(self): - """SetUp to be run before each test to provide clean working env""" +TEST_FOLDER = Path(__file__).parent - @patch("mlx.coverity.coverity_services.requests") - def test_session_login(self, mock_requests): - """Test login function of CoverityDefectService""" - mock_requests.return_value = MagicMock(spec=requests) - # Get the base url - coverity_conf_service = mlx.coverity.coverity_services.CoverityDefectService("scan.coverity.com/") - self.assertEqual("https://scan.coverity.com/api/v2", coverity_conf_service.api_endpoint) +def ordered(obj): + if isinstance(obj, dict): + return sorted((k, ordered(v)) for k, v in obj.items()) + if isinstance(obj, list): + return sorted(ordered(x) for x in obj) + else: + return obj - # Login to Coverity - coverity_conf_service.login("user", "password") - mock_requests.Session.assert_called_once() - @patch.object(mlx.coverity.coverity_services.requests.Session, "get") - def test_retrieve_checkers(self, mock_get): - """Test retrieving checkers (CoverityDefectService)""" - coverity_conf_service = mlx.coverity.coverity_services.CoverityDefectService("scan.coverity.com/") +class TestCoverity(TestCase): + def setUp(self): + """SetUp to be run before each test to provide clean working env""" + self.fake_stream = "test_stream" + + def initialize_coverity_service(self, login=False): + """Logs in Coverity Service and initializes the urls used for REST API. - # Login to Coverity - coverity_conf_service.login("user", "password") - - with open("tests/columns_keys.json", "rb") as content: - mock_get.return_value.content = content.read() - mock_get.return_value.ok = True - coverity_conf_service.retrieve_checkers() - mock_get.assert_called_once() - - def test_get_defects(self): - filters = { - "checker": None, - "impact": None, - "kind": None, - "classification": None, - "action": None, - "component": None, - "cwe": None, - "cid": None, - } - fake_json = {"test": "succes"} - fake_checkers = { - "checkerAttribute": {"name": "checker", "displayName": "Checker"}, - "checkerAttributedata": [{"key": "checker_key", "value": "checker_value"}], - } - fake_stream = "test_stream" - coverity_conf_service = mlx.coverity.coverity.CoverityDefectService("scan.coverity.com/") + Returns: + CoverityDefectService: The coverity defect service + """ + coverity_service = CoverityDefectService("scan.coverity.com/") - # Login to Coverity - coverity_conf_service.login("user", "password") + if login: + # Login to Coverity + coverity_service.login("user", "password") # urls that are used in GET or POST requests - endpoint = coverity_conf_service.api_endpoint + endpoint = coverity_service.api_endpoint params = { "queryType": "bySnapshot", "retrieveGroupByColumns": "false" } - column_keys_url = f"{endpoint}/issues/columns?{urlencode(params)}" - checkers_url = f"{endpoint}/checkerAttributes/checker" - stream_url = f"{endpoint}/streams/{fake_stream}" + self.column_keys_url = f"{endpoint}/issues/columns?{urlencode(params)}" + self.checkers_url = f"{endpoint}/checkerAttributes/checker" + self.stream_url = f"{endpoint}/streams/{self.fake_stream}" params = { "includeColumnLabels": "true", "offset": 0, @@ -83,45 +58,127 @@ def test_get_defects(self): "rowCount": -1, "sortOrder": "asc", } - issues_url = f"{endpoint}/issues/search?{urlencode(params)}" + self.issues_url = f"{endpoint}/issues/search?{urlencode(params)}" + + return coverity_service + def test_session_by_stream_validation(self): + coverity_service = self.initialize_coverity_service(login=False) with requests_mock.mock() as mocker: - with open("tests/columns_keys.json", "r") as content: - column_keys = json.loads(content.read()) - mocker.get(column_keys_url, json=column_keys) - mocker.get(checkers_url, json=fake_checkers) - mocker.get(stream_url, json={"stream": "valid"}) - mocker.post(issues_url, json=fake_json) + mocker.get(self.stream_url, json={}) + # Login to Coverity + coverity_service.login("user", "password") + coverity_service.validate_stream(self.fake_stream) + stream_request = mocker.last_request + assert stream_request.headers["Authorization"] == requests.auth._basic_auth_str("user", "password") + + @patch("mlx.coverity_services.requests") + def test_stream_validation(self, mock_requests): + mock_requests.return_value = MagicMock(spec=requests) + # Get the base url + coverity_service = CoverityDefectService("scan.coverity.com/") + # Login to Coverity + coverity_service.login("user", "password") + with patch.object(CoverityDefectService, "_request") as mock_method: # Validate stream name - coverity_conf_service.validate_stream(fake_stream) - # Retrieve column keys - assert coverity_conf_service.retrieve_column_keys()["Issue Kind"] == "displayIssueKind" - assert coverity_conf_service.retrieve_column_keys()["CID"] == "cid" - # Retrieve checkers - assert coverity_conf_service.retrieve_checkers() == ["checker_key"] + coverity_service.validate_stream(self.fake_stream) + mock_method.assert_called_once() + mock_method.assert_called_with("https://scan.coverity.com/api/v2/streams/test_stream") + + def test_retrieve_columns(self): + with open(f"{TEST_FOLDER}/columns_keys.json", "r") as content: + column_keys = json.loads(content.read()) + # initialize what needed for the REST API + coverity_service = self.initialize_coverity_service(login=True) + with requests_mock.mock() as mocker: + mocker.get(self.column_keys_url, json=column_keys) + coverity_service.retrieve_column_keys() + assert mocker.call_count == 1 + mock_request = mocker.last_request + assert mock_request.method == "GET" + assert mock_request.url == self.column_keys_url + assert mock_request.verify + assert coverity_service.columns["Issue Kind"] == "displayIssueKind" + assert coverity_service.columns["CID"] == "cid" + + def test_retrieve_checkers(self): + self.fake_checkers = { + "checkerAttribute": {"name": "checker", "displayName": "Checker"}, + "checkerAttributedata": [ + {"key": "MISRA", "value": "MISRA"}, + {"key": "CHECKER", "value": "CHECKER"} + ], + } + # initialize what needed for the REST API + coverity_service = self.initialize_coverity_service(login=True) + + with requests_mock.mock() as mocker: + mocker.get(self.checkers_url, json=self.fake_checkers) + coverity_service.retrieve_checkers() + assert mocker.call_count == 1 + mock_request = mocker.last_request + assert mock_request.method == "GET" + assert mock_request.url == self.checkers_url + assert mock_request.verify + assert coverity_service.checkers == ["MISRA", "CHECKER"] + + @parameterized.expand([ + test_defect_filter_0, + test_defect_filter_1, + test_defect_filter_2, + test_defect_filter_3, + ]) + def test_get_defects(self, filters, column_names, request_data): + with open(f"{TEST_FOLDER}/columns_keys.json", "r") as content: + column_keys = json.loads(content.read()) + self.fake_checkers = { + "checkerAttribute": {"name": "checker", "displayName": "Checker"}, + "checkerAttributedata": [ + {"key": "MISRA 1", "value": "M 1"}, + {"key": "MISRA 2 KEY", "value": "MISRA 2 VALUE"}, + {"key": "MISRA 3", "value": "M 3"}, + {"key": "C 1", "value": "CHECKER 1"}, + {"key": "C 2", "value": "CHECKER 2"} + ], + } + # initialize what needed for the REST API + coverity_service = self.initialize_coverity_service(login=True) + + with requests_mock.mock() as mocker: + mocker.get(self.column_keys_url, json=column_keys) + mocker.get(self.checkers_url, json=self.fake_checkers) + # Retrieve checkers; required for get_defects() + coverity_service.retrieve_checkers() + # Retreive columns; required for get_defects() + coverity_service.retrieve_column_keys() # Get defects - assert coverity_conf_service.get_defects(fake_stream, filters, column_names=["CID"]) == fake_json - # Total amount of request are 4 => column keys, checkers, stream and defects/issues - assert mocker.call_count == 4 - - # check get requests - get_urls = [stream_url, column_keys_url, checkers_url] - for index in range(len(get_urls)): - mock_req = mocker.request_history[index] - assert mock_req.url == get_urls[index] - assert mock_req.verify - assert mock_req.headers["Authorization"] == requests.auth._basic_auth_str("user", "password") - # check post request (last request) - assert mocker.last_request.url == issues_url - assert mocker.last_request.verify - assert mocker.last_request.headers["Authorization"] == requests.auth._basic_auth_str("user", "password") + with patch.object(CoverityDefectService, "retrieve_issues") as mock_method: + coverity_service.get_defects(self.fake_stream, filters, column_names) + data = mock_method.call_args[0][0] + mock_method.assert_called_once() + assert ordered(data) == ordered(request_data) + + def test_get_filtered_defects(self): + sphinx_coverity_connector = SphinxCoverityConnector() + sphinx_coverity_connector.coverity_service = self.initialize_coverity_service(login=False) + sphinx_coverity_connector.stream = self.fake_stream + node_filters = { + "checker": "MISRA", "impact": None, "kind": None, + "classification": "Intentional,Bug,Pending,Unclassified", "action": None, "component": None, + "cwe": None, "cid": None + } + column_names = {"Comment", "Checker", "Classification", "CID"} + fake_node = {"col": column_names, + "filters": node_filters} - def test_failed_login(self): - fake_stream = "test_stream" + with patch.object(CoverityDefectService, "get_defects") as mock_method: + sphinx_coverity_connector.get_filtered_defects(fake_node) + mock_method.assert_called_once_with(self.fake_stream, fake_node["filters"], column_names) - coverity_conf_service = mlx.coverity.coverity.CoverityDefectService("scan.coverity.com/") - stream_url = f"{coverity_conf_service.api_endpoint.rstrip('/')}/streams/{fake_stream}" + def test_failed_login(self): + coverity_conf_service = CoverityDefectService("scan.coverity.com/") + stream_url = f"{coverity_conf_service.api_endpoint}/streams/{self.fake_stream}" with requests_mock.mock() as mocker: mocker.get(stream_url, headers={"Authorization": "Basic fail"}, status_code=401) @@ -129,5 +186,5 @@ def test_failed_login(self): coverity_conf_service.login("user", "password") # Validate stream name with self.assertRaises(requests.HTTPError) as err: - coverity_conf_service.validate_stream(fake_stream) + coverity_conf_service.validate_stream(self.fake_stream) self.assertEqual(err.exception.response.status_code, 401) diff --git a/tox.ini b/tox.ini index 3ab6cb68..1a5c0504 100644 --- a/tox.ini +++ b/tox.ini @@ -1,26 +1,26 @@ [tox] envlist = - py37, py38, py39, py310, py311 + py38, py39, py310, py311, py312 clean, check, [gh-actions] python = - 3.7: py37 3.8: py38 3.9: py39 3.10: py310 3.11: py311 + 3.12: py312 [testenv] basepython = py: python3 pypy: {env:TOXPYTHON:pypy} - py37: {env:TOXPYTHON:python3.7} py38: {env:TOXPYTHON:python3.8} py39: {env:TOXPYTHON:python3.9} py310: {env:TOXPYTHON:python3.10} py311: {env:TOXPYTHON:python3.11} + py312: {env:TOXPYTHON:python3.12} {clean,test,html,latexpdf,check,report,coverage}: python3 setenv = PYTHONPATH={toxinidir}/tests @@ -39,6 +39,7 @@ deps= sphinx-testing >= 0.5.2 sphinx_selective_exclude sphinx_rtd_theme + parameterized python-decouple urlextract setuptools_scm