This repository has been archived by the owner on Aug 25, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 138
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
util: testing: manifest: shim: Initial commit
Signed-off-by: John Andersen <[email protected]>
- Loading branch information
Showing
1 changed file
with
301 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,301 @@ | ||
#!/usr/bin/env python | ||
""" | ||
Manifest/TPS Report Shim | ||
======================== | ||
Validate and parse a TPS Report (manifest). Execute something for the next stage | ||
of parsing. | ||
This file is used as a shim to bridge the gap between the parsing for the | ||
TPS manifest format and the next action to be taken after parsing. This file allows
for registration of phase 2 parsers via environment variables. | ||
The purpose of this script is to perform the initial validation and parsing of
the TPS manifest. Its responsibility is to then call the appropriate next phase
manifest parser. It will pass the manifest's data in a format the next phase | ||
understands, and execute the next phase using capabilities defined within this | ||
file. | ||
Updates | ||
------- | ||
This file has been vendored into multiple locations. Please be sure to track | ||
progress as the format evolves upstream. Upstream URL: | ||
https://github.com/intel/dffml/blob/manifest/dffml/util/testing/manifest/shim.py | ||
Pull Request for discussion: | ||
Contributing | ||
------------ | ||
This section is documentation for contributing to the TPS Report (manifest) | ||
shim. | ||
We want this shim to be usable on a default format which we'll work to define as | ||
a community upstream. | ||
Design Goals | ||
```````````` | ||
This shim MUST | ||
- Work with arbitrary manifest formats | ||
- Discover verification mechanisms | ||
- Verify the manifest (think secure boot) | ||
- Parse the manifest | ||
- Discover phase 2 parsers | ||
- Output the manifest in a format the phase 2 parser can understand | ||
- Execute the phase 2 parser | ||
Format | ||
`````` | ||
We need to come up with a format that allows us to evolve it as we move | ||
forward. | ||
To make sure we have forwards / backwards compatibility we should | ||
include information which allows us to identify what format the document | ||
is in, and what version of that format it is. This will likely also feed | ||
into our input dataflow requirements as we'll need to have the ability | ||
to check an arbitrary input to see if we might have an applicable | ||
converter. | ||
Let's learn from JSON Schema and include a URL where we might be able | ||
to find the schema for the document. We can double up on our previous | ||
needs by asking that the filename of the URL can help us identify our | ||
document format (we'll provide fallback for if we don't have control | ||
over the filename via the ``$document_format`` and ``$document_version``
keys). We'll parse the URL for the filename component. When we parse it | ||
we'll split on ``.``. If the first part is eff (Extensible Format | ||
Format) we'll treat the rest up until the semantic version as the format | ||
name. Then the semantic version is the version of the format. Then the | ||
rest should be the extension which is associated with the format which | ||
we can use to validate the contents of the document, such as JSON | ||
schema. | ||
``$schema: "https://example.com/eff.my.document.format.0.0.0.schema.json"`` | ||
TODO | ||
---- | ||
- Verification of the manifest. Idea: Developer generates manifest. | ||
Signs manifest with private asymmetric key. Prepends base64 encoded
signature as a valid key, ``$signature``. This means you have to | ||
parse the YAML before you have verified the signature, which is not | ||
ideal. However, it's one method available to us and a simple parse | ||
without the use of a full YAML parser could be done. Or we could | ||
distribute out of band and verify the document before the conversion | ||
stage, in the loading stage. | ||
- Verification of references within manifest. Do we support public | ||
portion of CA key embedded in the document various places? We | ||
could then use it for things like verification of git repos where | ||
the CA must sign all developer keys which are in the repo history. | ||
This will apply to anything that is an external reference in the | ||
document. There should be a way for the document to include an HMAC or | ||
something like that or something more dynamic like a CA. | ||
Notes | ||
----- | ||
- https://github.com/mjg59/ssh_pki | ||
- Should we use this? No. Are we going to? Yes. | ||
""" | ||
import argparse
import contextlib
import dataclasses
import importlib
import json
import os
import pathlib
import sys
from typing import Dict, Optional
|
||
with contextlib.suppress((ImportError, ModuleNotFoundError)): | ||
import yaml | ||
|
||
|
||
def parse(contents: str):
    r'''
    Given the contents of the manifest file as a string, parse the contents into
    a dictionary object.

    JSON is tried first. If the contents are not valid JSON and the optional
    ``yaml`` module was importable when this module was loaded, the contents
    are parsed as YAML instead.

    :param str contents: string containing the manifest file's contents
    :return: a dictionary representing the manifest
    :rtype: dict
    :raises ValueError: contents are not valid JSON and ``yaml`` is unavailable
        (``json.JSONDecodeError`` is a ``ValueError``)
    :raises Exception: whatever ``yaml.safe_load`` raised, chained to the JSON
        parse error

    >>> import textwrap
    >>> from dffml.util.testing.manifest.shim import parse
    >>>
    >>> parse(
    ...     textwrap.dedent(
    ...         """\
    ...         $document_format: tps.manifest
    ...         $document_version: 0.0.1
    ...         testplan:
    ...         - git:
    ...             repo: https://example.com/my-repo.git
    ...             branch: main
    ...             file: my_test.py
    ...         """
    ...     )
    ... )
    {'$document_format': 'tps.manifest', '$document_version': '0.0.1', 'testplan': [{'git': {'repo': 'https://example.com/my-repo.git', 'branch': 'main', 'file': 'my_test.py'}}]}
    '''
    try:
        return json.loads(contents)
    except (TypeError, ValueError) as json_parse_error:
        # Only input errors trigger the YAML fallback. Catching Exception here
        # would also hide programming errors such as a missing import.
        # yaml is an optional dependency: the name only exists at module level
        # when the guarded import at the top of the file succeeded.
        if "yaml" not in globals():
            raise
        try:
            return yaml.safe_load(contents)
        except Exception as yaml_parse_error:
            # Keep the JSON failure attached so both attempts are visible
            raise yaml_parse_error from json_parse_error
    # TODO: Dispatch the parsed document to a phase 2 parser selected by the
    # tuple ($document_format, $document_version, output mode). An unreachable
    # draft of that lookup was removed from this spot because it referenced
    # undefined names (self, doc, document_format_version).
