-
Notifications
You must be signed in to change notification settings - Fork 37
/
validator.py
executable file
·163 lines (131 loc) · 4.45 KB
/
validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python3
import collections
import logging
import sys
from argparse import ArgumentParser
from os import getcwd, path
import jsonschema
import yaml
from colors import color
from jsonschema.exceptions import RefResolutionError
from openapi_spec_validator import openapi_v3_spec_validator
from openapi_spec_validator.handlers import UrlHandler
from six.moves.urllib.parse import urlparse
logging.basicConfig(level=logging.DEBUG)
def resolve_refs(uri, spec):
"""Resolve JSON references in a given dictionary.
OpenAPI spec may contain JSON references to its nodes or external
sources, so any attempt to rely that there's some expected attribute
in the spec may fail. So we need to resolve JSON references before
we use it (i.e. replace with referenced object). For details see:
https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-02
The input spec is modified in-place despite being returned from
the function.
"""
resolver = jsonschema.RefResolver(uri, spec)
def _do_resolve(node):
if isinstance(node, collections.Mapping) and "$ref" in node:
with resolver.resolving(node["$ref"]) as resolved:
return resolved
elif isinstance(node, collections.Mapping):
for k, v in node.items():
node[k] = _do_resolve(v)
elif isinstance(node, (list, tuple)):
for i in range(len(node)):
node[i] = _do_resolve(node[i])
return node
return _do_resolve(spec)
def get_custom_spec(fpath, version):
italia_oas3_schema = yaml.safe_load(open(fpath))
print(italia_oas3_schema)
resolve_refs("", italia_oas3_schema)
return italia_oas3_schema["oas-" + ".".join(version)]
def validate(url):
counter = 0
try:
handler = UrlHandler("http", "https", "file")
if not urlparse(url).scheme:
url = "file://" + path.join(getcwd(), url)
spec = handler(url)
for i in openapi_v3_spec_validator.iter_errors(spec, spec_url=url):
counter += 1
print_error(
counter, ":".join(i.absolute_path), i.message, i.instance
)
except RefResolutionError as e:
counter += 1
print_error(
counter,
"",
f"Unable to resolve {e.__context__.args[0]} in {e.args[0]}",
"",
)
except BaseException:
counter += 1
print_error(counter, "", sys.exc_info()[0], "")
finally:
if counter > 0:
print()
print(
color(
" [FAIL] %d errors found " % counter,
fg="white",
bg="red",
style="bold",
)
)
return 1
else:
print_ok(" [PASS] No errors found ")
return 0
def print_ok(msg):
print(color(f" {msg} ", fg="white", bg="green", style="bold"))
def print_error(count, path, message, instance):
print()
print(
color("Error #%d in [%s]:" % (count, path or "unknown"), style="bold")
)
print(" %s" % message)
print(" %s" % instance)
def main(
specfile,
validation_type="extended",
schema_file="openapi-v3/metadata.yaml",
):
# OAS validation.
spec = yaml.safe_load(open(specfile))
openapi_version = spec["openapi"].split(".")
validate(specfile)
# Interoperability model validation
if validation_type == "extended":
api_schema = get_custom_spec(schema_file, openapi_version[:2])
for g in ("info", "servers"):
assert g in spec
jsonschema.validate(instance=spec[g], schema=api_schema[g])
print_ok(" [PASS] All required metadata are present.")
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"--spec",
dest="spec",
required=True,
help="OAS3 spec file or url",
default=False,
)
parser.add_argument(
"--schema",
dest="schema",
required=False,
help="Additional schema file to use for validation",
default="openapi-v3/metadata.yaml",
)
parser.add_argument(
"--type",
dest="validation_type",
required=False,
help="Whether validate just oas3 or extended x-attributes."
" Valid values: 'oas3', 'extended'",
default="extended",
)
args = parser.parse_args()
main(args.spec, validation_type=args.validation_type)