diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 64cf0adb..434441ac 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -74,11 +74,11 @@ jobs:
     - name: Install dependencies
       run: |
         python -m pip install --upgrade pip
-        pip install setuptools wheel twine
+        pip install build twine
     - name: Build and publish
      env:
        TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
        TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
      run: |
-        python setup.py sdist bdist_wheel
+        python -m build
        twine upload dist/*
diff --git a/MANIFEST.in b/MANIFEST.in
index 2af1bd1d..f2d3fbf6 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -17,6 +17,7 @@ recursive-include example Makefile
 recursive-include tests *.py
 recursive-include tests *.rst
 recursive-include tests *.txt
+recursive-include tests *.json
 exclude example/.env.example
 exclude .pylintrc
diff --git a/README.rst b/README.rst
index 0bc594cb..c31b92f9 100644
--- a/README.rst
+++ b/README.rst
@@ -92,8 +92,6 @@ Example of custom credentials for the plugin:
     coverity_credentials = {
         'hostname': 'scan.coverity.com',
-        'port': '8080',
-        'transport': 'http',
         'username': 'reporter',
         'password': 'coverity',
         'stream': 'some_coverity_stream',
@@ -134,8 +132,6 @@ The plugin itself holds a default config that can be used for any Coverity proje
     coverity_credentials = {
         'hostname': 'scan.coverity.com',
-        'port': '8080',
-        'transport': 'http',
         'username': 'reporter',
         'password': 'coverity',
         'stream': 'some_coverity_stream',
diff --git a/example/Makefile b/example/Makefile
index a41609a8..0e78d882 100644
--- a/example/Makefile
+++ b/example/Makefile
@@ -8,10 +8,17 @@ PYTHONWARNINGS?= default::DeprecationWarning
 PAPER         ?=
 BUILDDIR      ?= _build

+# logging variables
+DEBUG    ?= 0
+LOGLEVEL ?= WARNING
+
 # Internal variables.
 PAPEROPT_a4     = -D latex_paper_size=a4
 PAPEROPT_letter = -D latex_paper_size=letter
 ALLSPHINXOPTS   = -E -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
+ifeq (${DEBUG}, 1)
+ALLSPHINXOPTS += -T
+endif

 # the i18n builder cannot share the environment and doctrees with the others
 I18NSPHINXOPTS  = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
@@ -43,6 +50,7 @@ clean:
 	-rm -rf $(BUILDDIR)/*

+html: export LOGLEVEL = $(LOGLEVEL)
 html:
 	$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
 	@echo
 	@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
diff --git a/example/conf.py b/example/conf.py
index cddf0583..acdc9cf7 100644
--- a/example/conf.py
+++ b/example/conf.py
@@ -17,14 +17,16 @@
 import mlx.coverity
 import mlx.traceability
 from decouple import config
+import logging
+from sphinx.util.logging import getLogger
 from pkg_resources import get_distribution

-pkg_version = get_distribution('mlx.coverity').version
+pkg_version = get_distribution("mlx.coverity").version

 # If extensions (or modules to document with autodoc) are in another directory,
 # add these directories to sys.path here. If the directory is relative to the
 # documentation root, use os.path.abspath to make it absolute, like shown here.
-sys.path.insert(0, os.path.abspath('../mlx'))
+sys.path.insert(0, os.path.abspath("../mlx"))

 # -- General configuration -----------------------------------------------------

@@ -34,34 +36,34 @@
 # Add any Sphinx extension module names here, as strings. They can be extensions
 # coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.doctest', - 'sphinx.ext.coverage', - 'sphinx.ext.ifconfig', - 'sphinx.ext.viewcode', - 'sphinx.ext.graphviz', - 'mlx.traceability', - 'mlx.coverity', - 'sphinx_selective_exclude.eager_only', - 'sphinx_selective_exclude.modindex_exclude', - 'sphinx_selective_exclude.search_auto_exclude' + "sphinx.ext.autodoc", + "sphinx.ext.doctest", + "sphinx.ext.coverage", + "sphinx.ext.ifconfig", + "sphinx.ext.viewcode", + "sphinx.ext.graphviz", + "mlx.traceability", + "mlx.coverity", + "sphinx_selective_exclude.eager_only", + "sphinx_selective_exclude.modindex_exclude", + "sphinx_selective_exclude.search_auto_exclude", ] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # The suffix of source filenames. -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. #source_encoding = 'utf-8-sig' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = u'Example' -copyright = u'2017, Stein Heselmans' +project = "Example" +copyright = "2017, Stein Heselmans" # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -84,7 +86,7 @@ # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. -exclude_patterns = ['_build'] +exclude_patterns = ["_build"] # The reST default role (used for this markup: `text`) to use for all documents. #default_role = None @@ -101,7 +103,7 @@ #show_authors = False # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # A list of ignored prefixes for module index sorting. #modindex_common_prefix = [] @@ -111,7 +113,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. -html_theme = 'sphinx_rtd_theme' +html_theme = "sphinx_rtd_theme" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -140,8 +142,8 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] -html_static_path.append(os.path.join(os.path.dirname(mlx.traceability.__file__), 'assets')) +html_static_path = ["_static"] +html_static_path.append(os.path.join(os.path.dirname(mlx.traceability.__file__), "assets")) # If not '', a 'Last updated on:' timestamp is inserted at every page bottom, # using the given strftime format. @@ -185,25 +187,22 @@ #html_file_suffix = None # Output file base name for HTML help builder. -htmlhelp_basename = 'Exampledoc' +htmlhelp_basename = "Exampledoc" # -- Options for LaTeX output -------------------------------------------------- latex_elements = { # The paper size ('letterpaper' or 'a4paper'). - 'papersize': 'a4paper', - + "papersize": "a4paper", # The font size ('10pt', '11pt' or '12pt'). - 'pointsize': '10pt', - + "pointsize": "10pt", # Additional stuff for the LaTeX preamble. - 'preamble': r'\setcounter{tocdepth}{3}\usepackage{pdflscape}', + "preamble": r"\setcounter{tocdepth}{3}\usepackage{pdflscape}", } # Grouping the document tree into LaTeX files. 
List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
-    ('index', 'Example.tex', u'Example Documentation',
-     u'Crt Mori', 'manual'),
+    ("index", "Example.tex", "Example Documentation", "Crt Mori", "manual"),
]

 # The name of an image file (relative to this directory) to place at the top of
@@ -231,10 +230,7 @@
 # One entry per manual page. List of tuples
 # (source start file, name, description, authors, manual section).
-man_pages = [
-    ('index', 'example', u'Example Documentation',
-     [u'Crt Mori'], 1)
-]
+man_pages = [("index", "example", "Example Documentation", ["Crt Mori"], 1)]

 # If true, show URL addresses after external links.
 #man_show_urls = False
@@ -246,9 +242,15 @@
 #    (source start file, target name, title, author,
 #     dir menu entry, description, category)
 texinfo_documents = [
-    ('index', 'Example', u'Example Documentation',
-     u'Crt Mori', 'Example', 'One line description of project.',
-     'Miscellaneous'),
+    (
+        "index",
+        "Example",
+        "Example Documentation",
+        "Crt Mori",
+        "Example",
+        "One line description of project.",
+        "Miscellaneous",
+    ),
 ]

 # Documents to append as an appendix to all manuals.
@@ -264,10 +266,10 @@
 # -- Options for Epub output ---------------------------------------------------

 # Bibliographic Dublin Core info.
-epub_title = u'Example'
-epub_author = u'Crt Mori'
-epub_publisher = u'Melexis'
-epub_copyright = u'2018, Crt Mori'
+epub_title = "Example"
+epub_author = "Crt Mori"
+epub_publisher = "Melexis"
+epub_copyright = "2018, Crt Mori"

 # The language of the text. It defaults to the language option
 # or en if the language is not set.
@@ -305,17 +307,25 @@

 # Example configuration for intersphinx: refer to the Python standard library.
-intersphinx_mapping = {'http://docs.python.org/': None}
+intersphinx_mapping = {"http://docs.python.org/": None}

 # -- Options for coverity extension ----------------------------------------

 coverity_credentials = {
-    'transport': 'http',
-    'port': '8080',
-    'hostname': 'coverity.melexis.com',
-    'username': config('COVERITY_USERNAME'),
-    'password': config('COVERITY_PASSWORD'),
-    'stream': config('COVERITY_STREAM'),
+    "hostname": "coverity.melexis.com",
+    "username": config("COVERITY_USERNAME"),
+    "password": config("COVERITY_PASSWORD"),
+    "stream": config("COVERITY_STREAM"),
 }

 TRACEABILITY_ITEM_ID_REGEX = r"([A-Z_]+-[A-Z0-9_]+)"
 TRACEABILITY_ITEM_RELINK = {}
+
+log_level = os.environ.get("LOGLEVEL", None)
+if log_level:
+    numeric_level = getattr(logging, log_level.upper(), None)
+    if numeric_level is None:  # no such logging level exists
+        raise ValueError(f"Invalid log level: {log_level}")
+    logger = getLogger("mlx.coverity_logging")
+    logger.setLevel(numeric_level)
+
diff --git a/mlx/__init__.py b/mlx/__init__.py
index 7933d729..622bca20 100644
--- a/mlx/__init__.py
+++ b/mlx/__init__.py
@@ -1,3 +1,3 @@
 # -*- coding: utf-8 -*-

-__import__('pkg_resources').declare_namespace(__name__)
+__import__("pkg_resources").declare_namespace(__name__)
diff --git a/mlx/coverity.py b/mlx/coverity.py
index bfc6e000..5e198246 100644
--- a/mlx/coverity.py
+++ b/mlx/coverity.py
@@ -1,81 +1,80 @@
 # -*- coding: utf-8 -*-
-'''
+"""
 Coverity plugin

 Sphinx extension for restructured text that adds Coverity reporting to documentation.
 See README.rst for more details.
-'''
+"""
+
 from getpass import getpass
 from urllib.error import URLError, HTTPError

+from docutils import nodes
 import pkg_resources

 from mlx.coverity_logging import report_info, report_warning
-from mlx.coverity_services import CoverityConfigurationService, CoverityDefectService
-from mlx.coverity_directives.coverity_defect_list import CoverityDefect, CoverityDefectListDirective
+from mlx.coverity_services import CoverityDefectService
+from mlx.coverity_directives.coverity_defect_list import (
+    CoverityDefect,
+    CoverityDefectListDirective,
+)


-class SphinxCoverityConnector():
+class SphinxCoverityConnector:
     """
     Class containing functions and variables for Sphinx to access in specific stages of the documentation build.
     """
-    project_name = ''
-    coverity_service = None

     def __init__(self):
         """
         Initialize the object by setting error variable to false
         """
         self.coverity_login_error = False
-        self.coverity_login_error_msg = ''
-        self.stream = ''
+        self.coverity_login_error_msg = ""

     def initialize_environment(self, app):
         """
         Perform initializations needed before the build process starts.
         """
         # LaTeX-support: since we generate empty tags, we need to relax the verbosity of that error
-        if 'preamble' not in app.config.latex_elements:
-            app.config.latex_elements['preamble'] = ''
-        app.config.latex_elements['preamble'] += '''\
-            \\makeatletter
-            \\let\@noitemerr\\relax
-            \\makeatother'''
+        if "preamble" not in app.config.latex_elements:
+            app.config.latex_elements["preamble"] = ""
+        # raw string, so single backslashes end up as valid LaTeX commands
+        app.config.latex_elements["preamble"] += r"""
+\makeatletter
+\let\@noitemerr\relax
+\makeatother"""

-        self.stream = app.config.coverity_credentials['stream']
+        self.stream = app.config.coverity_credentials["stream"]

         # Login to Coverity and obtain stream information
         try:
             self.input_credentials(app.config.coverity_credentials)
-            report_info('Login to Coverity server... ', True)
-            coverity_conf_service = CoverityConfigurationService(app.config.coverity_credentials['transport'],
-                                                                 app.config.coverity_credentials['hostname'],
-                                                                 app.config.coverity_credentials['port'])
-            coverity_conf_service.login(app.config.coverity_credentials['username'],
-                                        app.config.coverity_credentials['password'])
-            report_info('done')
-
-            report_info('obtaining stream information... ', True)
-            stream = coverity_conf_service.get_stream(self.stream)
-            if stream is None:
-                raise ValueError('No such Coverity stream [%s] found on [%s]' %
-                                 (self.stream, coverity_conf_service.get_service_url()))
-            report_info('done')
-
-            # Get Stream's project name
-            report_info('obtaining project name from stream... ', True)
-            self.project_name = coverity_conf_service.get_project_name(stream)
-            report_info('done')
-            self.coverity_service = CoverityDefectService(coverity_conf_service)
-            self.coverity_service.login(app.config.coverity_credentials['username'],
-                                        app.config.coverity_credentials['password'])
+            report_info("Initialize a session on Coverity server... ", True)
+            self.coverity_service = CoverityDefectService(
+                app.config.coverity_credentials["hostname"],
+            )
+            self.coverity_service.login(
+                app.config.coverity_credentials["username"], app.config.coverity_credentials["password"]
+            )
+            report_info("done")
+            report_info("Verify the given stream name... ", True)
+            self.coverity_service.validate_stream(self.stream)
+            report_info("done")
+            # Get all column keys
+            report_info("obtaining all column keys... ", True)
+            self.coverity_service.retrieve_column_keys()
+            report_info("done")
+            # Get all checkers
+            report_info("obtaining all checkers... 
", True) + self.coverity_service.retrieve_checkers() + report_info("done") except (URLError, HTTPError, Exception, ValueError) as error_info: # pylint: disable=broad-except if isinstance(error_info, EOFError): self.coverity_login_error_msg = "Coverity credentials are not configured." else: self.coverity_login_error_msg = str(error_info) - report_info('failed with: %s' % error_info) + report_info("failed with: %s" % error_info) self.coverity_login_error = True # ----------------------------------------------------------------------------- @@ -91,7 +90,7 @@ def process_coverity_nodes(self, app, doctree, fromdocname): for node in doctree.traverse(CoverityDefect): top_node = node.create_top_node("Failed to connect to Coverity Server") node.replace_self(top_node) - report_warning('Connection failed: %s' % self.coverity_login_error_msg, fromdocname) + report_warning("Connection failed: %s" % self.coverity_login_error_msg, fromdocname) return # Item matrix: @@ -101,75 +100,88 @@ def process_coverity_nodes(self, app, doctree, fromdocname): # Get items from server try: defects = self.get_filtered_defects(node) + node.perform_replacement(defects, self, app, fromdocname) except (URLError, AttributeError, Exception) as err: # pylint: disable=broad-except - report_warning('failed with %s' % err, fromdocname) + error_message = f"failed to process coverity-list with {err!r}" + report_warning(error_message, fromdocname, lineno=node["line"]) + top_node = node.create_top_node(node["title"]) + top_node += nodes.paragraph(text=error_message) + node.replace_self(top_node) continue - node.perform_replacement(defects, self, app, fromdocname) # ----------------------------------------------------------------------------- # Helper functions of event handlers @staticmethod def input_credentials(config_credentials): - """ Ask user to input username and/or password if they haven't been configured yet. + """Ask user to input username and/or password if they haven't been configured yet. Args: config_credentials (dict): Dictionary to store the user's credentials. """ - if not config_credentials['username']: - config_credentials['username'] = input("Coverity username: ") - if not config_credentials['password']: - config_credentials['password'] = getpass("Coverity password: ") + if not config_credentials["username"]: + config_credentials["username"] = input("Coverity username: ") + if not config_credentials["password"]: + config_credentials["password"] = getpass("Coverity password: ") def get_filtered_defects(self, node): - """ Fetch defects from suds using filters stored in the given CoverityDefect object. + """Fetch defects from REST API using filters stored in the given CoverityDefect object. Args: node (CoverityDefect): CoverityDefect object with zero or more filters stored. Returns: - (suds.sudsobject.mergedDefectsPageDataObj) Suds mergedDefectsPageDataObj object containing filtered defects. + dict: The content of the request to retrieve defects. This has a structure like: + { + "offset": 0, + "totalRows": 2720, + "columns": [list of column keys] + "rows": [list of dictionaries {"key": , "value": }] + } """ - report_info('obtaining defects... ', True) - defects = self.coverity_service.get_defects(self.project_name, self.stream, node['filters']) - report_info("%d received" % (defects['totalNumberOfRecords'])) + report_info("obtaining defects... 
", True) + column_names = set(node["col"]) + if "chart_attribute" in node and node["chart_attribute"].upper() in node.column_map: + column_names.add(node["chart_attribute"]) + defects = self.coverity_service.get_defects(self.stream, node["filters"], column_names) + report_info("%d received" % (defects["totalRows"])) report_info("building defects table and/or chart... ", True) return defects # Extension setup def setup(app): - '''Extension setup''' + """Extension setup""" # Create default configuration. Can be customized in conf.py - app.add_config_value('coverity_credentials', - { - 'hostname': 'scan.coverity.com', - 'port': '8080', - 'transport': 'http', - 'username': 'reporter', - 'password': 'coverity', - 'stream': 'some_coverty_stream', - }, - 'env') - - app.add_config_value('TRACEABILITY_ITEM_ID_REGEX', r"([A-Z_]+-[A-Z0-9_]+)", 'env') - app.add_config_value('TRACEABILITY_ITEM_RELINK', {}, 'env') + app.add_config_value( + "coverity_credentials", + { + "hostname": "scan.coverity.com", + "username": "reporter", + "password": "coverity", + "stream": "some_stream", + }, + "env", + ) + + app.add_config_value("TRACEABILITY_ITEM_ID_REGEX", r"([A-Z_]+-[A-Z0-9_]+)", "env") + app.add_config_value("TRACEABILITY_ITEM_RELINK", {}, "env") app.add_node(CoverityDefect) sphinx_coverity_connector = SphinxCoverityConnector() - app.add_directive('coverity-list', CoverityDefectListDirective) + app.add_directive("coverity-list", CoverityDefectListDirective) - app.connect('doctree-resolved', sphinx_coverity_connector.process_coverity_nodes) + app.connect("doctree-resolved", sphinx_coverity_connector.process_coverity_nodes) - app.connect('builder-inited', sphinx_coverity_connector.initialize_environment) + app.connect("builder-inited", sphinx_coverity_connector.initialize_environment) try: - version = pkg_resources.require('mlx.coverity')[0].version + version = pkg_resources.require("mlx.coverity")[0].version except LookupError: - version = 'dev' + version = "dev" return { - 'version': version, - 'parallel_read_safe': True, - 'parallel_write_safe': True, + "version": version, + "parallel_read_safe": True, + "parallel_write_safe": True, } diff --git a/mlx/coverity_directives/coverity_defect_list.py b/mlx/coverity_directives/coverity_defect_list.py index 777fbf58..89e2bcd1 100644 --- a/mlx/coverity_directives/coverity_defect_list.py +++ b/mlx/coverity_directives/coverity_defect_list.py @@ -1,4 +1,5 @@ -""" Module for the CoverityDefect class along with its directive. """ +"""Module for the CoverityDefect class along with its directive.""" + from hashlib import sha256 from os import environ, path from pathlib import Path @@ -6,8 +7,9 @@ from docutils import nodes from docutils.parsers.rst import Directive, directives import matplotlib as mpl -if not environ.get('DISPLAY'): - mpl.use('Agg') + +if not environ.get("DISPLAY"): + mpl.use("Agg") import matplotlib.pyplot as plt from mlx.coverity_logging import report_info, report_warning @@ -15,57 +17,58 @@ def pct_wrapper(sizes): - """ Helper function for matplotlib which returns the percentage and the absolute size of the slice. + """Helper function for matplotlib which returns the percentage and the absolute size of the slice. Args: sizes (list): List containing the amount of elements per slice. 
""" + def make_pct(pct): absolute = int(round(pct / 100 * sum(sizes))) return "{:.0f}%\n({:d})".format(pct, absolute) + return make_pct class CoverityDefect(ItemElement): """Coverity defect""" - stream = '' + stream = "" coverity_service = None tbody = None chart_labels = {} filters = { - 'checker': None, - 'impact': None, - 'kind': None, - 'classification': None, - 'action': None, - 'component': None, - 'cwe': None, - 'cid': None, + "checker": None, + "impact": None, + "kind": None, + "classification": None, + "action": None, + "component": None, + "cwe": None, + "cid": None, } column_map = { - 'CID': 'cid', - 'CATEGORY': 'displayCategory', - 'IMPACT': 'displayImpact', - 'ISSUE': 'displayIssueKind', - 'TYPE': 'displayType', - 'CHECKER': 'checkerName', - 'COMPONENT': 'componentName', + "CID": "cid", + "CATEGORY": "displayCategory", + "IMPACT": "displayImpact", + "ISSUE": "displayIssueKind", + "TYPE": "displayType", + "CHECKER": "checker", + "COMPONENT": "displayComponent", } defect_states_map = { - 'COMMENT': 'Comment', - 'REFERENCE': 'Ext. Reference', - 'CLASSIFICATION': 'Classification', - 'ACTION': 'Action', - 'STATUS': 'DefectStatus', + "COMMENT": "lastTriageComment", + "REFERENCE": "externalReference", + "CLASSIFICATION": "classification", + "ACTION": "action", + "STATUS": "status", } def perform_replacement(self, defects, connector, app, fromdocname): - """ Replaces the empty node with a fully built CoverityDefect based on the given defects. + """Replaces the empty node with a fully built CoverityDefect based on the given defects. Args: - defects (suds.sudsobject.mergedDefectsPageDataObj): Suds mergedDefectsPageDataObj object containing filtered - defects. + defects (dict): filtered defects connector (SphinxCoverityConnector): Object containing the stream and CoverityDefectService object in use. app (sphinx.application.Sphinx): Sphinx' application object. fromdocname (str): Relative path to the document in which the error occured, without extension. @@ -73,56 +76,56 @@ def perform_replacement(self, defects, connector, app, fromdocname): env = app.builder.env self.stream = connector.stream self.coverity_service = connector.coverity_service - top_node = self.create_top_node(self['title']) + top_node = self.create_top_node(self["title"]) # Initialize table and dictionaries to store counters and labels for pie chart - if self['col']: + if self["col"]: table = self.initialize_table() - if isinstance(self['chart'], list): - combined_labels = self.initialize_labels(self['chart'], fromdocname) + if isinstance(self["chart"], list): + combined_labels = self.initialize_labels(self["chart"], fromdocname) # Fill table and increase counters for pie chart try: - self.fill_table_and_count_attributes(defects['mergedDefects'], app, fromdocname) + self.fill_table_and_count_attributes(defects["rows"], self.coverity_service.columns, app, fromdocname) except AttributeError as err: - report_info('No issues matching your query or empty stream. %s' % err) - top_node += nodes.paragraph(text='No issues matching your query or empty stream') + report_info("No issues matching your query or empty stream. 
%s" % err) + top_node += nodes.paragraph(text="No issues matching your query or empty stream") # don't generate empty pie chart image self.replace_self(top_node) return - if self['col']: + if self["col"]: top_node += table - if isinstance(self['chart'], list): - self._prepare_labels_and_values(combined_labels, defects['totalNumberOfRecords']) + if isinstance(self["chart"], list): + self._prepare_labels_and_values(combined_labels, defects["totalRows"]) top_node += self.build_pie_chart(env) report_info("done") self.replace_self(top_node) def initialize_table(self): - """ Initializes a table node. + """Initializes a table node. Returns: (nodes.table) A table node initialized with column widths and a table header. """ table = nodes.table() - table['classes'].append('longtable') - if self['widths'] == 'auto': - table['classes'].append('colwidths-auto') - elif self['widths']: # "grid" or list of integers - table['classes'].append('colwidths-given') + table["classes"].append("longtable") + if self["widths"] == "auto": + table["classes"].append("colwidths-auto") + elif self["widths"]: # "grid" or list of integers + table["classes"].append("colwidths-given") tgroup = nodes.tgroup() - for _ in self['col']: + for _ in self["col"]: tgroup += [nodes.colspec(colwidth=5)] - tgroup += nodes.thead('', self.create_row(self['col'])) + tgroup += nodes.thead("", self.create_row(self["col"])) - if isinstance(self['widths'], list): - colspecs = [child for child in tgroup.children if child.tagname == 'colspec'] - for colspec, col_width in zip(colspecs, self['widths']): - colspec['colwidth'] = col_width + if isinstance(self["widths"], list): + colspecs = [child for child in tgroup.children if child.tagname == "colspec"] + for colspec, col_width in zip(colspecs, self["widths"]): + colspec["colwidth"] = col_width self.tbody = nodes.tbody() tgroup += self.tbody @@ -145,36 +148,42 @@ def initialize_labels(self, labels, docname): self.chart_labels = {} combined_labels = {} for label in labels: - attr_values = label.split('+') + attr_values = label.split("+") for attr_val in attr_values: if attr_val in self.chart_labels: - report_warning("Attribute value '%s' should be unique in chart option." % attr_val, docname) + report_warning( + "Attribute value '%s' should be unique in chart option." % attr_val, + docname, + ) self.chart_labels[attr_val] = 0 if len(attr_values) > 1: combined_labels[label] = attr_values return combined_labels - def fill_table_and_count_attributes(self, defects, *args): + def fill_table_and_count_attributes(self, defects, valid_columns, *args): """ Fills the table body of the col option is in use, and counts the amount of each attribute value of the chart option is in use. Args: - defects (list): List of defect objects (mergedDefectDataObj). + rows (list[list]): Data rows, each with a defect + valid_columns (dict): All valid/available columns. The name of the column as key and column key as value. """ for defect in defects: - if self['col']: - self.tbody += self.get_filled_row(defect, self['col'], *args) + simplified_defect = {item["key"]: item["value"] for item in defect} + if self["col"]: + self.tbody += self.get_filled_row(simplified_defect, self["col"], valid_columns, *args) - if isinstance(self['chart'], list): - self.increase_attribute_value_count(defect) + if isinstance(self["chart"], list): + self.increase_attribute_value_count(simplified_defect, valid_columns) - def get_filled_row(self, defect, columns, *args): - """ Goes through each column and decides if it is there or prints empty cell. 
+ def get_filled_row(self, defect, columns, valid_columns, *args): + """Goes through each column and decides if it is there or prints empty cell. Args: - defect (suds.sudsobject.mergedDefectDataObj): Defect object from suds. + defect (dict): The defect where the keys are column keys and the values are the corresponding defect values columns (list): List of column names (str). + valid_columns (dict): All valid/available columns. The name of the column as key and column key as value. Returns: (nodes.row) Filled row node. @@ -182,30 +191,39 @@ def get_filled_row(self, defect, columns, *args): row = nodes.row() for item_col in columns: item_col = item_col.upper() - if item_col == 'CID': + if item_col == "CID": # CID is default and even if it is in disregard - row += self.create_cell(str(defect['cid']), - url=self.coverity_service.get_defect_url(self.stream, str(defect['cid']))) - elif item_col == 'LOCATION': - info = self.coverity_service.get_defect(str(defect['cid']), - self.stream) - linenum = info[-1]['defectInstances'][-1]['events'][-1]['lineNumber'] - row += self.create_cell("{}#L{}".format(defect['filePathname'], linenum)) + row += self.create_cell( + str(defect["cid"]), url=self.coverity_service.defect_url(self.stream, str(defect["cid"])) + ) + elif item_col == "LOCATION": + linenum = defect["lineNumber"] + row += self.create_cell("{}#L{}".format(defect["displayFile"], linenum)) elif item_col in self.column_map: - row += self.create_cell(defect[self.column_map[item_col]]) - elif item_col in ('COMMENT', 'REFERENCE'): - row += nodes.entry('', self.create_paragraph_with_links(defect, - self.defect_states_map[item_col], - *args)) + row += self.cov_attribute_value_to_col(defect, self.column_map[item_col]) + elif item_col == "COMMENT": + row += nodes.entry( + "", + self.create_paragraph_with_links(defect, "lastTriageComment", *args), + ) + elif item_col == "REFERENCE": + row += nodes.entry( + "", + self.create_paragraph_with_links(defect, "externalReference", *args), + ) elif item_col in self.defect_states_map: row += self.cov_attribute_value_to_col(defect, self.defect_states_map[item_col]) else: # generic check which, if it is missing, prints empty cell anyway - row += self.cov_attribute_value_to_col(defect, item_col) + if item_col.lower() in valid_columns: + row += self.cov_attribute_value_to_col(defect, valid_columns[item_col.lower()]) + break + else: + row += self.create_cell("") return row def _prepare_labels_and_values(self, combined_labels, total_count): - """ Prepares the labels and values to be used to build the pie chart. + """Prepares the labels and values to be used to build the pie chart. 
Args: combined_labels (dict): Dictionary with the label_set arguments as keys and a list of associated attribute @@ -219,13 +237,14 @@ def _prepare_labels_and_values(self, combined_labels, total_count): self.chart_labels[new_label] = count # add combined count under new_label # only keep those labels that comply with the min_slice_size requirement - self.chart_labels = {label: count for label, count in self.chart_labels.items() - if count >= self['min_slice_size']} + self.chart_labels = { + label: count for label, count in self.chart_labels.items() if count >= self["min_slice_size"] + } total_labeled = sum(list(self.chart_labels.values())) other_count = total_count - total_labeled if other_count: - self.chart_labels['Other'] = other_count + self.chart_labels["Other"] = other_count def build_pie_chart(self, env): """ @@ -242,41 +261,45 @@ def build_pie_chart(self, env): fig, axes = plt.subplots() fig.set_size_inches(7, 4) _, texts, autotexts = axes.pie(sizes, labels=labels, autopct=pct_wrapper(sizes), startangle=90) - axes.axis('equal') - Path(env.app.srcdir, '_images').mkdir(mode=0o777, parents=True, exist_ok=True) + axes.axis("equal") + Path(env.app.srcdir, "_images").mkdir(mode=0o777, parents=True, exist_ok=True) hash_string = str(texts) + str(autotexts) hash_value = sha256(hash_string.encode()).hexdigest() # create hash value based on chart parameters - rel_file_path = path.join('_images', 'piechart-{}.png'.format(hash_value)) + rel_file_path = path.join("_images", "piechart-{}.png".format(hash_value)) if rel_file_path not in env.images: - fig.savefig(path.join(env.app.srcdir, rel_file_path), format='png') + fig.savefig(path.join(env.app.srcdir, rel_file_path), format="png") # store file name in build env - env.images[rel_file_path] = ['_images', path.split(rel_file_path)[-1]] + env.images[rel_file_path] = ["_images", path.split(rel_file_path)[-1]] image_node = nodes.image() - image_node['uri'] = rel_file_path - image_node['candidates'] = '*' # look at uri value for source path, relative to the srcdir folder + image_node["uri"] = rel_file_path + image_node["candidates"] = "*" # look at uri value for source path, relative to the srcdir folder return image_node - def increase_attribute_value_count(self, defect): - """ Increases the counter for a chart attribute value belonging to the defect. + def increase_attribute_value_count(self, defect, valid_columns): + """Increases the counter for a chart attribute value belonging to the defect. Args: - defect (suds.sudsobject.mergedDefectDataObj): Defect object from suds. + defect (dict): The defect. + valid_columns (dict): All valid/available columns. The name of the column as key and column key as value. 
""" - if self['chart_attribute'].upper() in self.column_map: - attribute_value = str(defect[self.column_map[self['chart_attribute'].upper()]]) + if self["chart_attribute"].upper() in self.column_map: + attribute_value = str(defect[self.column_map[self["chart_attribute"].upper()]]) else: - col = self.cov_attribute_value_to_col(defect, self['chart_attribute']) + if self["chart_attribute"].lower() in valid_columns: + col = self.cov_attribute_value_to_col(defect, valid_columns[self["chart_attribute"].lower()]) + else: + col = self.create_cell("") attribute_value = str(col.children[0].children[0]) # get text in paragraph of column if attribute_value in self.chart_labels: self.chart_labels[attribute_value] += 1 - elif not self['chart']: # remove those that don't comply with min_slice_length + elif not self["chart"]: # remove those that don't comply with min_slice_length self.chart_labels[attribute_value] = 1 class CoverityDefectListDirective(Directive): - """ Directive to generate a list of defects. + """Directive to generate a list of defects. Syntax:: @@ -293,24 +316,25 @@ class CoverityDefectListDirective(Directive): :cwe: filter for only these CWE rating :cid: filter only these cid """ + # Optional argument: title (whitespace allowed) optional_arguments = 1 final_argument_whitespace = True # Options - option_spec = {'class': directives.class_option, - 'col': directives.unchanged, - 'widths': directives.value_or(('auto', 'grid'), - directives.positive_int_list), - 'chart': directives.unchanged, - 'checker': directives.unchanged, - 'impact': directives.unchanged, - 'kind': directives.unchanged, - 'classification': directives.unchanged, - 'action': directives.unchanged, - 'component': directives.unchanged, - 'cwe': directives.unchanged, - 'cid': directives.unchanged, - } + option_spec = { + "class": directives.class_option, + "col": directives.unchanged, + "widths": directives.value_or(("auto", "grid"), directives.positive_int_list), + "chart": directives.unchanged, + "checker": directives.unchanged, + "impact": directives.unchanged, + "kind": directives.unchanged, + "classification": directives.unchanged, + "action": directives.unchanged, + "component": directives.unchanged, + "cwe": directives.unchanged, + "cid": directives.unchanged, + } # Content disallowed has_content = False @@ -318,49 +342,54 @@ def run(self): """ Processes the contents of the directive """ + env = self.state.document.settings.env + item_list_node = CoverityDefect() + item_list_node["document"] = env.docname + item_list_node["line"] = self.lineno # Process title (optional argument) - item_list_node['title'] = self.arguments[0] if self.arguments else 'Coverity report' + item_list_node["title"] = self.arguments[0] if self.arguments else "Coverity report" # Process ``col`` option - if 'col' in self.options: - item_list_node['col'] = self.options['col'].split(',') - elif 'chart' not in self.options: - item_list_node['col'] = 'CID,Classification,Action,Comment'.split(',') # use default colums + if "col" in self.options: + item_list_node["col"] = self.options["col"].split(",") + elif "chart" not in self.options: + item_list_node["col"] = "CID,Classification,Action,Comment".split(",") # use default colums else: - item_list_node['col'] = [] # don't display a table if the ``chart`` option is present without ``col`` + item_list_node["col"] = [] # don't display a table if the ``chart`` option is present without ``col`` # Process ``widths`` option - item_list_node['widths'] = self.options['widths'] if 'widths' in self.options else 
'' + item_list_node["widths"] = self.options["widths"] if "widths" in self.options else "" # Process ``chart`` option - if 'chart' in self.options: + if "chart" in self.options: self._process_chart_option(item_list_node) else: - item_list_node['chart'] = '' + item_list_node["chart"] = "" # Process the optional filters - item_list_node['filters'] = {k: (self.options[k] if k in self.options else v) - for (k, v) in item_list_node.filters.items()} + item_list_node["filters"] = { + k: (self.options[k] if k in self.options else v) for (k, v) in item_list_node.filters.items() + } return [item_list_node] def _process_chart_option(self, node): - """ Processes the `chart` option. + """Processes the `chart` option. Args: node (CoverityDefect): CoverityDefect object used to store this directive's options and their parameters. """ - if ':' in self.options['chart']: - node['chart_attribute'] = self.options['chart'].split(':')[0].capitalize() + if ":" in self.options["chart"]: + node["chart_attribute"] = self.options["chart"].split(":")[0].capitalize() else: - node['chart_attribute'] = 'Classification' + node["chart_attribute"] = "Classification" - parameters = self.options['chart'].split(':')[-1] # str - node['chart'] = parameters.split(',') # list + parameters = self.options["chart"].split(":")[-1] # str + node["chart"] = parameters.split(",") # list # try to convert parameters to int, in case a min slice size is defined instead of filter options try: - node['min_slice_size'] = int(node['chart'][0]) - node['chart'] = [] # only when a min slice size is defined + node["min_slice_size"] = int(node["chart"][0]) + node["chart"] = [] # only when a min slice size is defined except ValueError: - node['min_slice_size'] = 1 + node["min_slice_size"] = 1 diff --git a/mlx/coverity_item_element.py b/mlx/coverity_item_element.py index cc6af276..65b6137a 100644 --- a/mlx/coverity_item_element.py +++ b/mlx/coverity_item_element.py @@ -1,4 +1,5 @@ -""" Module for the Coverity node base class. """ +"""Module for the Coverity node base class.""" + from re import findall from docutils import nodes @@ -9,11 +10,11 @@ class ItemElement(nodes.General, nodes.Element): - """ Base class for Coverity nodes. """ + """Base class for Coverity nodes.""" @staticmethod def create_ref_node(contents, url): - """ Creates reference node inside a paragraph. + """Creates reference node inside a paragraph. Args: contents (str): Text to be displayed. @@ -24,17 +25,17 @@ def create_ref_node(contents, url): """ p_node = nodes.paragraph() itemlink = nodes.reference() - itemlink['refuri'] = url + itemlink["refuri"] = url itemlink.append(nodes.Text(contents)) targetid = nodes.make_id(contents) - target = nodes.target('', '', ids=[targetid]) + target = nodes.target("", "", ids=[targetid]) p_node += target p_node += itemlink return p_node @staticmethod def create_top_node(title): - """ Creates a container node containing an admonition with the given title inside. + """Creates a container node containing an admonition with the given title inside. Args: title (str): Title text to be displayed. @@ -67,10 +68,10 @@ def create_cell(self, contents, url=None): else: contents = nodes.paragraph(text=contents) - return nodes.entry('', contents) + return nodes.entry("", contents) def create_row(self, cells): - """ Creates a table row node containing the given strings inside entry nodes. + """Creates a table row node containing the given strings inside entry nodes. Args: cells (list): List of strings to each be divided into cells. 
@@ -78,39 +79,41 @@
         Returns:
             (nodes.row) Row node containing all given entry nodes.
         """
-        return nodes.row('', *[self.create_cell(c) for c in cells])
+        return nodes.row("", *[self.create_cell(c) for c in cells])

     def cov_attribute_value_to_col(self, defect, name):
         """
-        Search defects array and return value for name
+        Create cell with the value in the defect for the given name.
+
+        Args:
+            defect (dict): The defect where the keys are column keys and the values are the corresponding defect values
+            name (str): The key name of the attribute
+
+        Returns:
+            (nodes.entry) Entry node containing a paragraph with the given contents
         """
-        col = self.create_cell(" ")
-
-        for attribute in defect['defectStateAttributeValues']:
-            if attribute['attributeDefinitionId'][0] == name:
-                try:
-                    col = self.create_cell(attribute['attributeValueId'][0])
-                except (AttributeError, IndexError):
-                    col = self.create_cell(" ")
+        if name in defect:
+            col = self.create_cell(defect[name])
+        else:
+            col = self.create_cell(" ")
         return col

-    def create_paragraph_with_links(self, defect, col_name, *args):
+    def create_paragraph_with_links(self, defect, col_key, *args):
         """
         Create a paragraph with the provided text. Hyperlinks are made interactive, and traceability item IDs get
         linked to their definition.

         Args:
-            defect (suds.sudsobject.mergedDefectDataObj): Defect object from suds.
-            col_name (str): Column name according to suds.
+            defect (dict): The defect where the keys are column keys and the values are the corresponding defect values
+            col_key (str): Column key according to Coverity Connect.

         Returns:
             (nodes.paragraph) Paragraph node filled with column contents for the given defect. Item IDs and hyperlinks
                 have been made interactive.
         """
-        text = str(self.cov_attribute_value_to_col(defect, col_name).children[0].children[0])
-        cid = str(defect['cid'])
+        remaining_text = str(defect[col_key])
+        cid = str(defect["cid"])
         contents = nodes.paragraph()
-        remaining_text = text
         self.link_to_urls(contents, remaining_text, cid, *args)
         return contents

@@ -118,6 +121,10 @@ def link_to_urls(contents, text, *args):
         """
         Makes URLs interactive and passes other text to link_to_item_ids, which treats the item IDs.
+
+        Args:
+            contents (nodes.paragraph): The paragraph
+            text (str): The text to parse
         """
         remaining_text = text
         extractor = URLExtract()
@@ -128,11 +135,11 @@
                 link_to_item_ids(contents, text_before, *args)

             ref_node = nodes.reference()
-            ref_node['refuri'] = url
+            ref_node["refuri"] = url
             ref_node.append(nodes.Text(url))
             contents += ref_node

-            remaining_text = remaining_text.replace(text_before + url, '', 1)
+            remaining_text = remaining_text.replace(text_before + url, "", 1)

         if remaining_text:
             link_to_item_ids(contents, text, *args)

@@ -141,6 +148,13 @@ def link_to_item_ids(contents, text, cid, app, docname):
     """
     Makes a link of item IDs when they are found in a traceability collection and adds all other text to the paragraph.
+
+    Args:
+        contents (nodes.paragraph): The paragraph
+        text (str): The text to parse
+        cid (str): CID of the item
+        app (sphinx.application.Sphinx): Sphinx' application object.
+        docname (str): Relative path to the document in which the error occurred, without extension.
    """
    if not app.config.TRACEABILITY_ITEM_ID_REGEX:
        return  # empty string as regex to disable traceability link generation
@@ -150,7 +164,7 @@
         text_before = remaining_text.split(item)[0]
         if text_before:
             contents.append(nodes.Text(text_before))
-        remaining_text = remaining_text.replace(text_before + item, '', 1)
+        remaining_text = remaining_text.replace(text_before + item, "", 1)

         if item in app.config.TRACEABILITY_ITEM_RELINK:
             item = app.config.TRACEABILITY_ITEM_RELINK[item]
@@ -163,23 +177,33 @@
     contents.append(nodes.Text(remaining_text))  # no URL or item ID in this text


-def make_internal_item_ref(app, fromdocname, item, cid):
+def make_internal_item_ref(app, fromdocname, item_id, cid):
     """
     Creates and returns a reference node for an item or returns None when the item cannot be found in the traceability
     collection. A warning is raised when a traceability collection exists, but an item ID cannot be found in it.
+
+    Args:
+        app (sphinx.application.Sphinx): Sphinx' application object.
+        fromdocname (str): Relative path to the document in which the error occurred, without extension.
+        item_id (str): Item ID
+        cid (str): CID of the item
+
+    Returns:
+        (nodes.reference/None): The reference node for the given item.
+            None if the given item cannot be found in the traceability collection.
     """
     env = app.builder.env
-    if not hasattr(env, 'traceability_collection'):
+    if not hasattr(env, "traceability_collection"):
         return None
-    item_info = env.traceability_collection.get_item(item)
+    item_info = env.traceability_collection.get_item(item_id)
     if not item_info:
-        report_warning("CID %s: Could not find item ID '%s' in traceability collection." % (cid, item), fromdocname)
+        report_warning("CID %s: Could not find item ID '%s' in traceability collection." % (cid, item_id), fromdocname)
         return None
-    ref_node = nodes.reference('', '')
-    ref_node['refdocname'] = item_info.docname
+    ref_node = nodes.reference("", "")
+    ref_node["refdocname"] = item_info.docname
     try:
-        ref_node['refuri'] = app.builder.get_relative_uri(fromdocname, item_info.docname) + '#' + item
+        ref_node["refuri"] = app.builder.get_relative_uri(fromdocname, item_info.docname) + "#" + item_id
     except NoUri:
         return None
-    ref_node.append(nodes.Text(item))
+    ref_node.append(nodes.Text(item_id))
     return ref_node
diff --git a/mlx/coverity_logging.py b/mlx/coverity_logging.py
index 2d05469e..31e67e9d 100644
--- a/mlx/coverity_logging.py
+++ b/mlx/coverity_logging.py
@@ -1,28 +1,31 @@
-""" Module to provide functions that accommodate logging. 
""" +"""Module to provide functions that accommodate logging.""" + from sphinx.util.logging import getLogger +from logging import WARNING + +LOGGER = getLogger(__name__) +LOGGER.setLevel(WARNING) def report_warning(msg, docname, lineno=None): - '''Convenience function for logging a warning + """Convenience function for logging a warning Args: msg (str): Message of the warning docname (str): Name of the document in which the error occurred lineno (str): Line number in the document on which the error occurred - ''' - logger = getLogger(__name__) + """ if lineno is not None: - logger.warning(msg, location=(docname, lineno)) + LOGGER.warning(msg, location=(docname, lineno)) else: - logger.warning(msg, location=docname) + LOGGER.warning(msg, location=docname) def report_info(msg, nonl=False): - '''Convenience function for information printing + """Convenience function for information printing Args: msg (str): Message of the warning nonl (bool): True when no new line at end - ''' - logger = getLogger(__name__) - logger.info(msg, nonl=nonl) + """ + LOGGER.info(msg, nonl=nonl) diff --git a/mlx/coverity_services.py b/mlx/coverity_services.py index 706b1218..5e2ef0df 100644 --- a/mlx/coverity_services.py +++ b/mlx/coverity_services.py @@ -1,614 +1,377 @@ #!/usr/bin/python -'''Services and other utilities for Coverity scripting''' +"""Services and other utilities for Coverity scripting""" -# General import csv -import logging import re -from urllib.error import URLError +from collections import namedtuple +from urllib.parse import urlencode +import requests +from sphinx.util.logging import getLogger -# For Coverity - SOAP -from suds.client import Client -from suds.wsse import Security, UsernameToken - -# -- Default values -- and global settings - -DEFAULT_WS_VERSION = 'v9' +from mlx.coverity_logging import report_info # Coverity built in Impact statuses -IMPACT_LIST = {'High', 'Medium', 'Low'} +IMPACT_LIST = ["High", "Medium", "Low"] -KIND_LIST = {'QUALITY', 'SECURITY', 'TEST'} +KIND_LIST = ["QUALITY", "SECURITY", "TEST"] # Coverity built in Classifications -CLASSIFICATION_LIST = {'Unclassified', 'Pending', 'False Positive', 'Intentional', 'Bug', 'Untested', 'No Test Needed'} +CLASSIFICATION_LIST = [ + "Unclassified", + "Pending", + "False Positive", + "Intentional", + "Bug", + "Untested", + "No Test Needed", +] # Coverity built in Actions -ACTION_LIST = {'Undecided', 'Fix Required', 'Fix Submitted', 'Modeling Required', 'Ignore', 'On hold', - 'For Interest Only'} - -ISSUE_KIND_2_LABEL = {'QUALITY': 'Quality', 'SECURITY': 'Security', 'Various': 'Quality/Security', 'TEST': 'Testing'} - - -# names of Coverity Triage/Attribute fields -EXT_REFERENCE_ATTR_NAME = "Ext. 
Reference" -DEFECT_STATUS_ATTR_NAME = "DefectStatus" -CLASSIFICATION_ATTR_NAME = "Classification" -ACTION_ATTR_NAME = "Action" -COMMENT_ATTR_NAME = "Comment" - - -def parse_two_part_term(term, delim=','): - '''Parse a term assuming [ [part1],[part2] ]''' - valid = False - part1 = "" - part2 = "" - if term.find(delim) != -1: - valid = True - field1 = term.split(delim, 1)[0] - if bool(field1): - part1 = field1 - field2 = term.rsplit(delim, 1)[-1] - if bool(field2): - part2 = field2 - return valid, part1, part2 - - -def compare_strings(str_a, str_b): - '''Compare strings for equivalence - - some leniency allowed such as spaces and casing - ''' - if re.match(str_b, str_a, flags=re.IGNORECASE): - return True - # ignore embedded spaces and some odd punctuation characters ("todo" = "To-Do") - str_a2 = re.sub(r'[.:\-_ ]', '', str_a) - str_b2 = re.sub(r'[:\-_ ]', '', str_b) # don't remove dot (part of regex?) - if re.match(str_b2, str_a2, flags=re.IGNORECASE): - return True - return False - - -class Service: - ''' - Basic endpoint Service - ''' - - def __init__(self, transport, hostname, port, ws_version=DEFAULT_WS_VERSION): - self.set_transport(transport) - self.set_hostname(hostname) - self.set_port(port) - self.set_ws_version(ws_version) - self.client = None - - def set_transport(self, transport): - '''Set transport protocol''' - self.transport = transport - - def get_transport(self): - '''Get transport protocol''' - return self.transport - - def set_hostname(self, hostname): - '''Set hostname for service''' - self.hostname = hostname - - def get_hostname(self): - '''Get hostname for service''' - return self.hostname - - def set_port(self, port): - '''Set port for service''' - self.port = port - - def get_port(self): - '''Get port for service''' - return self.port - - def set_ws_version(self, ws_version): - '''Set WS version for service''' - self.ws_version = ws_version - - def get_ws_version(self): - '''Get WS version for service''' - return self.ws_version - - def get_service_url(self, path='', add_port=True): - '''Get Service url with given path''' - url = self.transport + '://' + self.hostname - if self.port and add_port: - url += ':' + self.port - if path: - url += path - return url - - def get_ws_url(self, service): - '''Get WS url with given service''' - return self.get_service_url('/ws/' + self.ws_version + '/' + service + '?wsdl') +ACTION_LIST = [ + "Undecided", + "Fix Required", + "Fix Submitted", + "Modeling Required", + "Ignore", + "On hold", + "For Interest Only", +] + + +class CoverityDefectService: + """ + Convenience class for retrieving data from the Coverity REST API + """ + + _version = "v2" + + def __init__(self, hostname): + hostname = hostname.strip('/') + self._base_url = f"https://{hostname}" + self._api_endpoint = f"https://{hostname}/api/{self.version}" + self._checkers = [] + self._columns = {} + self.logger = getLogger("mlx.coverity_logging") + + @property + def base_url(self): + """str: The base URL of the service.""" + return self._base_url + + @property + def api_endpoint(self): + """str: The API endpoint of the service.""" + return self._api_endpoint + + @property + def version(self): + """str: The API version""" + return self._version + + @property + def checkers(self): + """list[str]: All valid checkers available""" + return self._checkers + + @property + def columns(self): + """list[dict]: A list of dictionaries where the keys of each dictionary: + - columnKey: The key of the column + - name: The name of the column + """ + return self._columns - def login(self, 
username, password): - '''Login to Coverity using given username and password''' - security = Security() - token = UsernameToken(username, password) - security.tokens.append(token) - self.client.set_options(wsse=security) - - def validate_presence(self, url, service_name): - '''Initializes the client attribute while validating the presence of the service''' - try: - self.client = Client(url) - logging.info("Validated presence of %s [%s]", service_name, url) - except URLError: - self.client = None - logging.critical("No such %s [%s]", service_name, url) - raise - - -class CoverityConfigurationService(Service): - ''' - Coverity Configuration Service (WebServices) - ''' - - def __init__(self, transport, hostname, port, ws_version=DEFAULT_WS_VERSION): - super(CoverityConfigurationService, self).__init__(transport, hostname, port, ws_version) - self.checkers = None - url = self.get_ws_url('configurationservice') - logging.getLogger('suds.client').setLevel(logging.CRITICAL) - self.validate_presence(url, 'Coverity Configuration Service') + def column_keys(self, column_names): + """The column keys corresponding to the given column names in the `col` option - def login(self, username, password): - '''Login to Coverity Configuration service using given username and password''' - super(CoverityConfigurationService, self).login(username, password) - version = self.get_version() - if version is None: - raise RuntimeError("Authentication to [%s] FAILED for [%s] account - check password" - % (self.get_service_url(), username)) - else: - logging.info("Authentication to [%s] using [%s] account was OK - version [%s]", - self.get_service_url(), username, version.externalVersion) - - def get_version(self): - '''Get the version of the service, can be used as a means to validate access permissions''' - try: - return self.client.service.getVersion() - except URLError: - return None - - @staticmethod - def get_project_name(stream): - '''Get the project name from the stream object''' - return stream.primaryProjectId.name - - @staticmethod - def get_triage_store(stream): - '''Get the name of the triaging store from the stream object''' - return stream.triageStoreId.name - - def get_stream(self, stream_name): - '''Get the stream object from the stream name''' - filter_spec = self.client.factory.create('streamFilterSpecDataObj') - - # use stream name as an initial glob pattern - filter_spec.namePattern = stream_name - - # get all the streams that match - streams = self.client.service.getStreams(filter_spec) - - # find the one with a matching name - for stream in streams: - if compare_strings(stream.id.name, stream_name): - return stream - return None - - # get a list of the snapshots in a named stream - def get_snapshot_for_stream(self, stream_name): - '''Get snapshot object for given stream name''' - stream_id = self.client.factory.create('streamIdDataObj') - stream_id.name = stream_name - # optional filter specification - filter_spec = self.client.factory.create('snapshotFilterSpecDataObj') - # return a list of snapshotDataObj - return self.client.service.getSnapshotsForStream(stream_id, filter_spec) - - @staticmethod - def get_snapshot_id(snapshots, idx=1): - '''Get the nth snapshot (base 1) - minus numbers to count from the end backwards (-1 = last)''' - if bool(idx): - num_snapshots = len(snapshots) - if idx < 0: - required = num_snapshots + idx + 1 + Args: + column_names (list[str]): The column names given by the `col` option + """ + special_columns = { + "location": {"lineNumber", "displayFile"}, + "comment": 
{"lastTriageComment"}, + "reference": {"externalReference"} + } + column_keys = {"cid"} + + for column_name in column_names: + column_name_lower = column_name.lower() + if column_name_lower in special_columns: + column_keys.update(special_columns[column_name_lower]) + elif column_name_lower in self.columns: + column_keys.add(self.columns[column_name_lower]) else: - required = idx - - if abs(required) > 0 and abs(required) <= num_snapshots: - # base zero - return snapshots[required - 1].id - return 0 - - def get_snapshot_detail(self, snapshot_id): - '''Get detailed information about a single snapshot''' - snapshot = self.client.factory.create('snapshotIdDataObj') - snapshot.id = snapshot_id - # return a snapshotInfoDataObj - return self.client.service.getSnapshotInformation(snapshot) - - def get_checkers(self): - '''Get a list of checkers from the service''' - if not self.checkers: - self.checkers = self.client.service.getCheckerNames() - return self.checkers + self.logger.warning(f"Invalid column name {column_name!r}") + return column_keys - @staticmethod - def add_filter_rqt(name, req_csv, valid_list, filter_list, allow_regex=False): - '''Lookup the list of given filter possibility, add to filter spec and return a validated list''' - logging.info('Validate required %s [%s]', name, req_csv) - validated = "" - delim = "" - for field in req_csv.split(','): - if not valid_list or field in valid_list: - logging.info('Classification [%s] is valid', field) - filter_list.append(field) - validated += delim + field - delim = "," - elif allow_regex: - pattern = re.compile(field) - for element in valid_list: - if pattern.search(element) and element not in filter_list: - filter_list.append(element) - validated += delim + element - delim = "," - else: - logging.error('Invalid %s filter: %s', name, field) - return validated - - -class CoverityDefectService(Service): - ''' - Coverity Defect Service (WebServices) - ''' - - def __init__(self, config_service): - '''Create a Defect Service, bound to the given Configuration Service''' - super(CoverityDefectService, self).__init__(config_service.get_transport(), - config_service.get_hostname(), - config_service.get_port(), - config_service.get_ws_version()) - self.config_service = config_service - self.filters = "" - # logging.getLogger('suds.client').setLevel(logging.DEBUG) - url = self.get_ws_url('defectservice') - self.validate_presence(url, 'Coverity Defect Service') - - def get_defects(self, project, stream, filters, custom=None): - """ Gets a list of defects for given stream, with some query criteria. + def login(self, username, password): + """Authenticate a session using the given username and password . Args: - project (str): Name of the project to query - stream (str): Name of the stream to query - filters (dict): Dictionary with attribute names as keys and CSV lists of attribute values to query as values - custom (str): A custom query - - Returns: - (suds.sudsobject.mergedDefectsPageDataObj) Suds mergedDefectsPageDataObj object containing filtered defects + username (str): Username to log in + password (str): Password to log in """ - logging.info('Querying Coverity for defects in project [%s] stream [%s] ...', project, stream) + self.session = requests.Session() + self.session.auth = (username, password) - # define the project - project_id = self.client.factory.create('projectIdDataObj') - project_id.name = project + def validate_stream(self, stream): + """Validate stream by retrieving the specified stream. 
-        # define the project
-        project_id = self.client.factory.create('projectIdDataObj')
-        project_id.name = project
+    def validate_stream(self, stream):
+        """Validate a stream by retrieving it from the server.
+        When the request fails, the stream does not exist or the user does not have access to it.

-        # and the stream
-        stream_id = self.client.factory.create('streamIdDataObj')
-        stream_id.name = stream
+        Args:
+            stream (str): The stream name
+        """
+        url = f"{self.api_endpoint}/streams/{stream}"
+        self._request(url)

-        # create filter spec
-        filter_spec = self.client.factory.create('snapshotScopeDefectFilterSpecDataObj')
+    def retrieve_issues(self, filters):
+        """Retrieve issues from the server (Coverity Connect).

-        # only for this stream
-        filter_spec.streamIncludeNameList.append(stream_id)
+        Args:
+            filters (dict): The filters for the query

-        # apply any filter on checker names
-        if filters['checker']:
-            self.config_service.get_checkers()
-            self.handle_attribute_filter(filters['checker'],
-                                         'Checker',
-                                         self.config_service.checkers,
-                                         filter_spec.checkerList,
-                                         allow_regex=True)
+        Returns:
+            dict: The response
+        """
+        params = {
+            "includeColumnLabels": "true",
+            "offset": 0,
+            "queryType": "bySnapshot",
+            "rowCount": -1,
+            "sortOrder": "asc",
+        }
+        url = f"{self.api_endpoint}/issues/search?{urlencode(params)}"
+        return self._request(url, filters)
+
+    def retrieve_column_keys(self):
+        """Retrieve the column keys and associated display names.

-        # apply any filter on impact status
-        if filters['impact']:
-            self.handle_attribute_filter(filters['impact'], 'Impact', IMPACT_LIST, filter_spec.impactNameList)
+        Returns:
+            dict: All available column names with respective column keys.
+        """
+        if not self._columns:
+            params = {
+                "queryType": "bySnapshot",
+                "retrieveGroupByColumns": "false"
+            }
+            url = f"{self.api_endpoint}/issues/columns?{urlencode(params)}"
+            columns = self._request(url)
+            if columns:
+                self._columns = requests.structures.CaseInsensitiveDict(
+                    ((column["name"], column["columnKey"]) for column in columns)
+                )
+        return self.columns
+
+    def retrieve_checkers(self):
+        """Retrieve the list of checkers from the server.

-        # apply any filter on issue kind
-        if filters['kind']:
-            self.handle_attribute_filter(filters['kind'], 'Kind', KIND_LIST, filter_spec.issueKindList)
+        Returns:
+            list[str]: The list of valid checkers
+        """
+        if not self.checkers:
+            url = f"{self.api_endpoint}/checkerAttributes/checker"
+            checkers = self._request(url)
+            if checkers and "checkerAttributedata" in checkers:
+                self._checkers = [checker["key"] for checker in checkers["checkerAttributedata"]]
+        return self.checkers

-        # apply any filter on classification
-        if filters['classification']:
-            self.handle_attribute_filter(filters['classification'],
-                                         'Classification',
-                                         CLASSIFICATION_LIST,
-                                         filter_spec.classificationNameList)
+    def _request(self, url, data=None):
+        """Perform a POST or GET request to the specified URL.
+        Uses a GET request when `data` is None, and a POST request otherwise.

-        # apply any filter on action
-        if filters['action']:
-            self.handle_attribute_filter(filters['action'], 'Action', ACTION_LIST, filter_spec.actionNameList)
+        Args:
+            url (str): The URL for the request
+            data (dict): Optional data to send

-        # apply any filter on Components
-        if filters['component']:
-            self.handle_component_filter(filters['component'], filter_spec)
+        Returns:
+            dict: The content of the server's response

-        # apply any filter on CWE values
-        if filters['cwe']:
-            self.handle_attribute_filter(filters['cwe'], 'CWE', None, filter_spec.cweList)
+        Raises:
+            requests.HTTPError
+        """
+        if data:
+            response = self.session.post(url, json=data)
+        else:
+            response = self.session.get(url)
+        if response.ok:
+            return response.json()
+        try:
+            err_msg = response.json()["message"]
+        except (requests.exceptions.JSONDecodeError, KeyError):
+            err_msg = response.content.decode()
+        self.logger.warning(err_msg)
+        return response.raise_for_status()

-        # apply any filter on CID values
-        if filters['cid']:
-            self.handle_attribute_filter(filters['cid'], 'CID', None, filter_spec.cidList)
+    def assemble_query_filter(self, column_name, filter_values, matcher_type):
+        """Assemble a filter for a specific column.

-        # if a special custom attribute value requirement
-        if custom:
-            self.handle_custom_filter_attribute(custom, filter_spec)
+        Args:
+            column_name (str): The column name (case-insensitive)
+            filter_values (list[str]): The list of valid values to filter on
+            matcher_type (str): The type of the matcher (nameMatcher, idMatcher or keyMatcher)

-        # create page spec
-        page_spec = self.client.factory.create('pageSpecDataObj')
-        page_spec.pageSize = 9999
-        page_spec.sortAscending = True
-        page_spec.startIndex = 0
+        Returns:
+            dict: New filter for API queries
+        """
+        matchers = []
+        for filter_ in filter_values:
+            matcher = {"type": matcher_type}
+            if matcher_type == "nameMatcher":
+                matcher["name"] = filter_
+                matcher["class"] = "Component"
+                assert column_name.lower() == "component"
+            elif matcher_type == "idMatcher":
+                matcher["id"] = filter_
+            else:
+                matcher["key"] = filter_
+            matchers.append(matcher)

-        # create snapshot scope
-        snapshot_scope = self.client.factory.create('snapshotScopeSpecDataObj')
+        if column_name not in self.columns:
+            self.logger.warning(f"Invalid column name {column_name!r}; retrieve column keys first.")

-        snapshot_scope.showOutdatedStreams = False
-        snapshot_scope.compareOutdatedStreams = False
+        return {
+            "columnKey": self.columns[column_name],
+            "matchMode": "oneOrMoreMatch",
+            "matchers": matchers
+        }

-        snapshot_scope.showSelector = 'last()'
-        snapshot_scope.compareSelector = 'last()'
+    def get_defects(self, stream, filters, column_names):
+        """Gets a list of defects for the given stream, filters and column names.
+        If a column name does not occur in the `columns` property, that column cannot be obtained,
+        because the corresponding column key is needed.
+        Column key `cid` is always obtained, for later use in other functions.

-        logging.info('Running Coverity query...')
-        return self.client.service.getMergedDefectsForSnapshotScope(project_id, filter_spec,
-                                                                    page_spec, snapshot_scope)
+        Args:
+            stream (str): Name of the stream to query
+            filters (dict): Dictionary with attribute names as keys and CSV lists of attribute values to query as values
+            column_names (list[str]): The column names

-    def handle_attribute_filter(self, attribute_values, name, *args, **kwargs):
-        """ Applies any filter on an attribute's values.
+        Returns:
+            dict: The content of the request. This has a structure like:
+                {
+                    "offset": 0,
+                    "totalRows": 2720,
+                    "columns": [list of column keys],
+                    "rows": list of [list of dictionaries {"key": ..., "value": ...}]
+                }
+        """
+        report_info(f"Querying Coverity for defects in stream [{stream}] ...")
+        query_filters = [
+            {
+                "columnKey": "streams",
+                "matchMode": "oneOrMoreMatch",
+                "matchers": [
+                    {
+                        "class": "Stream",
+                        "name": stream,
+                        "type": "nameMatcher"
+                    }
+                ]
+            }
+        ]
+
+        Filter = namedtuple("Filter", "name matcher_type values allow_regex", defaults=[[], False])
+        filter_options = {
+            "checker": Filter("Checker", "keyMatcher", self.checkers, True),
+            "impact": Filter("Impact", "keyMatcher", IMPACT_LIST),
+            "kind": Filter("Issue Kind", "keyMatcher", KIND_LIST),
+            "classification": Filter("Classification", "keyMatcher", CLASSIFICATION_LIST),
+            "action": Filter("Action", "keyMatcher", ACTION_LIST),
+            "cwe": Filter("CWE", "idMatcher"),
+            "cid": Filter("CID", "idMatcher")
+        }
+
+        for option, filter_ in filter_options.items():
+            if (filter_option := filters[option]) and (filter_values := self.handle_attribute_filter(
+                    filter_option, filter_.name, filter_.values, filter_.allow_regex)):
+                query_filters.append(self.assemble_query_filter(filter_.name, filter_values, filter_.matcher_type))
+
+        if (component_filter := filters["component"]) and (filter_values := self.handle_component_filter(component_filter)):
+            query_filters.append(self.assemble_query_filter("Component", filter_values, "nameMatcher"))
+
+        data = {
+            "filters": query_filters,
+            "columns": list(self.column_keys(column_names)),
+            "snapshotScope": {
+                "show": {
+                    "scope": "last()",
+                    "includeOutdatedSnapshots": False
+                },
+                "compareTo": {
+                    "scope": "last()",
+                    "includeOutdatedSnapshots": False
+                }
+            }
+        }
+
+        report_info("Running Coverity query...")
+        return self.retrieve_issues(data)
+
+    def handle_attribute_filter(self, attribute_values, name, valid_attributes, allow_regex=False):
+        """Process the given CSV list of attribute values by filtering out the invalid ones while logging an error.
+        The CSV list can contain regular expressions when `allow_regex` is set to True.

         Args:
             attribute_values (str): A CSV list of attribute values to query.
             name (str): String representation of the attribute.
+            valid_attributes (list/dict): All valid/possible attribute values.
+            allow_regex (bool): True to treat filter values as regular expressions, False to require exact matches

+        Returns:
+            set[str]: The attribute values to query with
         """
-        logging.info('Using %s filter [%s]', name, attribute_values)
-        validated = self.config_service.add_filter_rqt(name, attribute_values, *args, **kwargs)
-        logging.info('Resolves to [%s]', validated)
-        if validated:
-            self.filters += ("<%s(%s)> " % (name, validated))
+        report_info(f"Using {name} filter [{attribute_values}]")
+        filter_values = set()
+        for field in attribute_values.split(","):
+            if not valid_attributes or field in valid_attributes:
+                report_info(f"{name} [{field}] is valid")
+                filter_values.add(field)
+            elif allow_regex:
+                pattern = re.compile(field)
+                for element in valid_attributes:
+                    if pattern.search(element):
+                        filter_values.add(element)
+            else:
+                self.logger.error(f"Invalid {name} filter: {field}")
+        return filter_values
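
To make the regex path above concrete: a checker filter such as "MISRA" is not an exact match, so with `allow_regex` every valid checker matching the pattern is selected. A standalone sketch, reusing the fake checker list from tests/test_coverity.py::

    import re

    valid_checkers = ["MISRA 1", "MISRA 2 KEY", "MISRA 3", "C 1", "C 2"]
    filter_values = set()
    for field in "MISRA".split(","):
        if field in valid_checkers:
            filter_values.add(field)  # exact match
        else:  # the allow_regex path
            pattern = re.compile(field)
            filter_values.update(element for element in valid_checkers if pattern.search(element))
    print(sorted(filter_values))  # ['MISRA 1', 'MISRA 2 KEY', 'MISRA 3']
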
-    def handle_component_filter(self, attribute_values, filter_spec):
-        """ Applies any filter on the component attribute's values.
+    def handle_component_filter(self, attribute_values):
+        """Applies any filter on the component attribute's values.

         Args:
             attribute_values (str): A CSV list of attribute values to query.
-            filter_spec (sudsobject.Factory): Object to store filter attributes.
+
+        Returns:
+            list[str]: The list of attributes
         """
-        logging.info('Using Component filter [%s]', attribute_values)
+        report_info(f"Using Component filter [{attribute_values}]")
         parser = csv.reader([attribute_values])
-
+        filter_values = []
         for fields in parser:
-            for _, field in enumerate(fields):
-                field = field.strip()
-                component_id = self.client.factory.create('componentIdDataObj')
-                component_id.name = field
-                filter_spec.componentIdList.append(component_id)
-        self.filters += (" " % (attribute_values))
+            for field in fields:
+                filter_values.append(field.strip())
+        return filter_values

-    def handle_custom_filter_attribute(self, custom, filter_spec):
-        """ Handles a custom attribute definition, and adds it to the filter spec if it's valid.
+    def defect_url(self, stream, cid):
+        """Get the URL for a given defect CID, e.g.
+        https://machine1.eng.company.com/query/defects.htm?stream=StreamA&cid=1234

         Args:
-            custom (str): A custom query.
-            filter_spec (sudsobject.Factory): Object to store filter attributes.
+            stream (str): The name of the stream
+            cid (int): The CID of the given defect

-        Raises:
-            ValueError: Invalid custom attribute definition.
+        Returns:
+            str: The URL to the requested defect
         """
-        logging.info('Using attribute filter [%s]', custom)
-        # split the name:value[;name:value1[,value2]]
-        for fields in csv.reader([custom], delimiter=';'):
-            for i, name_value_pair in enumerate(fields):
-                name_value_pair = name_value_pair.strip()
-                valid, name, values = parse_two_part_term(name_value_pair, ':')
-                if valid:
-                    logging.info("attr (%d) [%s] = any of ...", i + 1, name)
-
-                    attribute_definition_id = self.client.factory.create('attributeDefinitionIdDataObj')
-                    attribute_definition_id.name = name
-
-                    filter_map = self.client.factory.create('attributeDefinitionValueFilterMapDataObj')
-                    filter_map.attributeDefinitionId = attribute_definition_id
-
-                    self._append_multiple_values(values, filter_map)
-
-                    filter_spec.attributeDefinitionValueFilterMap.append(filter_map)
-                else:
-                    raise ValueError('Invalid custom attribute definition [%s]' % name_value_pair)
-        self.filters += (" " % custom)
-
-    def _append_multiple_values(self, values, filter_map):
-        '''Append multiple values if there are multiple values delimited with comma'''
-        for value_fields in csv.reader([values], delimiter=','):
-            for value in value_fields:
-                logging.info(" [%s]", value)
-
-                attribute_value_id = self.client.factory.create('attributeValueIdDataObj')
-                attribute_value_id.name = value
-
-                filter_map.attributeValueIds.append(attribute_value_id)
-
-    def get_defect(self, cid, stream):
-        '''Get the details pertaining a specific CID - it may not have defect instance details if newly eliminated
-        (fixed)'''
-        logging.info('Fetching data for CID [%s] in stream [%s] ...', cid, stream)
-
-        merged_defect_id = self.client.factory.create('mergedDefectIdDataObj')
-        merged_defect_id.cid = cid
-
-        filter_spec = self.client.factory.create('streamDefectFilterSpecDataObj')
-        filter_spec.includeDefectInstances = True
-        filter_spec.includeHistory = True
-
-        stream_id = self.client.factory.create('streamIdDataObj')
-        stream_id.name = stream
-        filter_spec.streamIdList.append(stream_id)
-
-        return self.client.service.getStreamDefects(merged_defect_id, filter_spec)
-
-    def add_attribute_name_and_value(self, defect_state_spec, attr_name, attr_value):
-        '''Add attribute name and value to given defect state specification'''
-
-        # name value pair to update
-        
attribute_definition_id = self.client.factory.create('attributeDefinitionIdDataObj') - attribute_definition_id.name = attr_name - - attribute_value_id = self.client.factory.create('attributeValueIdDataObj') - attribute_value_id.name = attr_value - - # wrap the name/value pair - defect_state_attr_value = self.client.factory.create('defectStateAttributeValueDataObj') - defect_state_attr_value.attributeDefinitionId = attribute_definition_id - defect_state_attr_value.attributeValueId = attribute_value_id - - # add to our list - defect_state_spec.defectStateAttributeValues.append(defect_state_attr_value) - - # update the external reference id to a third party - def update_ext_reference_attribute(self, cid, triage_store, ext_ref_id, ccomment=None): - '''Update external reference attribute for given CID''' - logging.info('Updating Coverity: CID [%s] in TS [%s] with Ext Ref [%s]', cid, triage_store, ext_ref_id) - - # triage store identifier - triage_store_id = self.client.factory.create('triageStoreIdDataObj') - triage_store_id.name = triage_store - - # CID to update - merged_defect_id = self.client.factory.create('mergedDefectIdDataObj') - merged_defect_id.cid = cid - - # if an ext ref id value supplied - if bool(ext_ref_id): - attr_value = ext_ref_id - comment_value = 'Automatically recorded reference to new JIRA ticket.' - else: - # set to a space - which works as a blank without the WS complaining :-) - attr_value = " " - comment_value = 'Automatically cleared former JIRA ticket reference.' - - # if a Coverity comment to tag on the end - if bool(ccomment): - comment_value += " " + ccomment - logging.info('Comment = [%s]', comment_value) - - defect_state_spec = self.client.factory.create('defectStateSpecDataObj') - - # name value pairs to add to this update - self.add_attribute_name_and_value(defect_state_spec, EXT_REFERENCE_ATTR_NAME, attr_value) - self.add_attribute_name_and_value(defect_state_spec, COMMENT_ATTR_NAME, comment_value) - - # apply the update - return self.client.service.updateTriageForCIDsInTriageStore(triage_store_id, merged_defect_id, - defect_state_spec) - - @staticmethod - def get_instance_impact(stream_defect, instance_number=1): - '''Get the current impact of the 'nth' incident of this issue (High/Medium/Low)''' - counter = instance_number - for instance in stream_defect.defectInstances: - counter -= 1 - if counter == 0: - return instance.impact.name - return "" - - @staticmethod - def get_value_for_named_attribute(stream_defect, attr_name): - '''Lookup the value of a named attribute''' - logging.info('Get value for cov attribute [%s]', attr_name) - for attr_value in stream_defect.defectStateAttributeValues: - if compare_strings(attr_value.attributeDefinitionId.name, attr_name): - logging.info('Resolves to [%s]', attr_value.attributeValueId.name) - return str(attr_value.attributeValueId.name) - logging.warning('Value for attribute [%s] not found', attr_name) - return "" - - @staticmethod - def get_event_attribute_value(defect_state, name, value=None): - '''Get specified attribute was set to given matching value''' - if bool(value): - logging.info('Searching for attribute [%s] with value [%s]', name, value) - else: - logging.info('Searching for attribute [%s]', name) - - for attr_value in defect_state.defectStateAttributeValues: - # check if we have the named attribute - if compare_strings(attr_value.attributeDefinitionId.name, name): - # if any value supplied or it matches requirement - if bool(attr_value.attributeValueId.name) and\ - (not value or 
compare_strings(attr_value.attributeValueId.name, value)): - logging.info('Found [%s] = [%s]', - attr_value.attributeDefinitionId.name, attr_value.attributeValueId.name) - return True, attr_value.attributeValueId.name - # break attribute name search - either no value or it doesn't match - break - logging.warning('Event for attribute [%s] not found', name) - return False, None - - def seek_nth_match(self, event_history, nth_event, attr_name, attr_value): - '''Seek for a given attribute name-value pair in the triaging history''' - num_match = 0 - for defect_state in event_history: - # look for the attribute name-value pair in this triage event - req_event_found, req_attr_value = self.get_event_attribute_value(defect_state, attr_name, attr_value) - if req_event_found: - num_match += 1 - # correct one? - if num_match == nth_event: - return True, defect_state, req_attr_value - return False, None, None - - def get_event_for_attribute_change(self, stream_defect, nth_term, attr_name, attr_value=None): - '''Get event when specified attribute was set to given matching value''' - logging.info('Searching for triage event n=[%d] where attribute [%s] is set to [%s]', - nth_term, attr_name, attr_value) - - if nth_term > 0: - found, defect_state, value = self.seek_nth_match(stream_defect.history, nth_term, attr_name, attr_value) - else: - found, defect_state, value = self.seek_nth_match(reversed(stream_defect.history), abs(int(nth_term)), - attr_name, attr_value) - - return found, defect_state, value - - def get_ext_reference_id(self, stream_defect): - '''Get external reference ID attribute value for given defect''' - return self.get_value_for_named_attribute(stream_defect, EXT_REFERENCE_ATTR_NAME) - - def get_defect_status(self, stream_defect): - '''Get defect status attribute value for given defect''' - return self.get_value_for_named_attribute(stream_defect, DEFECT_STATUS_ATTR_NAME) - - def get_classification(self, stream_defect): - '''Get classification attribute value for given defect''' - return self.get_value_for_named_attribute(stream_defect, CLASSIFICATION_ATTR_NAME) - - def get_action(self, stream_defect): - '''Get action attribute value for given defect''' - return self.get_value_for_named_attribute(stream_defect, ACTION_ATTR_NAME) - - def get_defect_url(self, stream, cid): - '''Get URL for given defect CID - http://machine1.eng.company.com/query/defects.htm?stream=StreamA&cid=1234 - ''' - return self.get_service_url('/query/defects.htm?stream=%s&cid=%s' % (stream, str(cid)), add_port=False) + params = { + 'stream': stream, + 'cid': cid + } + return f"{self.base_url}/query/defects.htm?{urlencode(params)}" -if __name__ == '__main__': +if __name__ == "__main__": print("Sorry, no main here") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..17cc83aa --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools-scm", "setuptools"] +build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py index 76b93458..eb7c1247 100644 --- a/setup.py +++ b/setup.py @@ -2,23 +2,28 @@ from setuptools import setup, find_packages -project_url = 'https://github.com/melexis/sphinx-coverity-extension' +project_url = "https://github.com/melexis/sphinx-coverity-extension" -requires = ['Sphinx>=2.1', 'docutils', 'setuptools_scm', 'matplotlib', 'mlx.traceability', 'suds-py3', - 'urlextract'] +requires = [ + "Sphinx>=2.1", + "docutils", + "setuptools_scm", + "matplotlib", + "mlx.traceability", + "urlextract", +] setup( - name='mlx.coverity', - 
setup_requires=['setuptools-scm'], + name="mlx.coverity", use_scm_version=True, url=project_url, - license='GNU General Public License v3 (GPLv3)', - author='Crt Mori', - author_email='cmo@melexis.com', - description='Sphinx coverity extension from Melexis', + license="GNU General Public License v3 (GPLv3)", + author="Crt Mori", + author_email="cmo@melexis.com", + description="Sphinx coverity extension from Melexis", long_description=open("README.rst").read(), - long_description_content_type='text/x-rst', + long_description_content_type="text/x-rst", zip_safe=False, classifiers=[ 'Development Status :: 5 - Production/Stable', @@ -38,19 +43,19 @@ 'Topic :: Documentation :: Sphinx', 'Topic :: Utilities', ], - platforms='any', - packages=find_packages(exclude=['tests', 'example']), + platforms="any", + packages=find_packages(exclude=["tests", "example"]), include_package_data=True, install_requires=requires, python_requires='>=3.8', namespace_packages=['mlx'], keywords=[ - 'coverity', - 'reporting', - 'reStructuredText coverity report', - 'sphinx', - 'ASPICE', - 'ISO26262', - 'ASIL', + "coverity", + "reporting", + "reStructuredText coverity report", + "sphinx", + "ASPICE", + "ISO26262", + "ASIL", ], ) diff --git a/tests/columns_keys.json b/tests/columns_keys.json new file mode 100644 index 00000000..cba3df72 --- /dev/null +++ b/tests/columns_keys.json @@ -0,0 +1,178 @@ +[ + { + "columnKey": "cid", + "name": "CID" + }, + { + "columnKey": "checker", + "name": "Checker" + }, + { + "columnKey": "displayImpact", + "name": "Impact" + }, + { + "columnKey": "displayCategory", + "name": "Category" + }, + { + "columnKey": "displayType", + "name": "Type" + }, + { + "columnKey": "cwe", + "name": "CWE" + }, + { + "columnKey": "displayIssueKind", + "name": "Issue Kind" + }, + { + "columnKey": "status", + "name": "Status" + }, + { + "columnKey": "firstDetected", + "name": "First Detected" + }, + { + "columnKey": "owner", + "name": "Owner" + }, + { + "columnKey": "ownerFullName", + "name": "Owner Name" + }, + { + "columnKey": "externalReference", + "name": "External Reference" + }, + { + "columnKey": "classification", + "name": "Classification" + }, + { + "columnKey": "severity", + "name": "Severity" + }, + { + "columnKey": "action", + "name": "Action" + }, + { + "columnKey": "fixTarget", + "name": "Fix Target" + }, + { + "columnKey": "legacy", + "name": "Legacy" + }, + { + "columnKey": "displayComponent", + "name": "Component" + }, + { + "columnKey": "displayFile", + "name": "File" + }, + { + "columnKey": "displayFunction", + "name": "Function" + }, + { + "columnKey": "functionMergeName", + "name": "Function Merge Name" + }, + { + "columnKey": "mergeExtra", + "name": "Merge Extra" + }, + { + "columnKey": "mergeKey", + "name": "Merge Key" + }, + { + "columnKey": "fileLanguage", + "name": "Language" + }, + { + "columnKey": "lastTriaged", + "name": "Last Triaged" + }, + { + "columnKey": "lastTriagedUser", + "name": "Last Triaged User" + }, + { + "columnKey": "occurrenceCount", + "name": "Count" + }, + { + "columnKey": "displayComparison", + "name": "Comparison" + }, + { + "columnKey": "firstSnapshotDate", + "name": "First Snapshot Date" + }, + { + "columnKey": "firstSnapshotId", + "name": "First Snapshot" + }, + { + "columnKey": "firstSnapshotVersion", + "name": "First Snapshot Version" + }, + { + "columnKey": "firstSnapshotTarget", + "name": "First Snapshot Target" + }, + { + "columnKey": "firstSnapshotDescription", + "name": "First Snapshot Description" + }, + { + "columnKey": "firstSnapshotStream", + 
"name": "First Snapshot Stream" + }, + { + "columnKey": "lastDetected", + "name": "Last Snapshot Date" + }, + { + "columnKey": "lastDetectedId", + "name": "Last Snapshot" + }, + { + "columnKey": "lastDetectedVersion", + "name": "Last Snapshot Version" + }, + { + "columnKey": "lastDetectedTarget", + "name": "Last Snapshot Target" + }, + { + "columnKey": "lastDetectedDescription", + "name": "Last Snapshot Description" + }, + { + "columnKey": "lastDetectedStream", + "name": "Last Snapshot Stream" + }, + { + "columnKey": "score", + "name": "Score" + }, + { + "columnKey": "lineNumber", + "name": "Line Number" + }, + { + "columnKey": "lastTriageComment", + "name": "Last Triage Comment" + }, + { + "columnKey": "project", + "name": "Project" + } +] diff --git a/tests/filters.py b/tests/filters.py new file mode 100644 index 00000000..95711ea3 --- /dev/null +++ b/tests/filters.py @@ -0,0 +1,142 @@ +# filters, columns_names, response data in apart python bestand + +from collections import namedtuple + +Filter = namedtuple("Filter", "filters column_names request_data") + +# Test with no filters and no column names +test_defect_filter_0 = Filter( + { + "checker": None, + "impact": None, + "kind": None, + "classification": None, + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + [], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + } + ], + "columns": ["cid"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) + +test_defect_filter_1 = Filter( + { + "checker": "MISRA", + "impact": None, + "kind": None, + "classification": "Intentional,Bug,Pending,Unclassified", + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + ["CID", "Classification", "Checker", "Comment"], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + }, + { + "columnKey": "checker", + "matchMode": "oneOrMoreMatch", + "matchers": [ + {"type": "keyMatcher", "key": "MISRA 2 KEY"}, + {"type": "keyMatcher", "key": "MISRA 1"}, + {"type": "keyMatcher", "key": "MISRA 3"}, + ], + }, + { + "columnKey": "classification", + "matchMode": "oneOrMoreMatch", + "matchers": [ + {"type": "keyMatcher", "key": "Bug"}, + {"type": "keyMatcher", "key": "Pending"}, + {"type": "keyMatcher", "key": "Unclassified"}, + {"type": "keyMatcher", "key": "Intentional"}, + ], + }, + ], + "columns": ["cid", "checker", "lastTriageComment", "classification"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) + +test_defect_filter_2 = Filter( + { + "checker": None, + "impact": None, + "kind": None, + "classification": None, + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + ["CID", "Checker", "Status", "Comment"], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + } + ], + "columns": ["status", "cid", "checker", "lastTriageComment"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) + +test_defect_filter_3 = 
Filter( + { + "checker": None, + "impact": None, + "kind": None, + "classification": "Unclassified", + "action": None, + "component": None, + "cwe": None, + "cid": None, + }, + ["CID", "Classification", "Action"], + { + "filters": [ + { + "columnKey": "streams", + "matchMode": "oneOrMoreMatch", + "matchers": [{"class": "Stream", "name": "test_stream", "type": "nameMatcher"}], + }, + { + "columnKey": "classification", + "matchMode": "oneOrMoreMatch", + "matchers": [{"type": "keyMatcher", "key": "Unclassified"}], + }, + ], + "columns": ["cid", "classification", "action"], + "snapshotScope": { + "show": {"scope": "last()", "includeOutdatedSnapshots": False}, + "compareTo": {"scope": "last()", "includeOutdatedSnapshots": False}, + }, + }, +) diff --git a/tests/test_coverity.py b/tests/test_coverity.py index b7e91598..208f622e 100644 --- a/tests/test_coverity.py +++ b/tests/test_coverity.py @@ -1,126 +1,208 @@ from unittest import TestCase -try: - from unittest.mock import MagicMock, patch - # from unittest.mock import call -except ImportError: - from mock import MagicMock, patch - # from mock import call -import mlx.coverity as cov -import mlx.coverity_services as covservices +from unittest.mock import MagicMock, patch -# For Coverity - SOAP -from suds.client import Client -from suds.wsse import Security, UsernameToken -from lxml import objectify +import json +import requests +import requests_mock +from urllib.parse import urlencode +from pathlib import Path +from parameterized import parameterized +from mlx.coverity import SphinxCoverityConnector, CoverityDefect +from mlx.coverity_services import CoverityDefectService +from .filters import test_defect_filter_0, test_defect_filter_1, test_defect_filter_2, test_defect_filter_3 -class TestCoverity(TestCase): +TEST_FOLDER = Path(__file__).parent + + +def ordered(obj): + if isinstance(obj, dict): + return sorted((k, ordered(v)) for k, v in obj.items()) + if isinstance(obj, list): + return sorted(ordered(x) for x in obj) + else: + return obj + +class TestCoverity(TestCase): def setUp(self): - ''' SetUp to be run before each test to provide clean working env ''' - - @patch('mlx.coverity_services.UsernameToken') - @patch('mlx.coverity_services.Security') - @patch('mlx.coverity_services.Client') - def test_configuration_service_login(self, suds_client_mock, suds_security_mock, suds_username_mock): - ''' Test login function of CoverityConfigurationService ''' - suds_client_mock.return_value = MagicMock(spec=Client) - suds_client_mock.return_value.service = MagicMock(spec=covservices.Service) - suds_client_mock.return_value.service.getVersion = MagicMock() - suds_security_mock.return_value = MagicMock(spec=Security) - suds_security_mock.return_value.tokens = [] - suds_username_mock.return_value = MagicMock(spec=UsernameToken, return_value="bljah") - - # Login to Coverity and obtain stream information - coverity_conf_service = cov.CoverityConfigurationService('http', 'scan.coverity.com', '8080') - suds_client_mock.assert_called_once_with('http://scan.coverity.com:8080/ws/v9/configurationservice?wsdl') - - coverity_conf_service.login('user', 'password') - suds_security_mock.assert_called_once() - suds_username_mock.assert_called_once_with('user', 'password') - # suds_security_mock.tokens.assert_called_once_with("bljah") - # suds_client_mock.set_options.assert_called_once_with(wsse=suds_security_mock) - - @patch('mlx.coverity_services.UsernameToken') - @patch('mlx.coverity_services.Security') - @patch('mlx.coverity_services.Client') - def 
test_defect_service_login(self, suds_client_mock, suds_security_mock, suds_username_mock): - ''' Test login function of CoverityDefectService ''' - suds_client_mock.return_value = MagicMock(spec=Client) - suds_client_mock.return_value.service = MagicMock(spec=covservices.Service) - suds_client_mock.return_value.service.getVersion = MagicMock() - suds_security_mock.return_value = MagicMock(spec=Security) - suds_security_mock.return_value.tokens = [] - suds_username_mock.return_value = MagicMock(spec=UsernameToken, return_value="bljah") - - # Login to Coverity and obtain stream information - coverity_conf_service = cov.CoverityConfigurationService('http', 'scan.coverity.com', '8080') - suds_client_mock.assert_called_once_with('http://scan.coverity.com:8080/ws/v9/configurationservice?wsdl') - - # Test CoverityDefectService - coverity_service = cov.CoverityDefectService(coverity_conf_service) - suds_client_mock.assert_called_with('http://scan.coverity.com:8080/ws/v9/defectservice?wsdl') - - coverity_service.login('user', 'password') - suds_security_mock.assert_called_once() - suds_username_mock.assert_called_once_with('user', 'password') - - @patch('mlx.coverity_services.UsernameToken') - @patch('mlx.coverity_services.Security') - @patch('mlx.coverity_services.Client') - def test_defect_service_defects(self, suds_client_mock, suds_security_mock, suds_username_mock): - ''' Test login function of CoverityDefectService ''' - suds_client_mock.return_value = MagicMock(spec=Client) - suds_client_mock.return_value.service = MagicMock(spec=covservices.Service) - suds_client_mock.return_value.service.getVersion = MagicMock() - with open('tests/defect_soap.xml', 'rb') as xmlfile: - defect_soap = objectify.fromstring(xmlfile.read()) - suds_client_mock.return_value.service.getMergedDefectsForSnapshotScope = MagicMock(spec=defect_soap, - return_value=defect_soap) - suds_client_mock.return_value.factory = MagicMock() - suds_security_mock.return_value = MagicMock(spec=Security) - suds_security_mock.return_value.tokens = [] - suds_username_mock.return_value = MagicMock(spec=UsernameToken, return_value="bljah") - - # Login to Coverity and obtain stream information - coverity_conf_service = cov.CoverityConfigurationService('http', 'scan.coverity.com', '8080') - suds_client_mock.assert_called_once_with('http://scan.coverity.com:8080/ws/v9/configurationservice?wsdl') - - # Test CoverityDefectService - coverity_service = cov.CoverityDefectService(coverity_conf_service) - suds_client_mock.assert_called_with('http://scan.coverity.com:8080/ws/v9/defectservice?wsdl') - - coverity_service.login('user', 'password') - suds_security_mock.assert_called_once() - suds_username_mock.assert_called_once_with('user', 'password') - - filters = { - 'checker': None, - 'impact': None, - 'kind': None, - 'classification': None, - 'action': None, - 'component': None, - 'cwe': None, - 'cid': None, + """SetUp to be run before each test to provide clean working env""" + self.fake_stream = "test_stream" + + def initialize_coverity_service(self, login=False): + """Logs in Coverity Service and initializes the urls used for REST API. 
+
+        Returns:
+            CoverityDefectService: The Coverity defect service
+        """
+        coverity_service = CoverityDefectService("scan.coverity.com/")
+
+        if login:
+            # Login to Coverity
+            coverity_service.login("user", "password")
+
+        # URLs that are used in GET or POST requests
+        endpoint = coverity_service.api_endpoint
+        params = {
+            "queryType": "bySnapshot",
+            "retrieveGroupByColumns": "false"
+        }
+        self.column_keys_url = f"{endpoint}/issues/columns?{urlencode(params)}"
+        self.checkers_url = f"{endpoint}/checkerAttributes/checker"
+        self.stream_url = f"{endpoint}/streams/{self.fake_stream}"
+        params = {
+            "includeColumnLabels": "true",
+            "offset": 0,
+            "queryType": "bySnapshot",
+            "rowCount": -1,
+            "sortOrder": "asc",
+        }
+        self.issues_url = f"{endpoint}/issues/search?{urlencode(params)}"
+
+        return coverity_service
+
+    def test_session_by_stream_validation(self):
+        """To test the session authentication, the function `validate_stream` is used."""
+        coverity_service = self.initialize_coverity_service(login=False)
+        with requests_mock.mock() as mocker:
+            mocker.get(self.stream_url, json={})
+            # Login to Coverity
+            coverity_service.login("user", "password")
+            coverity_service.validate_stream(self.fake_stream)
+            stream_request = mocker.last_request
+            assert stream_request.headers["Authorization"] == requests.auth._basic_auth_str("user", "password")
+
+    @patch("mlx.coverity_services.requests")
+    def test_stream_validation(self, mock_requests):
+        """Test that the function `validate_stream` is called once, with the correct URL."""
+        mock_requests.return_value = MagicMock(spec=requests)
+
+        # Get the base url
+        coverity_service = CoverityDefectService("scan.coverity.com/")
+        # Login to Coverity
+        coverity_service.login("user", "password")
+        with patch.object(CoverityDefectService, "_request") as mock_method:
+            # Validate stream name
+            coverity_service.validate_stream(self.fake_stream)
+            mock_method.assert_called_once()
+            mock_method.assert_called_with("https://scan.coverity.com/api/v2/streams/test_stream")
+
+    def test_retrieve_columns(self):
+        """Test the function `retrieve_column_keys`.
+        Check that the columns property is initialized correctly, by verifying that the name of a column
+        resolves to the correct key."""
+        with open(f"{TEST_FOLDER}/columns_keys.json", "r") as content:
+            column_keys = json.loads(content.read())
+        # initialize what's needed for the REST API
+        coverity_service = self.initialize_coverity_service(login=True)
+        with requests_mock.mock() as mocker:
+            mocker.get(self.column_keys_url, json=column_keys)
+            coverity_service.retrieve_column_keys()
+            assert mocker.call_count == 1
+            mock_request = mocker.last_request
+            assert mock_request.method == "GET"
+            assert mock_request.url == self.column_keys_url
+            assert mock_request.verify
+            assert coverity_service.columns["Issue Kind"] == "displayIssueKind"
+            assert coverity_service.columns["CID"] == "cid"
+
+    def test_retrieve_checkers(self):
+        """Test the function `retrieve_checkers`.
+        Check that the checkers property equals the list of keys in `checkerAttributedata` of the
+        mocked response."""
+        self.fake_checkers = {
+            "checkerAttribute": {"name": "checker", "displayName": "Checker"},
+            "checkerAttributedata": [
+                {"key": "MISRA", "value": "M"},
+                {"key": "CHECKER", "value": "C"}
+            ],
+        }
+        # initialize what's needed for the REST API
+        coverity_service = self.initialize_coverity_service(login=True)
+
+        with requests_mock.mock() as mocker:
+            mocker.get(self.checkers_url, json=self.fake_checkers)
+            coverity_service.retrieve_checkers()
+            assert mocker.call_count == 1
+            mock_request = mocker.last_request
+            assert mock_request.method == "GET"
+            assert mock_request.url == self.checkers_url
+            assert mock_request.verify
+            assert coverity_service.checkers == ["MISRA", "CHECKER"]
+
+    @parameterized.expand([
+        test_defect_filter_0,
+        test_defect_filter_1,
+        test_defect_filter_2,
+        test_defect_filter_3,
+    ])
+    def test_get_defects(self, filters, column_names, request_data):
+        """Check `get_defects` with different filters: the request data it assembles must equal the
+        expected data from filters.py.
+        Because `get_defects` builds the column keys as a set, the helper `ordered` is used to compare
+        the request data where order does not matter."""
+        with open(f"{TEST_FOLDER}/columns_keys.json", "r") as content:
+            column_keys = json.loads(content.read())
+        self.fake_checkers = {
+            "checkerAttribute": {"name": "checker", "displayName": "Checker"},
+            "checkerAttributedata": [
+                {"key": "MISRA 1", "value": "M 1"},
+                {"key": "MISRA 2 KEY", "value": "MISRA 2 VALUE"},
+                {"key": "MISRA 3", "value": "M 3"},
+                {"key": "C 1", "value": "CHECKER 1"},
+                {"key": "C 2", "value": "CHECKER 2"}
+            ],
+        }
+        # initialize what's needed for the REST API
+        coverity_service = self.initialize_coverity_service(login=True)
+
+        with requests_mock.mock() as mocker:
+            mocker.get(self.column_keys_url, json=column_keys)
+            mocker.get(self.checkers_url, json=self.fake_checkers)
+            # Retrieve checkers; required for get_defects()
+            coverity_service.retrieve_checkers()
+            # Retrieve columns; required for get_defects()
+            coverity_service.retrieve_column_keys()
+            # Get defects
+            with patch.object(CoverityDefectService, "retrieve_issues") as mock_method:
+                coverity_service.get_defects(self.fake_stream, filters, column_names)
+                data = mock_method.call_args[0][0]
+                mock_method.assert_called_once()
+                assert ordered(data) == ordered(request_data)
+
+    def test_get_filtered_defects(self):
+        """Test `get_filtered_defects` of SphinxCoverityConnector: check that `get_defects` is called once,
+        with the correct arguments.
+        Also tests that, when the node has a `chart_attribute`, its name is added to `column_names`."""
+        sphinx_coverity_connector = SphinxCoverityConnector()
+        sphinx_coverity_connector.coverity_service = self.initialize_coverity_service(login=False)
+        sphinx_coverity_connector.stream = self.fake_stream
+        node_filters = {
+            "checker": "MISRA", "impact": None, "kind": None,
+            "classification": "Intentional,Bug,Pending,Unclassified", "action": None, "component": None,
+            "cwe": None, "cid": None
+        }
+        column_names = {"Comment", "Classification", "CID"}
+        fake_node = CoverityDefect()
+        fake_node["col"] = column_names
+        fake_node["filters"] = node_filters
+        with patch.object(CoverityDefectService, "get_defects") as mock_method:
+            sphinx_coverity_connector.get_filtered_defects(fake_node)
+            mock_method.assert_called_once_with(self.fake_stream, fake_node["filters"], column_names)
+            fake_node["chart_attribute"] = "Checker"
+            column_names.add("Checker")
+            sphinx_coverity_connector.get_filtered_defects(fake_node)
+            mock_method.assert_called_with(self.fake_stream, fake_node["filters"], column_names)
+
+    def test_failed_login(self):
+        """Test a failed login by mocking the status code when validating the stream."""
+        coverity_conf_service = CoverityDefectService("scan.coverity.com/")
+        stream_url = f"{coverity_conf_service.api_endpoint}/streams/{self.fake_stream}"
+
+        with requests_mock.mock() as mocker:
+            mocker.get(stream_url, headers={"Authorization": "Basic fail"}, status_code=401)
+            # Login to Coverity
+            coverity_conf_service.login("user", "password")
+            # Validate stream name
+            with self.assertRaises(requests.HTTPError) as err:
+                coverity_conf_service.validate_stream(self.fake_stream)
+            self.assertEqual(err.exception.response.status_code, 401)
diff --git a/tox.ini b/tox.ini
index a48d3098..1a5c0504 100644
--- a/tox.ini
+++ b/tox.ini
@@ -29,16 +29,18 @@ passenv =
     *
 usedevelop = false
 deps=
+    build
     mock
     pytest
     pytest-cov
+    requests_mock
     coverage
     reportlab
     sphinx-testing >= 0.5.2
     sphinx_selective_exclude
     sphinx_rtd_theme
+    parameterized
     python-decouple
-    suds-py3
     urlextract
     setuptools_scm
     matplotlib
@@ -53,8 +55,8 @@ whitelist_externals =
     mlx-warnings
 commands=
     test: {posargs:py.test --cov=mlx.coverity --cov-report=term-missing -vv tests/}
-    html: mlx-warnings --config warnings_config.json --command make -C example html
-    latexpdf: mlx-warnings --config warnings_config.json --command make -C example latexpdf
+    html: mlx-warnings --config 
warnings_config.yml --command make -C example html + latexpdf: mlx-warnings --config warnings_config.yml --command make -C example latexpdf [testenv:check] deps = @@ -66,7 +68,7 @@ deps = pygments skip_install = true commands = - python setup.py sdist + python -m build twine check dist/* check-manifest {toxinidir} -u flake8 --ignore=W605,W391 mlx tests setup.py diff --git a/warnings_config.json b/warnings_config.json deleted file mode 100644 index 888ab4b0..00000000 --- a/warnings_config.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "sphinx":{ - "enabled": true, - "min": 0, - "max": 0, - "exclude": [ - "WARNING: Connection failed: ", - "WARNING: Connection failed: ", - "CRITICAL:root:No such Coverity Configuration Service", - "WARNING: cannot cache unpickable configuration value: 'traceability_attributes_sort' \\(because it contains a function, class, or module object\\)" - ] - } -} diff --git a/warnings_config.yml b/warnings_config.yml new file mode 100644 index 00000000..fa28d1dc --- /dev/null +++ b/warnings_config.yml @@ -0,0 +1,21 @@ +sphinx: + enabled: true + min: 0 + max: 0 + exclude: + - 'WARNING: Connection failed: HTTPSConnectionPool\(host=.+, port=\d+\): Max retries exceeded with url: /api/v2/[\w/?=&\\]+ \(Caused by NameResolutionError\(\": Failed to resolve .+ \(\[Errno -2\] Name or service not known\)\"\)\)' + - 'WARNING: Connection failed: HTTPSConnectionPool\(host=.+, port=\d+\): Max retries exceeded with url: /api/v2/[\w/?=&\\]+ \(Caused by NameResolutionError\(\": Failed to resolve .+ \(\[Errno -3\] Temporary failure in name resolution\)\"\)\)' + - 'WARNING: CID \d+: Could not find item ID .+ in traceability collection.' + - 'WARNING: cannot cache unpickable configuration value: .traceability_attributes_sort. \(because it contains a function, class, or module object\)' +doxygen: + enabled: false +junit: + enabled: false +xmlrunner: + enabled: false +coverity: + enabled: false +robot: + enabled: false +polyspace: + enabled: false
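
Taken together, this patch replaces the two suds-based SOAP services with a single requests-based `CoverityDefectService`. A minimal usage sketch of the reworked flow, mirroring how the tests drive it; hostname, credentials and stream name are placeholders, and a reachable Coverity Connect instance is assumed::

    from mlx.coverity_services import CoverityDefectService

    service = CoverityDefectService("scan.coverity.com/")
    service.login("reporter", "coverity")
    service.validate_stream("some_coverity_stream")  # raises requests.HTTPError when absent or forbidden
    service.retrieve_checkers()                      # populate the valid checkers before get_defects()
    service.retrieve_column_keys()                   # populate the name -> column-key mapping
    filters = {"checker": "MISRA", "impact": None, "kind": None, "classification": None,
               "action": None, "component": None, "cwe": None, "cid": None}
    defects = service.get_defects("some_coverity_stream", filters, ["CID", "Checker", "Comment"])
    print(defects["totalRows"])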