|
||
|
||
@dataclasses.dataclass
class ManifestFormatParser:
    """
    Configuration record describing the next phase of parsing.

    All fields are plain strings. Only ``args`` is optional; it holds the
    arguments passed to ``target`` and defaults to the empty string.
    """

    # Required settings, in positional order
    format_name: str
    version: str
    output: str
    action: str
    target: str
    # Arguments passed to target
    args: str = dataclasses.field(default="")
|
||
|
||
# Default prefix of the environment variables read by
# environ_discover_dataclass()
ENV_PREFIX = "TPS_MANIFEST_"


def environ_discover_dataclass(
    dataclass,
    environ: Optional[Dict[str, str]] = None,
    *,
    prefix: str = ENV_PREFIX,
    dataclass_key: Optional[str] = None,
):
    r"""
    Build instances of ``dataclass`` from prefixed environment variables.

    Each variable whose name starts with ``prefix`` is expected to be named
    ``<prefix><FIELD>_<NAME>``. The part after the prefix is lowercased and
    split at its first underscore into the field name and the instance name.
    Values sharing an instance name are grouped and passed as keyword arguments
    to ``dataclass``. Because the split happens at the first underscore, a
    field whose own name contains an underscore cannot be set from a variable;
    ``dataclass_key`` can be used to fill one such field from the instance
    name.

    :param dataclass: callable invoked as ``dataclass(**fields)`` for every
        discovered instance name
    :param environ: mapping of variable names to values, ``os.environ`` if
        ``None``
    :param str prefix: only variables starting with this prefix are considered
    :param str dataclass_key: if given, the field of this name is set to the
        (lowercased) instance name
    :return: discovered instances keyed by lowercased instance name
    :rtype: dict
    :raises ValueError: a prefixed variable name has no underscore separating
        the field name from the instance name

    Any exception raised by ``dataclass(**fields)`` propagates to the caller.

    >>> import dataclasses
    >>> from dffml.util.testing.manifest.shim import environ_discover_dataclass
    >>>
    >>> @dataclasses.dataclass
    ... class MyDataclass:
    ...     name: str
    ...     version: str
    >>>
    >>> environ_discover_dataclass(
    ...     MyDataclass,
    ...     {
    ...         "MYPREFIX_NAME_EXAMPLE_FORMAT": "Example Format",
    ...         "MYPREFIX_VERSION_EXAMPLE_FORMAT": "0.0.1",
    ...     },
    ...     prefix="MYPREFIX_",
    ... )
    {'example_format': MyDataclass(name='Example Format', version='0.0.1')}
    >>>
    >>> environ_discover_dataclass(
    ...     MyDataclass,
    ...     {
    ...         "MYPREFIX_VERSION_EXAMPLE_FORMAT": "0.0.1",
    ...     },
    ...     prefix="MYPREFIX_",
    ...     dataclass_key="name",
    ... )
    {'example_format': MyDataclass(name='example_format', version='0.0.1')}
    """
    if environ is None:
        environ = os.environ
    # Group raw values by instance name: {name: {field: value}}
    discovered_fields = {}
    for key, value in environ.items():
        if not key.startswith(prefix):
            continue
        metadata_key, separator, parser_name = (
            key[len(prefix):].lower().partition("_")
        )
        if not separator:
            # Same exception type the tuple unpacking used to raise, but the
            # message now names the offending variable.
            raise ValueError(
                f"Environment variable {key!r} must be named "
                f"{prefix}<FIELD>_<NAME>"
            )
        discovered_fields.setdefault(parser_name, {})[metadata_key] = value
    # Ensure they are loaded into the correct class
    discovered_parsers = {}
    for parser_name, fields in discovered_fields.items():
        if dataclass_key is not None:
            fields[dataclass_key] = parser_name
        discovered_parsers[parser_name] = dataclass(**fields)
    return discovered_parsers
