Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

Commit

Permalink
Use entries iterator and handles permissions correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Sep 22, 2023
1 parent 9f67867 commit afbe8db
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 154 deletions.
10 changes: 6 additions & 4 deletions kuksa-client/kuksa_client/cli_backend/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,24 @@
from typing import Optional
import uuid
import os
import re

from kuksa_client import cli_backend
import kuksa_client.grpc
import kuksa_client.grpc.aio
from kuksa_client.grpc import EntryUpdate
from kuksa.val.v1 import types_pb2


def callback_wrapper(callback: Callable[[str], None]) -> Callable[[Iterable[EntryUpdate]], None]:
def wrapper(updates: Iterable[EntryUpdate]) -> None:
callback(json.dumps([update.to_dict() for update in updates]))
return wrapper


class DatabrokerEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (types_pb2.StringArray, types_pb2.BoolArray, types_pb2.Uint32Array, types_pb2.Uint64Array, types_pb2.FloatArray, types_pb2.Int32Array, types_pb2.Int64Array, types_pb2.DoubleArray)):
if isinstance(obj, (types_pb2.StringArray, types_pb2.BoolArray, types_pb2.Uint32Array, types_pb2.Uint64Array,
types_pb2.FloatArray, types_pb2.Int32Array, types_pb2.Int64Array, types_pb2.DoubleArray)):
string_values = []
for value in obj.values:
value = str(value)
Expand Down Expand Up @@ -145,7 +147,7 @@ def setValues(self, updates: Dict[str, Any], attribute="value", timeout=5):
return json.dumps({"error": "Invalid Attribute"})

# Function for authorization
def authorize(self, token_or_tokenfile:Optional[str] =None, timeout=5):
def authorize(self, token_or_tokenfile: Optional[str] = None, timeout=5):
if token_or_tokenfile is None:
token_or_tokenfile = self.token_or_tokenfile
if os.path.isfile(token_or_tokenfile):
Expand Down Expand Up @@ -251,7 +253,7 @@ async def _grpcHandler(self, vss_client: kuksa_client.grpc.aio.VSSClient):
responseQueue.put((resp, None))
except kuksa_client.grpc.VSSClientError as exc:
responseQueue.put((None, exc.to_dict()))
except ValueError as exc:
except ValueError:
responseQueue.put(
(None, {"error": "ValueError in casting the value."}))

Expand Down
46 changes: 21 additions & 25 deletions kuksa-client/kuksa_client/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ def from_message(cls, message: types_pb2.Datapoint):
) if message.HasField('timestamp') else None,
)


