-
Notifications
You must be signed in to change notification settings - Fork 9
/
testSettings.py
177 lines (147 loc) · 6.24 KB
/
testSettings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import dataclasses
import threading
import typing
from ktoolbox import common
import testConfig
import tftbase
from tftbase import PodInfo
from tftbase import TaskRole
from tftbase import TestMetadata
@common.strict_dataclass
@dataclasses.dataclass(frozen=True, kw_only=True)
class TestSettings:
    """Determine the logic required to configure the client/server for a given test.

    The instance is a frozen, keyword-only dataclass. Besides the three
    init fields it carries two ``threading.Event`` objects (created per
    instance, not settable via the constructor), a private lock and an
    optional client-monitor barrier that is installed later via
    ``initialize_clmo_barrier()``.
    """

    # Descriptor from which the connection and the test case are looked up
    # (see the ``connection`` and ``test_case_id`` properties).
    cfg_descr: testConfig.ConfigDescriptor
    # Index of this test instance; currently also used as the server and
    # client index.
    instance_index: int
    # Whether the test runs in reverse direction; adds the "-REV" suffix in
    # get_test_str() and is forwarded to the test metadata.
    reverse: bool

    # Per-instance events. ``init=False`` keeps them out of __init__, and
    # ``default_factory`` gives each instance its own Event object.
    # NOTE(review): judging by the names, these signal "server is up" and
    # "client is done" between tasks -- confirm against the callers.
    event_server_alive: threading.Event = dataclasses.field(
        init=False, default_factory=threading.Event
    )
    event_client_finished: threading.Event = dataclasses.field(
        init=False, default_factory=threading.Event
    )

    def _post_check(self) -> None:
        """Finish initialization and validate the configuration eagerly.

        NOTE(review): presumably invoked by ``@common.strict_dataclass``
        after construction -- confirm in ktoolbox.
        """
        # As threading.Lock is not a regular type, @strict_dataclass
        # cannot handle fields of it. Set the attribute here.
        self._lock: threading.Lock
        # The dataclass is frozen, so bypass its __setattr__.
        object.__setattr__(self, "_lock", threading.Lock())

        # Access some properties here. We would get an exception early on, if
        # there is something wrong.
        self.connection
        self.test_case_id.info
        self.conf_server
        self.conf_client

    @property
    def conf_server(self) -> testConfig.ConfServer:
        """Return the single configured server of the connection.

        Raises:
            ValueError: if the connection does not have exactly one server
                (raised by the tuple-unpacking).
        """
        # For now, only one server/client is supported and TestConfig already
        # enforces that. Do tuple-unpacking here, to further assert that there
        # is only one server/client.
        (c_server,) = self.connection.server
        return c_server

    @property
    def conf_client(self) -> testConfig.ConfClient:
        """Return the single configured client of the connection.

        Raises:
            ValueError: if the connection does not have exactly one client
                (raised by the tuple-unpacking).
        """
        # For now, only one server/client is supported and TestConfig already
        # enforces that. Do tuple-unpacking here, to further assert that there
        # is only one server/client.
        (c_client,) = self.connection.client
        return c_client

    @property
    def conf_server_used(self) -> testConfig.ConfBaseClientServer:
        """Return the configuration entry that is effectively used for the server.

        For a same-node test case this is the client's entry, otherwise it
        is the configured server's entry.
        """
        if self.test_case_id.info.is_same_node:
            return self.conf_client
        else:
            return self.conf_server

    def conf_clientserver(self, task_role: TaskRole) -> testConfig.ConfBaseClientServer:
        """Return the configuration entry for the given task role.

        Args:
            task_role: ``TaskRole.CLIENT`` or ``TaskRole.SERVER``.

        Returns:
            ``conf_client`` for the client role, ``conf_server_used`` for
            the server role.

        Raises:
            ValueError: if ``task_role`` is neither CLIENT nor SERVER.
        """
        if task_role == TaskRole.CLIENT:
            return self.conf_client
        if task_role == TaskRole.SERVER:
            return self.conf_server_used
        raise ValueError(f"Unsupported task role: {task_role!r}")

    @property
    def clmo_barrier(self) -> threading.Barrier:
        """Return the client-monitor barrier.

        Raises:
            RuntimeError: if ``initialize_clmo_barrier()`` was not called yet.
        """
        # Only the attribute lookup needs the lock; the check and the raise
        # operate on the local copy.
        with self._lock:
            b = getattr(self, "_clmo_barrier", None)
        if b is None:
            raise RuntimeError(
                "Cannot access the client-monitor barrier before calling initialize_clmo_barrier()"
            )
        return typing.cast(threading.Barrier, b)

    def initialize_clmo_barrier(self, parties: int) -> None:
        """Create the client-monitor barrier for ``parties`` participants.

        Args:
            parties: number of parties passed to ``threading.Barrier``.

        Raises:
            RuntimeError: if the barrier was already initialized.
        """
        # Check-and-set happens under the lock so that two concurrent callers
        # cannot both install a barrier.
        with self._lock:
            if hasattr(self, "_clmo_barrier"):
                raise RuntimeError("initialize_clmo_barrier() can only be called once")
            b = threading.Barrier(parties=parties)

            # TestSettings is for the most part an immutable, frozen object.
            # Here we lie about it. We do initialize the _clmo_barrier only
            # during initialize_clmo_barrier().
            #
            # Note that clmo_barrier will raise an exception if called before
            # initializing it. So you will only ever see one instance of the
            # barrier that never changes. That almost counts as "immutable".
            object.__setattr__(self, "_clmo_barrier", b)

    @property
    def connection(self) -> testConfig.ConfConnection:
        """Return the connection configuration obtained from ``cfg_descr``."""
        return self.cfg_descr.get_connection()

    @property
    def test_case_id(self) -> tftbase.TestCaseType:
        """Return the test case obtained from ``cfg_descr``."""
        return self.cfg_descr.get_test_case()

    @property
    def server_is_tenant(self) -> bool:
        """Return whether the server is a tenant (currently always True)."""
        # TODO: Handle Case when not tenant
        return True

    @property
    def client_is_tenant(self) -> bool:
        """Return whether the client is a tenant (currently always True)."""
        # TODO: Handle Case when not tenant
        return True

    @property
    def server_index(self) -> int:
        """Return the server index (currently the instance index)."""
        # TODO: Add task indexing
        return self.instance_index

    @property
    def client_index(self) -> int:
        """Return the client index (currently the instance index)."""
        # TODO: Add task indexing
        return self.instance_index

    @property
    def server_pod_type(self) -> tftbase.PodType:
        """Return the server pod type as resolved by the test case info."""
        return self.test_case_id.info.get_server_pod_type(self.conf_server.pod_type)

    @property
    def client_pod_type(self) -> tftbase.PodType:
        """Return the client pod type as resolved by the test case info."""
        return self.test_case_id.info.get_client_pod_type(self.conf_client.pod_type)

    @property
    def connection_mode(self) -> tftbase.ConnectionMode:
        """Return the connection mode of the test case."""
        return self.test_case_id.info.connection_mode

    def get_test_info(self) -> str:
        """Return a multi-line, human-readable description of the test."""
        # The continuation lines are part of the string literal, so their
        # leading whitespace (none here) ends up in the returned text.
        return f"""type={self.connection.test_type.name}, test-case={self.test_case_id.name}: {self.client_pod_type.name} pod to {self.connection_mode.name} to {self.server_pod_type.name} pod - {self.test_case_id.info.node_location}
Client Node: {self.conf_client.name}
Tenant={self.client_is_tenant}
Index={self.client_index}
Server Node: {self.conf_server_used.name}
Exec Persistence: {self.conf_server.persistent}
Tenant={self.server_is_tenant}
Index={self.server_index}"""

    def get_test_str(self) -> str:
        """Return a compact one-line identifier of the test.

        A "-REV" suffix is appended when ``reverse`` is set.
        """
        direction = ""
        if self.reverse:
            direction = "-REV"
        return f"{self.test_case_id.name}-{self.client_pod_type.name}_TO_{self.connection_mode.name}_TO_{self.server_pod_type.name}-{self.test_case_id.info.node_location}{direction}"

    def get_test_metadata(self) -> TestMetadata:
        """Build the ``TestMetadata`` record describing this test."""
        return TestMetadata(
            tft_idx=self.cfg_descr.tft_idx,
            test_cases_idx=self.cfg_descr.test_cases_idx,
            connections_idx=self.cfg_descr.connections_idx,
            test_case_id=self.test_case_id,
            test_type=self.connection.test_type,
            reverse=self.reverse,
            server=PodInfo(
                name=self.conf_server_used.name,
                pod_type=self.server_pod_type,
                is_tenant=self.server_is_tenant,
                # Use the server's own index (not the client's) so the two
                # stay correct once they are no longer the same value.
                index=self.server_index,
            ),
            client=PodInfo(
                name=self.conf_client.name,
                pod_type=self.client_pod_type,
                is_tenant=self.client_is_tenant,
                index=self.client_index,
            ),
        )