Skip to content

Commit

Permalink
Finish fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Sep 25, 2024
1 parent 89a43f1 commit cc19d2c
Show file tree
Hide file tree
Showing 4 changed files with 1,390 additions and 538 deletions.
60 changes: 43 additions & 17 deletions kuksa-client/kuksa_client/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,16 @@ def from_tuple(cls, path: str, dp: types_v2.Datapoint):
field_descriptor, value = data[0]
field_name = field_descriptor.name
value = getattr(dp.value, field_name)
if dp.timestamp.seconds == 0 and dp.timestamp.nanos == 0:
timestamp = None
else:
timestamp = dp.timestamp.ToDatetime(
tzinfo=datetime.timezone.utc,
)
return cls(
entry=DataEntry(path=path, value=Datapoint(value)),
entry=DataEntry(
path=path, value=Datapoint(value=value, timestamp=timestamp)
),
fields=[Field(value=types_v1.FIELD_VALUE)],
)

Expand Down Expand Up @@ -918,7 +926,6 @@ class VSSClient(BaseVSSClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.channel = None
self.channel2 = None
self.exit_stack = contextlib.ExitStack()

def __enter__(self):
Expand Down Expand Up @@ -952,20 +959,16 @@ def connect(self, target_host=None):
logger.info(f"Using TLS server name {self.tls_server_name}")
options = [("grpc.ssl_target_name_override", self.tls_server_name)]
channel = grpc.secure_channel(target_host, creds, options)
channel2 = grpc.secure_channel(target_host, creds, options)
else:
logger.debug("Not providing explicit TLS server name")
channel = grpc.secure_channel(target_host, creds)
channel2 = grpc.secure_channel(target_host, creds)
else:
logger.info("Establishing insecure channel")
channel = grpc.insecure_channel(target_host)
channel2 = grpc.insecure_channel(target_host)

self.channel = self.exit_stack.enter_context(channel)
self.channel2 = self.exit_stack.enter_context(channel2)
self.client_stub_v1 = val_grpc_v1.VALStub(self.channel)
self.client_stub_v2 = val_grpc_v2.VALStub(self.channel2)
self.client_stub_v2 = val_grpc_v2.VALStub(self.channel)
self.connected = True
if self.ensure_startup_connection:
logger.debug("Connected to server: %s", self.get_server_info())
Expand All @@ -975,7 +978,6 @@ def disconnect(self):
self.client_stub_v1 = None
self.client_stub_v2 = None
self.channel = None
self.channel2 = None
self.connected = False

@check_connected
Expand Down Expand Up @@ -1144,6 +1146,7 @@ def subscribe_current_values(
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
for path in paths
),
v1=False,
**rpc_kwargs,
):
yield {update.entry.path: update.entry.value for update in updates}
Expand Down Expand Up @@ -1249,6 +1252,15 @@ def set(
self._process_set_response(resp)
else:
logger.info("Using v2")
if len(updates) == 0:
raise VSSClientError(
error={
"code": grpc.StatusCode.INVALID_ARGUMENT.value[0],
"reason": grpc.StatusCode.INVALID_ARGUMENT.value[1],
"message": "No datapoints requested",
},
errors=[],
)
for update in updates:
req = self._prepare_publish_value_request(
update, paths_with_required_type
Expand All @@ -1260,7 +1272,7 @@ def set(

@check_connected
def subscribe(
self, entries: Iterable[SubscribeEntry], **rpc_kwargs
self, entries: Iterable[SubscribeEntry], v1: bool = True, **rpc_kwargs
) -> Iterator[List[EntryUpdate]]:
"""
Parameters:
Expand All @@ -1271,14 +1283,28 @@ def subscribe(
rpc_kwargs["metadata"] = self.generate_metadata_header(
rpc_kwargs.get("metadata")
)
req = self._prepare_subscribe_request(entries)
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
try:
for resp in resp_stream:
logger.debug("%s: %s", type(resp).__name__, resp)
yield [EntryUpdate.from_message(update) for update in resp.updates]
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
if v1:
req = self._prepare_subscribe_request(entries)
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
try:
for resp in resp_stream:
logger.debug("%s: %s", type(resp).__name__, resp)
yield [EntryUpdate.from_message(update) for update in resp.updates]
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc
else:
logger.info("Using v2")
req = self._prepare_subscribev2_request(entries)
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
try:
for resp in resp_stream:
logger.debug("%s: %s", type(resp).__name__, resp)
yield [
EntryUpdate.from_tuple(path, dp)
for path, dp in resp.entries.items()
]
except RpcError as exc:
raise VSSClientError.from_grpc_error(exc) from exc

@check_connected
def authorize(self, token: str, **rpc_kwargs) -> str:
Expand Down
17 changes: 10 additions & 7 deletions kuksa-client/kuksa_client/grpc/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class VSSClient(BaseVSSClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.channel = None
self.channel2 = None
self.exit_stack = contextlib.AsyncExitStack()

async def __aenter__(self):
Expand All @@ -76,20 +75,16 @@ async def connect(self, target_host=None):
logger.info(f"Using TLS server name {self.tls_server_name}")
options = [("grpc.ssl_target_name_override", self.tls_server_name)]
channel = grpc.aio.secure_channel(target_host, creds, options)
channel2 = grpc.aio.secure_channel(target_host, creds, options)
else:
logger.debug("Not providing explicit TLS server name")
channel = grpc.aio.secure_channel(target_host, creds)
channel2 = grpc.aio.secure_channel(target_host, creds)
else:
logger.info("Establishing insecure channel")
channel = grpc.aio.insecure_channel(target_host)
channel2 = grpc.aio.insecure_channel(target_host)

self.channel = await self.exit_stack.enter_async_context(channel)
self.channel2 = await self.exit_stack.enter_async_context(channel2)
self.client_stub_v1 = val_grpc_v1.VALStub(self.channel)
self.client_stub_v2 = val_grpc_v2.VALStub(self.channel2)
self.client_stub_v2 = val_grpc_v2.VALStub(self.channel)
self.connected = True
if self.ensure_startup_connection:
logger.debug("Connected to server: %s", await self.get_server_info())
Expand All @@ -99,7 +94,6 @@ async def disconnect(self):
self.client_stub_v1 = None
self.client_stub_v2 = None
self.channel = None
self.channel2 = None
self.connected = False

def check_connected_async(func):
Expand Down Expand Up @@ -417,6 +411,15 @@ async def set(
self._process_set_response(resp)
else:
logger.info("Using v2")
if len(updates) == 0:
raise VSSClientError(
error={
"code": grpc.StatusCode.INVALID_ARGUMENT.value[0],
"reason": grpc.StatusCode.INVALID_ARGUMENT.value[1],
"message": "No datapoints requested",
},
errors=[],
)
for update in updates:
req = self._prepare_publish_value_request(
update, paths_with_required_type
Expand Down
67 changes: 43 additions & 24 deletions kuksa-client/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import pytest
import pytest_asyncio

from kuksa.val.v1 import val_pb2_grpc
from kuksa.val.v1 import val_pb2_grpc as val_v1
from kuksa.val.v2 import val_pb2_grpc as val_v2

import tests

Expand All @@ -32,41 +33,59 @@ def resources_path_fixture():
return pathlib.Path(tests.__path__[0]) / 'resources'


@pytest.fixture(name='val_servicer', scope='function')
def val_servicer_fixture(mocker):
servicer = val_pb2_grpc.VALServicer()
mocker.patch.object(servicer, 'Get', spec=True)
mocker.patch.object(servicer, 'Set', spec=True)
mocker.patch.object(servicer, 'Subscribe', spec=True)
mocker.patch.object(servicer, 'GetServerInfo', spec=True)
@pytest.fixture(name="val_servicer_v1", scope="function")
def val_servicer_v1_fixture(mocker):
servicer_v1 = val_v1.VALServicer()
mocker.patch.object(servicer_v1, "Get", spec=True)
mocker.patch.object(servicer_v1, "Set", spec=True)
mocker.patch.object(servicer_v1, "Subscribe", spec=True)
mocker.patch.object(servicer_v1, "GetServerInfo", spec=True)

return servicer
return servicer_v1


@pytest_asyncio.fixture(name='val_server', scope='function')
async def val_server_fixture(unused_tcp_port, val_servicer):
@pytest.fixture(name="val_servicer_v2", scope="function")
def val_servicer_v2_fixture(mocker):
servicer_v2 = val_v2.VALServicer()
mocker.patch.object(servicer_v2, "PublishValue", spec=True)
mocker.patch.object(servicer_v2, "Subscribe", spec=True)

return servicer_v2


@pytest_asyncio.fixture(name="val_server", scope="function")
async def val_server_fixture(unused_tcp_port, val_servicer_v1, val_servicer_v2):
server = grpc.aio.server()
val_pb2_grpc.add_VALServicer_to_server(val_servicer, server)
server.add_insecure_port(f'127.0.0.1:{unused_tcp_port}')
val_v1.add_VALServicer_to_server(val_servicer_v1, server)
val_v2.add_VALServicer_to_server(val_servicer_v2, server)
server.add_insecure_port(f"127.0.0.1:{unused_tcp_port}")
await server.start()
try:
yield server
finally:
await server.stop(grace=2.0)


@pytest_asyncio.fixture(name='secure_val_server', scope='function')
async def secure_val_server_fixture(unused_tcp_port, resources_path, val_servicer):
@pytest_asyncio.fixture(name="secure_val_server", scope="function")
async def secure_val_server_fixture(
unused_tcp_port, resources_path, val_servicer_v1, val_servicer_v2
):
server = grpc.aio.server()
val_pb2_grpc.add_VALServicer_to_server(val_servicer, server)
server.add_secure_port(f'localhost:{unused_tcp_port}', grpc.ssl_server_credentials(
private_key_certificate_chain_pairs=[(
(resources_path / 'test-server.key').read_bytes(),
(resources_path / 'test-server.pem').read_bytes(),
)],
root_certificates=(resources_path / 'test-ca.pem').read_bytes(),
require_client_auth=False,
))
val_v1.add_VALServicer_to_server(val_servicer_v1, server)
val_v2.add_VALServicer_to_server(val_servicer_v2, server)
server.add_secure_port(
f"localhost:{unused_tcp_port}",
grpc.ssl_server_credentials(
private_key_certificate_chain_pairs=[
(
(resources_path / "test-server.key").read_bytes(),
(resources_path / "test-server.pem").read_bytes(),
)
],
root_certificates=(resources_path / "test-ca.pem").read_bytes(),
require_client_auth=False,
),
)
await server.start()
try:
yield server
Expand Down
Loading

0 comments on commit cc19d2c

Please sign in to comment.