def cast_array_values(cast, array):
"""
Parses array input and cast individual values to wanted type.
Expand All @@ -339,7 +338,7 @@ def cast_array_values(cast, array):
# My Way
# ... without quotes
if item.strip() == '':
#skip
# skip
pass
else:
yield cast(item)
Expand All @@ -365,7 +364,7 @@ def cast_str(value) -> str:
new_val = new_val.replace('\\\"', '\"')
new_val = new_val.replace("\\\'", "\'")
return new_val

def to_message(self, value_type: DataType) -> types_pb2.Datapoint:
message = types_pb2.Datapoint()

Expand All @@ -374,7 +373,6 @@ def set_array_attr(obj, attr, values):
array.Clear()
array.values.extend(values)


field, set_field, cast_field = {
DataType.INT8: ('int32', setattr, int),
DataType.INT16: ('int32', setattr, int),
Expand All @@ -388,29 +386,29 @@ def set_array_attr(obj, attr, values):
DataType.DOUBLE: ('double', setattr, float),
DataType.BOOLEAN: ('bool', setattr, Datapoint.cast_bool),
DataType.STRING: ('string', setattr, Datapoint.cast_str),
DataType.INT8_ARRAY: ('int32_array', set_array_attr,
DataType.INT8_ARRAY: ('int32_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.INT16_ARRAY: ('int32_array', set_array_attr,
DataType.INT16_ARRAY: ('int32_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.INT32_ARRAY: ('int32_array', set_array_attr,
DataType.INT32_ARRAY: ('int32_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.UINT8_ARRAY: ('uint32_array', set_array_attr,
DataType.UINT8_ARRAY: ('uint32_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.UINT16_ARRAY: ('uint32_array', set_array_attr,
DataType.UINT16_ARRAY: ('uint32_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.UINT32_ARRAY: ('uint32_array', set_array_attr,
DataType.UINT32_ARRAY: ('uint32_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.UINT64_ARRAY: ('uint64_array', set_array_attr,
DataType.UINT64_ARRAY: ('uint64_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.INT64_ARRAY: ('int64_array', set_array_attr,
DataType.INT64_ARRAY: ('int64_array', set_array_attr,
lambda array: Datapoint.cast_array_values(int, array)),
DataType.FLOAT_ARRAY: ('float_array', set_array_attr,
DataType.FLOAT_ARRAY: ('float_array', set_array_attr,
lambda array: Datapoint.cast_array_values(float, array)),
DataType.DOUBLE_ARRAY: ('double_array', set_array_attr,
DataType.DOUBLE_ARRAY: ('double_array', set_array_attr,
lambda array: Datapoint.cast_array_values(float, array)),
DataType.BOOLEAN_ARRAY: ('bool_array', set_array_attr,
DataType.BOOLEAN_ARRAY: ('bool_array', set_array_attr,
lambda array: Datapoint.cast_array_values(Datapoint.cast_bool, array)),
DataType.STRING_ARRAY: ('string_array', set_array_attr,
DataType.STRING_ARRAY: ('string_array', set_array_attr,
lambda array: Datapoint.cast_array_values(Datapoint.cast_str, array)),
}.get(value_type, (None, None, None))
if self.value is not None:
Expand Down Expand Up @@ -523,6 +521,7 @@ class ServerInfo:
def from_message(cls, message: val_pb2.GetServerInfoResponse):
return cls(name=message.name, version=message.version)


class BaseVSSClient:
def __init__(
self,
Expand All @@ -536,7 +535,6 @@ def __init__(
connected: bool = False,
tls_server_name: Optional[str] = None
):


self.authorization_header = self.get_authorization_header(token)
self.target_host = f'{host}:{port}'
Expand All @@ -559,11 +557,10 @@ def _load_creds(self) -> Optional[grpc.ChannelCredentials]:
logger.info("Using client private key and certificates, mutual TLS supported if supported by server")
return grpc.ssl_channel_credentials(root_certificates, private_key, certificate_chain)
else:
logger.info(f"No client certificates provided, mutual TLS not supported!")
logger.info("No client certificates provided, mutual TLS not supported!")
return grpc.ssl_channel_credentials(root_certificates)
logger.info(f"No Root CA present, it will not be posible to use a secure connection!")
logger.info("No Root CA present, it will not be posible to use a secure connection!")
return None


def _prepare_get_request(self, entries: Iterable[EntryRequest]) -> val_pb2.GetRequest:
req = val_pb2.GetRequest(entries=[])
Expand Down Expand Up @@ -649,7 +646,7 @@ def get_authorization_header(self, token: str):
return "Bearer " + token

def generate_metadata_header(self, metadata: list, header=None) -> list:
if header == None:
if header is None:
header = self.authorization_header
if metadata:
metadata = dict(metadata)
Expand Down Expand Up @@ -686,20 +683,20 @@ def connect(self, target_host=None):
creds = self._load_creds()
if target_host is None:
target_host = self.target_host

if creds is not None:
logger.info("Establishing secure channel")
if self.tls_server_name:
logger.info(f"Using TLS server name {self.tls_server_name}")
options = [('grpc.ssl_target_name_override', self.tls_server_name)]
channel = grpc.secure_channel(target_host, creds, options)
else:
logger.debug(f"Not providing explicit TLS server name")
logger.debug("Not providing explicit TLS server name")
channel = grpc.secure_channel(target_host, creds)
else:
logger.info("Establishing insecure channel")
channel = grpc.insecure_channel(target_host)

self.channel = self.exit_stack.enter_context(channel)
self.client_stub = val_pb2_grpc.VALStub(self.channel)
self.connected = True
Expand Down Expand Up @@ -973,7 +970,6 @@ def get_server_info(self, **rpc_kwargs) -> Optional[ServerInfo]:
else:
raise VSSClientError.from_grpc_error(exc) from exc
return None


def get_value_types(self, paths: Collection[str], **rpc_kwargs) -> Dict[str, DataType]:
"""
Expand Down
4 changes: 2 additions & 2 deletions kuksa-client/kuksa_client/grpc/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def connect(self, target_host=None):
options = [('grpc.ssl_target_name_override', self.tls_server_name)]
channel = grpc.aio.secure_channel(target_host, creds, options)
else:
logger.debug(f"Not providing explicit TLS server name")
logger.debug("Not providing explicit TLS server name")
channel = grpc.aio.secure_channel(target_host, creds)
else:
logger.info("Establishing insecure channel")
Expand Down Expand Up @@ -363,7 +363,7 @@ async def get_server_info(self, **rpc_kwargs) -> Optional[ServerInfo]:
except AioRpcError as exc:
if exc.code() == grpc.StatusCode.UNAUTHENTICATED:
logger.info("Unauthenticated channel started")
else:
else:
raise VSSClientError.from_grpc_error(exc) from exc
return None

