#!/usr/bin/env python
# Reconstructed from a git patch whose line breaks had been lost:
#   From e7cf793acbf2380c4dcaee03f282252ba7af6ae1 Mon Sep 17 00:00:00 2001
#   From: John Andersen
#   Date: Sun, 7 Nov 2021 13:26:50 -0800
#   Subject: [PATCH] util: testing: manifest: shim: Initial commit
#   Signed-off-by: John Andersen
#   File: dffml/util/testing/manifest/shim.py (new file, mode 100755)
"""
Manifest/TPS Report Shim
========================

Validate and parse a TPS Report (manifest). Execute something for the next stage
of parsing.

This file is used as a shim to bridge the gap between the parsing for the
TPS manifest format and the next action to be taken after parsing. This file
allows for registration of phase 2 parsers via environment variables.

The purpose of this script is to perform the initial validation and parsing of
the TPS manifest. Its responsibility is to then call the appropriate next phase
manifest parser. It will pass the manifest's data in a format the next phase
understands, and execute the next phase using capabilities defined within this
file.

Updates
-------

This file has been vendored into multiple locations. Please be sure to track
progress as the format evolves upstream. Upstream URL:
https://github.com/intel/dffml/blob/manifest/dffml/util/testing/manifest/shim.py

Pull Request for discussion:

Contributing
------------

This section is documentation for contributing to the TPS Report (manifest)
shim.

We want this shim to be usable on a default format which we'll work to define as
a community upstream.

Design Goals
````````````

This shim MUST

- Work with arbitrary manifest formats

- Discover verification mechanisms

- Verify the manifest (think secure boot)

- Parse the manifest

- Discover phase 2 parsers

- Output the manifest in a format the phase 2 parser can understand

- Execute the phase 2 parser

Format
``````

We need to come up with a format that allows us to evolve it as we move
forward.

To make sure we have forwards / backwards compatibility we should
include information which allows us to identify what format the document
is in, and what version of that format it is. This will likely also feed
into our input dataflow requirements as we'll need to have the ability
to check an arbitrary input to see if we might have an applicable
converter.

Let's learn from JSON Schema and include a URL where we might be able
to find the schema for the document. We can double up on our previous
needs by asking that the filename of the URL can help us identify our
document format (we'll provide fallback for if we don't have control
over the filename via the ``$document_format`` and ``$document_version``
keys). We'll parse the URL for the filename component. When we parse it
we'll split on ``.``. If the first part is eff (Extensible Format
Format) we'll treat the rest up until the semantic version as the format
name. Then the semantic version is the version of the format. Then the
rest should be the extension which is associated with the format which
we can use to validate the contents of the document, such as JSON
schema.

``$schema: "https://example.com/eff.my.document.format.0.0.0.schema.json"``

TODO
----

- Verification of the manifest. Idea: Developer generates manifest.
  Signs manifest with the private half of an asymmetric key pair. Prepends
  base64 encoded signature as a valid key, ``$signature``. This means you
  have to parse the YAML before you have verified the signature, which is
  not ideal. However, it's one method available to us and a simple parse
  without the use of a full YAML parser could be done. Or we could
  distribute out of band and verify the document before the conversion
  stage, in the loading stage.

- Verification of references within manifest. Do we support public
  portion of CA key embedded in the document various places? We
  could then use it for things like verification of git repos where
  the CA must sign all developer keys which are in the repo history.
  This will apply to anything that is an external reference in the
  document. There should be a way for the document to include an HMAC or
  something like that or something more dynamic like a CA.

Notes
-----

- https://github.com/mjg59/ssh_pki

  - Should we use this? No. Are we going to? Yes.
"""
import os
import sys
import json
import pathlib
import argparse
import importlib
import contextlib
import dataclasses
from typing import Any, Dict, Optional, TextIO

# PyYAML is optional. When it is missing only JSON manifests can be parsed.
with contextlib.suppress(ImportError):
    import yaml


def parse(contents: str):
    r'''
    Given the contents of the manifest file as a string, parse the contents into
    a dictionary object.

    JSON is tried first. If that fails and PyYAML could be imported, the
    contents are handed to ``yaml.safe_load`` instead.

    :param str contents: string containing the manifest file's contents
    :return: a dictionary representing the manifest
    :rtype: dict
    :raises Exception: the JSON error when PyYAML is unavailable, otherwise the
        YAML error chained from the JSON error

    >>> import textwrap
    >>> from dffml.util.testing.manifest.shim import parse
    >>>
    >>> parse(
    ...     textwrap.dedent(
    ...         """\
    ...         $document_format: tps.manifest
    ...         $document_version: 0.0.1
    ...         testplan:
    ...         - git:
    ...             repo: https://example.com/my-repo.git
    ...             branch: main
    ...             file: my_test.py
    ...         """
    ...     )
    ... )
    {'$document_format': 'tps.manifest', '$document_version': '0.0.1', 'testplan': [{'git': {'repo': 'https://example.com/my-repo.git', 'branch': 'main', 'file': 'my_test.py'}}]}
    '''
    try:
        return json.loads(contents)
    except (TypeError, ValueError) as json_parse_error:
        # TypeError is caught too so that inputs json.loads() rejects outright
        # still get a chance with the YAML loader.
        if "yaml" not in globals():
            # No YAML support available, the JSON failure is the final answer
            raise
        try:
            return yaml.safe_load(contents)
        except Exception as yaml_parse_error:
            # Keep the JSON failure visible as the cause of the YAML failure
            raise yaml_parse_error from json_parse_error