|
||
|
||
def shim(manifest: str, lockdown: bool, strict: bool):
    """
    Discover the next phase parsers registered via the environment and print
    them.

    Parsers are read from ``os.environ`` as ``ManifestFormatParser`` instances,
    with each instance's ``format_name`` set to its discovered name.

    :param str manifest: not used by the current implementation
    :param bool lockdown: not used by the current implementation
    :param bool strict: not used by the current implementation
    """
    discovered = environ_discover_dataclass(
        ManifestFormatParser,
        environ=os.environ,
        dataclass_key="format_name",
    )
    print(discovered)
|
||
|
||
def make_parser():
    """
    Build the command line argument parser for the shim.

    ``--lockdown`` and ``--strict`` are boolean flags that default to
    ``False``. ``--input`` and ``--output`` are opened as text files and
    default to standard input and standard output.

    :return: the configured argument parser
    :rtype: argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(
        prog="shim.py",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )

    # store_true flags take no value. argparse raises TypeError if type= is
    # combined with this action, so it must not be given.
    parser.add_argument(
        "-l", "--lockdown", action="store_true", default=False,
    )
    # shim() takes strict as a bool, so this is a flag rather than a file
    parser.add_argument(
        "-s", "--strict", action="store_true", default=False,
    )
    parser.add_argument(
        "-i", "--input", type=argparse.FileType("r"), default=sys.stdin
    )
    parser.add_argument(
        "-o", "--output", type=argparse.FileType("w"), default=sys.stdout
    )
    parser.add_argument("-n", "--name", help="Name of function to replace")
    return parser
|
||
|
||
def main():
    """
    Command line entry point: parse the arguments and run the shim on the
    manifest read from ``--input``.

    The previous body called ``replace_function`` and read ``args.func``.
    Neither is defined by this module or by ``make_parser``, so it could not
    run.
    """
    args = make_parser().parse_args()
    # shim() reports via print(); redirect it so --output is honoured
    with contextlib.redirect_stdout(args.output):
        shim(args.input.read(), args.lockdown, args.strict)