Expand Down
42 changes: 35 additions & 7 deletions kuksa-client/tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,9 @@ async def test_set_current_values(self, mocker, unused_tcp_port):
client = VSSClient('127.0.0.1', unused_tcp_port)
mocker.patch.object(client, 'set')
await client.set_current_values({
'Vehicle.Speed': Datapoint(42.0, datetime.datetime(2022, 11, 7, 16, 18, 35, 247307, tzinfo=datetime.timezone.utc)),
'Vehicle.Speed': Datapoint(42.0,
datetime.datetime(2022, 11, 7, 16, 18, 35, 247307,
tzinfo=datetime.timezone.utc)),
'Vehicle.ADAS.ABS.IsActive': Datapoint(True),
'Vehicle.Chassis.Height': Datapoint(666),
})
Expand Down Expand Up @@ -518,7 +520,9 @@ async def subscribe_response_stream(**kwargs):
View.CURRENT_VALUE, (Field.VALUE,)),
]
assert received_updates == {
'Vehicle.Speed': Datapoint(42.0, datetime.datetime(2022, 11, 7, 16, 18, 35, 247307, tzinfo=datetime.timezone.utc)),
'Vehicle.Speed': Datapoint(42.0,
datetime.datetime(2022, 11, 7, 16, 18, 35, 247307,
tzinfo=datetime.timezone.utc)),
'Vehicle.ADAS.ABS.IsActive': Datapoint(True),
'Vehicle.Chassis.Height': Datapoint(666),
}
Expand Down Expand Up @@ -652,7 +656,8 @@ async def test_get_some_entries(self, unused_tcp_port, val_servicer):
),
])
async with VSSClient('127.0.0.1', unused_tcp_port, ensure_startup_connection=False) as client:
entries = await client.get(entries=(entry for entry in ( # generator is intentional as get accepts Iterable
entries = await client.get(entries=(entry for entry in (
# generator is intentional as get accepts Iterable
EntryRequest('Vehicle.Speed',
View.CURRENT_VALUE, (Field.VALUE,)),
EntryRequest('Vehicle.ADAS.ABS.IsActive',
Expand Down Expand Up @@ -795,7 +800,6 @@ async def test_get_unset_entries(self, unused_tcp_port, val_servicer):
EntryRequest('Vehicle.ADAS.ABS.IsActive',
View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)),
))

assert entries == [DataEntry('Vehicle.Speed'), DataEntry(
'Vehicle.ADAS.ABS.IsActive')]

Expand Down Expand Up @@ -991,8 +995,32 @@ async def test_authorize_successful(self, unused_tcp_port, val_servicer):
name='test_server', version='1.2.3')
async with VSSClient('127.0.0.1', unused_tcp_port, ensure_startup_connection=False) as client:
# token from kuksa.val directory under jwt/provide-vehicle-speed.token
success = await client.authorize(token='eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJsb2NhbCBkZXYiLCJpc3MiOiJjcmVhdGVUb2tlbi5weSIsImF1ZCI6WyJrdWtzYS52YWwiXSwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE3NjcyMjU1OTksInNjb3BlIjoicmVhZDpWZWhpY2xlLldpZHRoIHByb3ZpZGU6VmVoaWNsZS5TcGVlZCJ9.w2c8xrYwBVgMav3f0Se6E8H8E36Nd03rJiSS2A8s-CL3GtlwB7wVanjXHhppNsCdWym3tK4JwgslQdMQF-UL4hd7vzdtt-Mx6VjH_jO9mDxz4Z0Uzw7aJtbtQSpi2h6kwceTVTllkbLRF7WRHWIpwzXFF9yZolX6lH-BE9xf1AB62d6icd9SKxFnVvYs3MVK5D1xNmDNOmm-Fr0d2K604MmIIXGW5kPZJYIvBKO4NYRLklhJe47It_lGo3gnh1ppmzTOIo1kB4sDe55hplUCbTCJVricpyQSgTYsf7aFRPK51XMRwwwJ8kShWeaTggMLKpv1W-9dhVWDk4isC8BxsOjaVloArausMmjLmTz6KwAsfARgfXtaCrMsESUBNXi5KIdAyHVXZpmERvc9yeYPcaWlknVFrFsHbV6bw4nwqBX-0Ubuga0NGNQDFKmyTKQrbuZmQ3L9iipxY8_BOSCkdiYtWbE3lpplxpS_PaZl10KAaMmUfbcF9aYZunDEzEtoJgJe2EeGu3XDBtbyXVUKruImdSEdjaImfUGQIWl5bMbVH4N4zK5jE45wT5FJiRUcA5pMN5wNmDYJJzgbxWNpYW40KZYPFc_7XUH8EZ2Cs69wDHam3ArkOs1qMgMIoEPWVzHakjlVJfrPR9zQKxfirBtNNENIoHsBjJ_P4FEJCN4')
assert client.authorization_header == 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJsb2NhbCBkZXYiLCJpc3MiOiJjcmVhdGVUb2tlbi5weSIsImF1ZCI6WyJrdWtzYS52YWwiXSwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE3NjcyMjU1OTksInNjb3BlIjoicmVhZDpWZWhpY2xlLldpZHRoIHByb3ZpZGU6VmVoaWNsZS5TcGVlZCJ9.w2c8xrYwBVgMav3f0Se6E8H8E36Nd03rJiSS2A8s-CL3GtlwB7wVanjXHhppNsCdWym3tK4JwgslQdMQF-UL4hd7vzdtt-Mx6VjH_jO9mDxz4Z0Uzw7aJtbtQSpi2h6kwceTVTllkbLRF7WRHWIpwzXFF9yZolX6lH-BE9xf1AB62d6icd9SKxFnVvYs3MVK5D1xNmDNOmm-Fr0d2K604MmIIXGW5kPZJYIvBKO4NYRLklhJe47It_lGo3gnh1ppmzTOIo1kB4sDe55hplUCbTCJVricpyQSgTYsf7aFRPK51XMRwwwJ8kShWeaTggMLKpv1W-9dhVWDk4isC8BxsOjaVloArausMmjLmTz6KwAsfARgfXtaCrMsESUBNXi5KIdAyHVXZpmERvc9yeYPcaWlknVFrFsHbV6bw4nwqBX-0Ubuga0NGNQDFKmyTKQrbuZmQ3L9iipxY8_BOSCkdiYtWbE3lpplxpS_PaZl10KAaMmUfbcF9aYZunDEzEtoJgJe2EeGu3XDBtbyXVUKruImdSEdjaImfUGQIWl5bMbVH4N4zK5jE45wT5FJiRUcA5pMN5wNmDYJJzgbxWNpYW40KZYPFc_7XUH8EZ2Cs69wDHam3ArkOs1qMgMIoEPWVzHakjlVJfrPR9zQKxfirBtNNENIoHsBjJ_P4FEJCN4'
token = ('eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJsb2NhbCBkZXYiLCJpc3MiOiJjcmVhdGVUb2'
'tlbi5weSIsImF1ZCI6WyJrdWtzYS52YWwiXSwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE3NjcyMjU1OTksIn'
'Njb3BlIjoicmVhZDpWZWhpY2xlLldpZHRoIHByb3ZpZGU6VmVoaWNsZS5TcGVlZCJ9.w2c8xrYwBVgMav3f0S'
'e6E8H8E36Nd03rJiSS2A8s-CL3GtlwB7wVanjXHhppNsCdWym3tK4JwgslQdMQF-UL4hd7vzdtt-Mx6VjH_jO9'
'mDxz4Z0Uzw7aJtbtQSpi2h6kwceTVTllkbLRF7WRHWIpwzXFF9yZolX6lH-BE9xf1AB62d6icd9SKxFnVvYs3M'
'VK5D1xNmDNOmm-Fr0d2K604MmIIXGW5kPZJYIvBKO4NYRLklhJe47It_lGo3gnh1ppmzTOIo1kB4sDe55hplUCb'
'TCJVricpyQSgTYsf7aFRPK51XMRwwwJ8kShWeaTggMLKpv1W-9dhVWDk4isC8BxsOjaVloArausMmjLmTz6KwAsf'
'ARgfXtaCrMsESUBNXi5KIdAyHVXZpmERvc9yeYPcaWlknVFrFsHbV6bw4nwqBX-0Ubuga0NGNQDFKmyTKQrbuZmQ'
'3L9iipxY8_BOSCkdiYtWbE3lpplxpS_PaZl10KAaMmUfbcF9aYZunDEzEtoJgJe2EeGu3XDBtbyXVUKruImdSEdja'
'ImfUGQIWl5bMbVH4N4zK5jE45wT5FJiRUcA5pMN5wNmDYJJzgbxWNpYW40KZYPFc_7XUH8EZ2Cs69wDHam3ArkOs1'
'qMgMIoEPWVzHakjlVJfrPR9zQKxfirBtNNENIoHsBjJ_P4FEJCN4'
)
bearer = ('Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJsb2NhbCBkZXYiLCJpc3MiOiJjcmVhdG'
'VUb2tlbi5weSIsImF1ZCI6WyJrdWtzYS52YWwiXSwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE3NjcyMjU1OTks'
'InNjb3BlIjoicmVhZDpWZWhpY2xlLldpZHRoIHByb3ZpZGU6VmVoaWNsZS5TcGVlZCJ9.w2c8xrYwBVgMav3f0'
'Se6E8H8E36Nd03rJiSS2A8s-CL3GtlwB7wVanjXHhppNsCdWym3tK4JwgslQdMQF-UL4hd7vzdtt-Mx6VjH_jO'
'9mDxz4Z0Uzw7aJtbtQSpi2h6kwceTVTllkbLRF7WRHWIpwzXFF9yZolX6lH-BE9xf1AB62d6icd9SKxFnVvYs3M'
'VK5D1xNmDNOmm-Fr0d2K604MmIIXGW5kPZJYIvBKO4NYRLklhJe47It_lGo3gnh1ppmzTOIo1kB4sDe55hplUCbT'
'CJVricpyQSgTYsf7aFRPK51XMRwwwJ8kShWeaTggMLKpv1W-9dhVWDk4isC8BxsOjaVloArausMmjLmTz6KwAsf'
'ARgfXtaCrMsESUBNXi5KIdAyHVXZpmERvc9yeYPcaWlknVFrFsHbV6bw4nwqBX-0Ubuga0NGNQDFKmyTKQrbuZmQ'
'3L9iipxY8_BOSCkdiYtWbE3lpplxpS_PaZl10KAaMmUfbcF9aYZunDEzEtoJgJe2EeGu3XDBtbyXVUKruImdSEdj'
'aImfUGQIWl5bMbVH4N4zK5jE45wT5FJiRUcA5pMN5wNmDYJJzgbxWNpYW40KZYPFc_7XUH8EZ2Cs69wDHam3ArkO'
's1qMgMIoEPWVzHakjlVJfrPR9zQKxfirBtNNENIoHsBjJ_P4FEJCN4'
)
success = await client.authorize(token)
assert client.authorization_header == bearer
assert success == "Authenticated"

@pytest.mark.usefixtures('val_server')
Expand All @@ -1002,7 +1030,7 @@ async def test_authorize_unsuccessful(self, unused_tcp_port, val_servicer):
async with VSSClient('127.0.0.1', unused_tcp_port, ensure_startup_connection=False) as client:
with pytest.raises(VSSClientError):
await client.authorize(token='')
assert client.authorization_header == None
assert client.authorization_header is None

@pytest.mark.usefixtures('val_server')
async def test_subscribe_some_entries(self, mocker, unused_tcp_port, val_servicer):
Expand Down
23 changes: 0 additions & 23 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,20 +930,6 @@ impl<'a, 'b> DatabaseReadAccess<'a, 'b> {
}
}

pub fn get_entries_by_regex(&self, regex: regex::Regex) -> Result<Vec<Entry>, ReadError> {
let mut entries: Vec<Entry> = Vec::new();
for (_, value) in self.db.entries.iter() {
if regex.is_match(&value.metadata.path) {
entries.push(value.clone());
}
}
if entries.is_empty() {
return Err(ReadError::NotFound);
}

Ok(entries)
}

pub fn get_metadata_by_id(&self, id: i32) -> Option<&Metadata> {
self.db.entries.get(&id).map(|entry| &entry.metadata)
}
Expand Down Expand Up @@ -1231,15 +1217,6 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.cloned()
}

pub async fn get_entries_by_regex(&self, regex: regex::Regex) -> Result<Vec<Entry>, ReadError> {
self.broker
.database
.read()
.await
.authorized_read_access(self.permissions)
.get_entries_by_regex(regex)
}

pub async fn get_entry_by_id(&self, id: i32) -> Result<Entry, ReadError> {
self.broker
.database
Expand Down
Loading

0 comments on commit afbe8db

Please sign in to comment.