@dataclasses.dataclass
class ManifestFormatParser:
    """
    Read in configuration to determine what the next phase of parsing is.

    args holds arguments passed to target.
    """

    # Name the parser was registered under. shim() fills this in from the
    # environment variable name rather than from a variable's value.
    format_name: str
    # NOTE(review): presumably the manifest format version and the output mode
    # this parser is registered for; nothing reads them yet. Confirm.
    version: str
    output: str
    # NOTE(review): presumably how target is to be executed; nothing reads it
    # yet. Confirm.
    action: str
    target: str
    args: str = ""


ENV_PREFIX = "TPS_MANIFEST_"


def environ_discover_dataclass(
    dataclass,
    environ: Optional[Dict[str, str]] = None,
    *,
    prefix: str = ENV_PREFIX,
    dataclass_key: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build instances of ``dataclass`` from environment variables.

    Variables are expected to be named ``<prefix><FIELD>_<NAME>``. The part
    after the prefix is lowercased and split on the first underscore: the first
    piece is the dataclass field, the remainder is the name of the instance the
    value belongs to. Because the split happens on the first underscore, a field
    whose own name contains an underscore cannot be set this way.

    Variables which do not start with ``prefix``, or which have no underscore
    after the prefix, are ignored.

    :param dataclass: class called with the discovered fields as keyword
        arguments
    :param environ: mapping to search, ``os.environ`` when ``None``
    :param str prefix: prefix a variable must have to be considered
    :param str dataclass_key: when given, the field of that name is set to the
        instance's name, overriding any discovered value
    :return: mapping of instance name to ``dataclass`` instance
    :rtype: dict
    :raises TypeError: if the fields found for a name are not accepted by
        ``dataclass``

    >>> import dataclasses
    >>> from dffml.util.testing.manifest.shim import environ_discover_dataclass
    >>>
    >>> @dataclasses.dataclass
    ... class MyDataclass:
    ...     name: str
    ...     version: str
    >>>
    >>> environ_discover_dataclass(
    ...     MyDataclass,
    ...     {
    ...         "MYPREFIX_NAME_EXAMPLE_FORMAT": "Example Format",
    ...         "MYPREFIX_VERSION_EXAMPLE_FORMAT": "0.0.1",
    ...     },
    ...     prefix="MYPREFIX_",
    ... )
    {'example_format': MyDataclass(name='Example Format', version='0.0.1')}
    >>>
    >>> environ_discover_dataclass(
    ...     MyDataclass,
    ...     {
    ...         "MYPREFIX_VERSION_EXAMPLE_FORMAT": "0.0.1",
    ...     },
    ...     prefix="MYPREFIX_",
    ...     dataclass_key="name",
    ... )
    {'example_format': MyDataclass(name='example_format', version='0.0.1')}
    """
    if environ is None:
        environ = os.environ
    discovered: Dict[str, Dict[str, str]] = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue
        field_name, separator, instance_name = (
            key[len(prefix) :].lower().partition("_")
        )
        if not separator:
            # Prefixed, but not of the form <FIELD>_<NAME>. An unrelated
            # variable sharing the prefix should not abort discovery.
            continue
        discovered.setdefault(instance_name, {})[field_name] = value
    if dataclass_key is not None:
        for instance_name, fields in discovered.items():
            fields[dataclass_key] = instance_name
    # Ensure they are loaded into the correct class
    return {
        instance_name: dataclass(**fields)
        for instance_name, fields in discovered.items()
    }


def shim(
    manifest: str,
    lockdown: bool,
    strict: bool,
    *,
    output: Optional[TextIO] = None,
):
    """
    Discover the phase 2 parsers registered via ``TPS_MANIFEST_*`` environment
    variables and print them.

    ``manifest``, ``lockdown`` and ``strict`` are accepted but not used yet.

    :param str manifest: contents of the manifest
    :param bool lockdown: currently unused
    :param bool strict: currently unused
    :param output: stream the discovered parsers are printed to, standard
        output when ``None``
    """
    parsers = environ_discover_dataclass(
        ManifestFormatParser, dataclass_key="format_name", environ=os.environ
    )
    # TODO: parse() the manifest, select the phase 2 parser matching its
    # $document_format / $document_version and the requested output mode, and
    # execute it.
    print(parsers, file=output)


def make_parser():
    """
    Create the command line argument parser for this script.

    :return: parser whose description is this module's docstring
    :rtype: argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(
        prog="shim.py",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    # store_true actions take no ``type``; argparse raises TypeError if given
    parser.add_argument(
        "-l", "--lockdown", action="store_true", default=False,
    )
    parser.add_argument(
        "-s", "--strict", action="store_true", default=False,
    )
    parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="Manifest to read (default: standard input)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="Where to write results (default: standard output)",
    )
    # NOTE(review): this option and its help text look like leftovers from
    # another script. It is still accepted, but nothing reads it. Confirm
    # whether it can be dropped.
    parser.add_argument("-n", "--name", help="Name of function to replace")
    return parser


def main(argv=None):
    """
    Command line entry point: read the manifest and hand it to :func:`shim`.

    :param argv: argument list to parse, ``sys.argv[1:]`` when ``None``
    """
    args = make_parser().parse_args(argv)
    shim(args.input.read(), args.lockdown, args.strict, output=args.output)


if __name__ == "__main__":
    main()