-
Notifications
You must be signed in to change notification settings - Fork 44
/
get_matched_schemas.py
74 lines (61 loc) · 2.68 KB
/
get_matched_schemas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import inspect
from typing import List
import acto.input.known_schemas
from acto.input.known_schemas.base import K8sSchema
from acto.input.testcase import K8sInvalidTestCase
from acto.schema import ArraySchema, BaseSchema, ObjectSchema, extract_schema
def field_matched(schema: ObjectSchema, k8s_schema: K8sSchema) -> bool:
    """
    Decide whether an object schema matches a known Kubernetes schema.

    The schema matches when k8s_schema.Match accepts it. Otherwise it matches
    only if none of its property names is "apiVersion" and every property name
    other than "enabled" is listed in k8s_schema.fields. An object schema with
    no properties therefore counts as a match.
    @param schema: the schema to test; anything that is not an ObjectSchema never matches
    @param k8s_schema: the known schema to compare against; must provide Match and fields
    @return: True if the schema matches, False otherwise
    """
    if not isinstance(schema, ObjectSchema):
        return False

    if k8s_schema.Match(schema):
        return True

    def rules_out(prop: str) -> bool:
        # "apiVersion" indicates a Kind, which is too generic to match on.
        if prop == "apiVersion":
            return True
        # "enabled" is tolerated; every other property must be a known field.
        return prop != "enabled" and prop not in k8s_schema.fields

    return not any(rules_out(prop) for prop in schema.properties)
def find_matched_schema(schema: BaseSchema) -> List[List[str]]:
    """
    Find all the matched schemas in acto.input.known_schemas

    A schema node that matches is reported on its own: the search returns as
    soon as one known schema class matches it and does not descend any further
    into that node. A node that does not match is searched recursively through
    its object properties or its array item schema.
    @param schema: the schema to be matched, usually is a CustomResourceDefinition schema
    @return: A list of paths to the matched schemas, eg [['Deployment', 'spec', 'template', 'spec', 'containers']]
    """
    is_object = isinstance(schema, ObjectSchema)

    for _, candidate in inspect.getmembers(acto.input.known_schemas):
        if not (inspect.isclass(candidate) and issubclass(candidate, K8sSchema)):
            continue
        # Only known schema classes that declare `fields` can be compared
        # against an object schema.
        if hasattr(candidate, 'fields') and is_object and field_matched(schema, candidate):
            return [list(schema.path)]

    # No match at this node: pick the child schemas to recurse into.
    if is_object:
        children = schema.properties.values()
    elif isinstance(schema, ArraySchema):
        children = [schema.get_item_schema()]
    else:
        children = []

    return [path for child in children for path in find_matched_schema(child)]
def get_testcase_breakdown():
    """
    Print the name of each known schema class once for every member of that
    class that compares equal to K8sInvalidTestCase.

    Only classes in acto.input.known_schemas that subclass K8sSchema are inspected.
    """
    known_schema_classes = (
        (cls_name, cls)
        for cls_name, cls in inspect.getmembers(acto.input.known_schemas)
        if inspect.isclass(cls) and issubclass(cls, K8sSchema)
    )
    for cls_name, cls in known_schema_classes:
        for _, attribute in inspect.getmembers(cls):
            # NOTE(review): this is an equality comparison against the
            # K8sInvalidTestCase name itself, not an isinstance check -
            # confirm that this is the intended test.
            if attribute == K8sInvalidTestCase:
                print(cls_name)
if __name__ == '__main__':
    # Script entry point: for every data/*/context.json file, extract the CRD
    # schema it contains and print the paths under its 'spec' that match a
    # known schema, one `known_schemas.K8sField([...]),` line per path.
    import glob
    import json
    import sys
    sys.path.append('..')

    for file in sorted(glob.glob('data/*/context.json')):
        # Read the JSON as UTF-8 explicitly so parsing does not depend on the
        # platform's default locale encoding, and close the file as soon as
        # its content has been loaded.
        with open(file, 'r', encoding='utf-8') as f:
            context = json.load(f)

        # The schema is taken from the last entry of the CRD's version list.
        versions = context['crd']['body']['spec']['versions']
        crd = extract_schema([], versions[-1]['schema']['openAPIV3Schema'])
        print(f'CRD: {type(crd)}')
        paths = find_matched_schema(crd['spec'])

        print(f"Matched schemas for {file}:")
        for schema in paths:
            print(f"known_schemas.K8sField({schema}),")
        print()