-
Notifications
You must be signed in to change notification settings - Fork 6
/
test_val_seat.py
349 lines (267 loc) · 11.9 KB
/
test_val_seat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#!/usr/bin/env python3
# /********************************************************************************
# * Copyright (c) 2022 Contributors to the Eclipse Foundation
# *
# * See the NOTICE file(s) distributed with this work for additional
# * information regarding copyright ownership.
# *
# * This program and the accompanying materials are made available under the
# * terms of the Apache License 2.0 which is available at
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * SPDX-License-Identifier: Apache-2.0
# ********************************************************************************/
import json
import logging
import os
import subprocess # nosec
import grpc
# kuksa API imported as package as some types are with same name
import kuksa.val.v1.types_pb2 as kuksa_types
import kuksa.val.v1.val_pb2 as kuksa_val
import pytest
from sdv.databroker.v1.types_pb2 import Datapoint, DataType
from vdb_helper import VDBHelper
logger = logging.getLogger(__name__)
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
# Env USE_DAPR forces usage of vscode tasks and scripts using 'dapr run' with predefined ports
USE_DAPR = os.getenv("USE_DAPR", "1") != "0"
USE_VSS3 = os.getenv("USE_VSS3", "0") != "0"
if USE_VSS3:
DEFAULT_VSS_PATH = "Vehicle.Cabin.Seat.Row1.Pos1.Position"
else:
DEFAULT_VSS_PATH = "Vehicle.Cabin.Seat.Row1.DriverSide.Position"
if USE_DAPR:
DEFAULT_VDB_ADDRESS = "localhost:55555"
DEFAULT_SCRIPT_SEAT_MOVE = "task-seat-move.sh"
else:
DEFAULT_VDB_ADDRESS = "localhost:35555"
DEFAULT_SCRIPT_SEAT_MOVE = "it-seat-move.sh"
VDB_ADDRESS = os.environ.get("VDB_ADDRESS", DEFAULT_VDB_ADDRESS)
SCRIPT_SEAT_MOVE = os.getenv(
"SCRIPT_SEAT_MOVE",
os.path.join(os.path.dirname(__file__), DEFAULT_SCRIPT_SEAT_MOVE),
)
def execute_script(args: list) -> None:
logger.info("$ {}".format(" ".join(args)))
try:
process = subprocess.run(args, check=True) # nosec
# , shell=True, capture_output=True, check=True)
logger.debug("rc:{}".format(process.returncode))
# logger.debug("-->>\n[out] {}\n\[err]{}\n".format(process.stdout, process.stderr))
except Exception as ex:
logging.exception(ex)
@pytest.fixture
async def setup_helper() -> VDBHelper:
logger.info("Using VDB_ADDR={}".format(VDB_ADDRESS))
helper = VDBHelper(VDB_ADDRESS)
return helper
@pytest.mark.asyncio
async def test_vdb_metadata_get(setup_helper: VDBHelper) -> None:
helper = setup_helper
name = os.getenv("TEST_NAME", DEFAULT_VSS_PATH)
meta = await helper.get_vdb_metadata()
logger.debug("# get_vdb_metadata() -> \n{}".format(str(meta).replace("\n", " ")))
assert len(meta) > 0, "VDB Metadata is empty" # nosec B101
meta_list = helper.vdb_metadata_to_json(meta)
logger.debug("get_vdb_metadata()->\n{}".format(json.dumps(meta_list, indent=2)))
meta_names = [d["name"] for d in meta_list]
assert name in meta_names, "{} not registered!".format(name) # nosec B101
name_reg = meta_list[meta_names.index(name)]
assert len(name_reg) == 4 and name_reg["name"] == name # nosec B101
logger.info("Found metadata: {}".format(name_reg))
assert ( # nosec B101
name_reg["data_type"] == DataType.UINT16
), "{} datatype is {}".format(name, name_reg["data_type"])
await helper.close()
@pytest.mark.asyncio
async def test_subscribe_seat_pos_0(setup_helper: VDBHelper) -> None:
helper: VDBHelper = setup_helper
name = os.getenv("TEST_NAME", DEFAULT_VSS_PATH)
query = "SELECT {}".format(name)
start_value = int(os.getenv("TEST_START_VALUE", "500"))
expected_value = int(os.getenv("TEST_VALUE", "0"))
timeout = int(os.getenv("TEST_TIMEOUT", "10"))
# initiate seat move to 42
logger.info(" -- moving seat to initial pos: {} (sync)".format(start_value))
# sync move to predefined pos
execute_script([SCRIPT_SEAT_MOVE, str(start_value), "-w"])
events = []
# inner function for collecting subscription events
def inner_callback(name: str, dp: Datapoint):
dd = helper.datapoint_to_dict(name, dp)
events.append(dd)
logger.info(" -- moving seat to test position {} (async)...".format(expected_value))
execute_script([SCRIPT_SEAT_MOVE, str(expected_value)])
logger.debug(
"\n# subscribing('{}', timeout={}), expecting:{}".format(
query, timeout, expected_value
)
)
await helper.subscribe_datapoints(
query, timeout=timeout, sub_callback=inner_callback
)
assert ( # nosec B101
len(events) > 0
), "Not received events for '{}' in {} sec.".format(name, timeout)
# list of received names
event_names = set([e["name"] for e in events])
# list of received values
event_values_name = [e["value"] for e in events if e["name"] == name]
logger.debug(" --> names : {}".format(event_names))
# event_values = [e['value'] for e in events]
# logger.debug(" --> values : {}".format(event_values))
logger.debug(" --> <{}> : {}".format(name, event_values_name))
assert name in event_names, "{} event not received! {}".format( # nosec B101
name, event_names
)
assert ( # nosec B101
expected_value in event_values_name
), "{} value {} missing! {}".format(name, expected_value, event_values_name)
await helper.close()
@pytest.mark.asyncio
async def test_subscribe_seat_pos_where_eq(setup_helper: VDBHelper) -> None:
helper = setup_helper
name = os.getenv("TEST_NAME", DEFAULT_VSS_PATH)
expected_value = int(os.getenv("TEST_VALUE", "1000"))
timeout = int(os.getenv("TEST_TIMEOUT", "10"))
query = "SELECT {} where {} = {}".format(name, name, expected_value)
events = []
# inner function for collecting subscription events
def inner_callback(name: str, dp: Datapoint):
dd = helper.datapoint_to_dict(name, dp)
events.append(dd)
logger.info(" -- moving seat to test position {} (async)...".format(expected_value))
execute_script([SCRIPT_SEAT_MOVE, str(expected_value)])
logger.debug(
"\n# subscribing('{}', timeout={}), expecting:{}".format(
query, timeout, expected_value
)
)
await helper.subscribe_datapoints(
query, timeout=timeout, sub_callback=inner_callback
)
assert ( # nosec B101
len(events) > 0
), "Not received subscription events for '{}' in {} sec.".format(name, timeout)
# list of received names
event_names = set([e["name"] for e in events])
# list of received values
event_values_name = [e["value"] for e in events if e["name"] == name]
logger.debug(" --> names : {}".format(event_names))
# event_values = [e['value'] for e in events]
# logger.debug(" --> values : {}".format(event_values))
logger.debug(" --> <{}> : {}".format(name, event_values_name))
assert name in event_names, "{} event not received! {}".format( # nosec B101
name, event_names
)
assert ( # nosec B101
expected_value in event_values_name
), "{} value {} missing! {}".format(name, expected_value, event_values_name)
assert ( # nosec B101
len(set(event_values_name)) == 1
), "Should get only 1 value for {}, got: {}".format(name, event_values_name)
await helper.close()
@pytest.mark.asyncio
async def test_subscribe_seat_pos_where_error(setup_helper: VDBHelper) -> None:
helper = setup_helper
name = os.getenv("TEST_NAME", DEFAULT_VSS_PATH)
expected_value = int(os.getenv("TEST_VALUE", "-42"))
timeout = int(os.getenv("TEST_TIMEOUT", "10"))
query = "SELECT {} where <invalid>".format(name)
events = []
# inner function for collecting subscription events
def inner_callback(name: str, dp: Datapoint):
dd = helper.datapoint_to_dict(name, dp)
events.append(dd)
with pytest.raises(grpc.RpcError):
logger.debug(
"\n# subscribing('{}', timeout={}), expecting:{}".format(
query, timeout, expected_value
)
)
await helper.subscribe_datapoints(
query, timeout=timeout, sub_callback=inner_callback
)
assert ( # nosec B101
len(events) == 0
), "Should not receive events for query:'{}'. Got {}".format(query, events)
await helper.close()
@pytest.mark.asyncio
async def test_kuksa_actuator_meta(setup_helper: VDBHelper) -> None:
helper = setup_helper
name = os.getenv("TEST_NAME", DEFAULT_VSS_PATH)
get_response: kuksa_val.GetResponse = await helper.kuksa_get([name])
logger.debug("# kuksa_get({}) -> \n{}".format(name, str(get_response).replace("\n", " ")))
assert ( # nosec B101
not get_response.HasField("error")
), "kuksa_get() Error: {}".format(str(get_response.error))
assert ( # nosec B101
len(get_response.errors) == 0
), "kuksa_get() Errors: {}".format(str(get_response.errors))
assert ( # nosec B101
len(get_response.entries) == 1
), "Unexpected entries count {}".format(str(get_response.entries
))
entry: kuksa_types.DataEntry = get_response.entries[0]
assert entry.path == DEFAULT_VSS_PATH # nosec B101
assert entry.HasField("metadata") # nosec B101
# assert entry.HasField("actuator_target") # nosec B101
assert ( # nosec B101
entry.metadata.data_type == kuksa_types.DATA_TYPE_UINT16
), "Seat position data_type != UINT16: {}".format(entry.metadata.data_type)
assert ( # nosec B101
entry.metadata.entry_type == kuksa_types.ENTRY_TYPE_ACTUATOR
), "Seat position entry_type != ACTUATOR: {}".format(entry.metadata.entry_type)
await helper.close()
@pytest.mark.asyncio
async def test_subscribe_actuator_pos(setup_helper: VDBHelper) -> None:
helper: VDBHelper = setup_helper
name = os.getenv("TEST_NAME", DEFAULT_VSS_PATH)
query = "SELECT {}".format(name)
start_value = int(os.getenv("TEST_START_VALUE", "500"))
expected_value = int(os.getenv("TEST_VALUE", "0"))
timeout = int(os.getenv("TEST_TIMEOUT", "10"))
# initiate seat move to 42
logger.info(" -- moving seat to initial pos: {} (sync)".format(start_value))
# sync move to predefined pos
execute_script([SCRIPT_SEAT_MOVE, str(start_value), "-w"])
events = []
# inner function for collecting subscription events
def inner_callback(name: str, dp: Datapoint):
dd = helper.datapoint_to_dict(name, dp)
events.append(dd)
logger.info(" -- setting seat position {} (async) via actuator target...".format(expected_value))
await helper.set_actuator_uint32_value(name, expected_value)
logger.debug(
"\n# subscribing('{}', timeout={}), expecting:{}".format(
query, timeout, expected_value
)
)
await helper.subscribe_datapoints(
query, timeout=timeout, sub_callback=inner_callback
)
assert ( # nosec B101
len(events) > 0
), "Not received events for '{}' in {} sec.".format(name, timeout)
# list of received names
event_names = set([e["name"] for e in events])
# list of received values
event_values_name = [e["value"] for e in events if e["name"] == name]
logger.debug(" --> names : {}".format(event_names))
# event_values = [e['value'] for e in events]
# logger.debug(" --> values : {}".format(event_values))
logger.debug(" --> <{}> : {}".format(name, event_values_name))
assert name in event_names, "{} event not received! {}".format( # nosec B101
name, event_names
)
assert ( # nosec B101
expected_value in event_values_name
), "{} value {} missing! {}".format(name, expected_value, event_values_name)
await helper.close()
async def main() -> None:
log_level = os.environ.get("LOG_LEVEL", "INFO")
logging.basicConfig(format="<%(levelname)s>\t%(message)s", level=log_level)
if __name__ == "__main__":
# execute_script([SCRIPT_SEAT_MOVE, "500", "-w"])
pytest.main(["-vvs", "--log-cli-level=DEBUG", os.path.abspath(__file__)])