From 37f4ccab0c8f3d7f4b2ce5532083c68eb5549156 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Mon, 3 Apr 2023 09:37:19 +0200 Subject: [PATCH 1/4] Add connect/disconnect to allow use of kuksa-client as non context-manager --- kuksa-client/kuksa_client/__init__.py | 11 +++- kuksa-client/kuksa_client/__main__.py | 21 ++++-- kuksa-client/kuksa_client/cli_backend/grpc.py | 21 +++++- kuksa-client/kuksa_client/cli_backend/ws.py | 37 +++++++---- kuksa-client/kuksa_client/grpc/__init__.py | 66 ++++++++++++------- kuksa-client/kuksa_client/grpc/aio.py | 64 +++++++++++------- 6 files changed, 152 insertions(+), 68 deletions(-) diff --git a/kuksa-client/kuksa_client/__init__.py b/kuksa-client/kuksa_client/__init__.py index 23b94d6e4..2053d6bb6 100644 --- a/kuksa-client/kuksa_client/__init__.py +++ b/kuksa-client/kuksa_client/__init__.py @@ -35,6 +35,7 @@ from . import cli_backend + class KuksaClientThread(threading.Thread): # Constructor @@ -86,10 +87,10 @@ def getValues(self, paths: Iterable[str], attribute="value", timeout=5): # The given callback function will be called then, if the given path is updated: # updateMessage = await webSocket.recv() # callback(updateMessage) - def subscribe(self, path, callback, attribute = "value", timeout=5): + def subscribe(self, path, callback, attribute="value", timeout=5): return self.backend.subscribe(path, callback, attribute, timeout) - def subscribeMultiple(self, paths, callback, attribute = "value", timeout=5): + def subscribeMultiple(self, paths, callback, attribute="value", timeout=5): return self.backend.subscribeMultiple(paths, callback, attribute, timeout) # Unsubscribe value changes of to a given path. @@ -97,6 +98,12 @@ def subscribeMultiple(self, paths, callback, attribute = "value", timeout=5): def unsubscribe(self, sub_id, timeout=5): return self.backend.unsubscribe(sub_id, timeout) + def disconnect(self, timeout=5): + return self.backend.disconnect(timeout) + + def connect(self, timeout=5): + return self.backend.connect(timeout) + # Thread function: Start the asyncio loop def run(self): self.loop = asyncio.new_event_loop() diff --git a/kuksa-client/kuksa_client/__main__.py b/kuksa-client/kuksa_client/__main__.py index d3eae0f67..989be7dc2 100755 --- a/kuksa-client/kuksa_client/__main__.py +++ b/kuksa-client/kuksa_client/__main__.py @@ -138,8 +138,10 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx): ap_getServerAddr = argparse.ArgumentParser() ap_connect = argparse.ArgumentParser() - ap_connect.add_argument('-i', "--insecure", default=False, action="store_true", help='Connect in insecure mode') ap_disconnect = argparse.ArgumentParser() + ap_connectThread = argparse.ArgumentParser() + ap_connectThread.add_argument('-i', "--insecure", default=False, action="store_true", help='Connect in insecure mode') + ap_disconnectThread = argparse.ArgumentParser() ap_authorize = argparse.ArgumentParser() tokenfile_completer_method = functools.partial(Cmd.path_complete, path_filter=lambda path: (os.path.isdir(path) or path.endswith(".token"))) @@ -414,13 +416,19 @@ def do_getMetaData(self, args): @with_category(COMM_SETUP_COMMANDS) - @with_argparser(ap_disconnect) - def do_disconnect(self, _args): + @with_argparser(ap_disconnectThread) + def do_disconnectThread(self, _args): """Disconnect from the VISS/gRPC Server""" if hasattr(self, "commThread"): if self.commThread is not None: self.commThread.stop() self.commThread = None + + @with_category(COMM_SETUP_COMMANDS) + @with_argparser(ap_disconnect) + def do_disconnect(self, _args): + """Disconnect from the VISS/gRPC Server""" + self.commThread.disconnect() def checkConnection(self): if self.commThread is None or not self.commThread.checkConnection(): @@ -453,10 +461,15 @@ def connect(self, insecure=False): self.commThread.stop() self.commThread = None + @with_category(COMM_SETUP_COMMANDS) + @with_argparser(ap_connectThread) + def do_connectThread(self, args): + self.connect(args.insecure) + @with_category(COMM_SETUP_COMMANDS) @with_argparser(ap_connect) def do_connect(self, args): - self.connect(args.insecure) + self.commThread.connect() @with_category(COMM_SETUP_COMMANDS) @with_argparser(ap_setServerAddr) diff --git a/kuksa-client/kuksa_client/cli_backend/grpc.py b/kuksa-client/kuksa_client/cli_backend/grpc.py index e2edbafce..f294e8dab 100644 --- a/kuksa-client/kuksa_client/cli_backend/grpc.py +++ b/kuksa-client/kuksa_client/cli_backend/grpc.py @@ -126,7 +126,11 @@ def authorize(self, tokenfile=None, timeout=5): tokenfile = self.tokenfile tokenfile = pathlib.Path(tokenfile) requestArgs = { +<<<<<<< HEAD 'token': tokenfile.expanduser().read_text(encoding='utf-8').rstrip('\n')} +======= + 'token': tokenfile.expanduser().read_text(encoding='utf-8')} +>>>>>>> 8de9460 (Add connect/disconnect to allow use of kuksa-client as non context-manager) return self._sendReceiveMsg(("authorize", requestArgs), timeout) @@ -158,6 +162,14 @@ def unsubscribe(self, sub_id, timeout=5): requestArgs = {'subscription_id': sub_id} return self._sendReceiveMsg(("unsubscribe", requestArgs), timeout) + def connect(self, timeout=5): + requestArgs = {} + return self._sendReceiveMsg(("connect", requestArgs), timeout) + + def disconnect(self, timeout=5): + requestArgs = {} + return self._sendReceiveMsg(("disconnect", requestArgs), timeout) + def _sendReceiveMsg(self, req, timeout): (call, requestArgs) = req recvQueue = queue.Queue(maxsize=1) @@ -189,8 +201,9 @@ async def _grpcHandler(self, vss_client): try: if call == "get": resp = await vss_client.get(**requestArgs) - resp = [entry.to_dict() for entry in resp] - resp = resp[0] if len(resp) == 1 else resp + if resp != None: + resp = [entry.to_dict() for entry in resp] + resp = resp[0] if len(resp) == 1 else resp elif call == "set": resp = await vss_client.set(**requestArgs) elif call == "authorize": @@ -203,6 +216,10 @@ async def _grpcHandler(self, vss_client): resp = {"subscriptionId": str(resp)} elif call == "unsubscribe": resp = await subscriber_manager.remove_subscriber(**requestArgs) + elif call == "connect": + resp = await vss_client.connect() + elif call == "disconnect": + resp = await vss_client.disconnect() else: raise Exception("Not Implemented.") diff --git a/kuksa-client/kuksa_client/cli_backend/ws.py b/kuksa-client/kuksa_client/cli_backend/ws.py index 9fd7a8942..ee99762d1 100644 --- a/kuksa-client/kuksa_client/cli_backend/ws.py +++ b/kuksa-client/kuksa_client/cli_backend/ws.py @@ -31,6 +31,7 @@ from kuksa_client import cli_backend + class Backend(cli_backend.Backend): def __init__(self, config): super().__init__(config) @@ -53,7 +54,8 @@ async def _receiver_handler(self, webSocket): else: if "subscriptionId" in resJson and resJson["subscriptionId"] in self.subscriptionCallbacks: try: - self.subscriptionCallbacks[resJson["subscriptionId"]](message) + self.subscriptionCallbacks[resJson["subscriptionId"]]( + message) except Exception as e: # pylint: disable=broad-except print(e) @@ -99,11 +101,11 @@ def _sendReceiveMsg(self, req, timeout): # Wait on the receive queue try: res = recvQueue.get(timeout=timeout) - resJson = json.loads(res) + resJson = json.loads(res) if "requestId" in res and str(req["requestId"]) == str(resJson["requestId"]): return json.dumps(resJson, indent=2) except queue.Empty: - req["error"] = "timeout" + req["error"] = "timeout" return json.dumps(req, indent=2) # Function to stop the communication @@ -112,6 +114,9 @@ def stop(self): self.run = False print("Server disconnected.") + def disconnect(self): + self.stop() + # Function to authorize against the kuksa.val server def authorize(self, tokenfile=None, timeout=2): if tokenfile is None: @@ -120,14 +125,14 @@ def authorize(self, tokenfile=None, timeout=2): token = tokenfile.expanduser().read_text(encoding='utf-8') req = {} - req["action"]= "authorize" + req["action"] = "authorize" req["tokens"] = token return self._sendReceiveMsg(req, timeout) # Update VSS Tree Entry def updateVSSTree(self, jsonStr, timeout=5): req = {} - req["action"]= "updateVSSTree" + req["action"] = "updateVSSTree" if os.path.isfile(jsonStr): with open(jsonStr, "r", encoding="utf-8") as f: req["metadata"] = json.load(f) @@ -138,7 +143,7 @@ def updateVSSTree(self, jsonStr, timeout=5): # Update Meta Data of a given path def updateMetaData(self, path, jsonStr, timeout=5): req = {} - req["action"]= "updateMetaData" + req["action"] = "updateMetaData" req["path"] = path req["metadata"] = json.loads(jsonStr) return self._sendReceiveMsg(req, timeout) @@ -147,7 +152,7 @@ def updateMetaData(self, path, jsonStr, timeout=5): def getMetaData(self, path, timeout=5): """Get MetaData of the parameter""" req = {} - req["action"]= "getMetaData" + req["action"] = "getMetaData" req["path"] = path return self._sendReceiveMsg(req, timeout) @@ -156,7 +161,7 @@ def setValue(self, path, value, attribute="value", timeout=5): if 'nan' == value: return json.dumps({"error": path + " has an invalid value " + str(value)}, indent=2) req = {} - req["action"]= "set" + req["action"] = "set" req["path"] = path req["attribute"] = attribute try: @@ -186,11 +191,11 @@ def getValue(self, path, attribute="value", timeout=5): # callback(updateMessage) def subscribe(self, path, callback, attribute="value", timeout=5): req = {} - req["action"]= "subscribe" + req["action"] = "subscribe" req["path"] = path req["attribute"] = attribute res = self._sendReceiveMsg(req, timeout) - resJson = json.loads(res) + resJson = json.loads(res) if "subscriptionId" in resJson: self.subscriptionCallbacks[resJson["subscriptionId"]] = callback return res @@ -199,7 +204,7 @@ def subscribe(self, path, callback, attribute="value", timeout=5): # The subscription id from the response of the corresponding subscription request will be required def unsubscribe(self, subId, timeout=5): req = {} - req["action"]= "unsubscribe" + req["action"] = "unsubscribe" req["subscriptionId"] = subId res = {} @@ -222,11 +227,11 @@ def unsubscribe(self, subId, timeout=5): def checkConnection(self): return self.wsConnected - # Main loop for handling websocket communication - async def mainLoop(self): + async def connect(self): if not self.insecure: context = ssl.create_default_context() - context.load_cert_chain(certfile=self.certificate, keyfile=self.keyfile) + context.load_cert_chain( + certfile=self.certificate, keyfile=self.keyfile) context.load_verify_locations(cafile=self.cacertificate) # Certificates in ../kuksa_certificates does not contain the IP address used for # connection to server so hostname check must be disabled @@ -246,3 +251,7 @@ async def mainLoop(self): await self._msgHandler(ws) except OSError as e: print("Disconnected!! " + str(e)) + + # Main loop for handling websocket communication + async def mainLoop(self): + await self.connect() diff --git a/kuksa-client/kuksa_client/grpc/__init__.py b/kuksa-client/kuksa_client/grpc/__init__.py index 03908631f..9df8cf9dc 100644 --- a/kuksa-client/kuksa_client/grpc/__init__.py +++ b/kuksa-client/kuksa_client/grpc/__init__.py @@ -599,28 +599,39 @@ def __init__(self, *args, **kwargs): self.exit_stack = contextlib.ExitStack() def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.disconnect() + + def check_connected(func): + def wrapper(self, *args, **kwargs): + if self.ensure_startup_connection: + return func(self, *args, **kwargs) + else: + logger.info( + "Disconnected from server! Try connect.") + return wrapper + + def connect(self, target_host=None): creds = self._load_creds() + if target_host is None: + target_host = self.target_host if creds is not None: - channel = grpc.secure_channel(self.target_host, creds) + channel = grpc.secure_channel(target_host, creds) else: - channel = grpc.insecure_channel(self.target_host) + channel = grpc.insecure_channel(target_host) self.channel = self.exit_stack.enter_context(channel) self.client_stub = val_pb2_grpc.VALStub(self.channel) - if self.authorization_header is None: - logger.debug( - "Can not ensure startup connection without token to authorize") - if self.ensure_startup_connection: - try: - info = self.get_server_info() - logger.debug("Connected to server: %s", info) - except: - logger.debug("Connection could not be ensured") - return self + self.ensure_startup_connection = True + logger.debug("Connected to server: %s", self.get_server_info()) - def __exit__(self, exc_type, exc_value, traceback): + def disconnect(self): self.exit_stack.close() self.client_stub = None self.channel = None + self.ensure_startup_connection = False def get_current_values(self, paths: Iterable[str], **rpc_kwargs) -> Dict[str, Datapoint]: """ @@ -782,6 +793,7 @@ def subscribe_metadata( ): yield {update.entry.path: update.entry.metadata for update in updates} + @check_connected def get(self, *, entries: Iterable[EntryRequest], **rpc_kwargs) -> List[DataEntry]: """ Parameters: @@ -797,6 +809,7 @@ def get(self, *, entries: Iterable[EntryRequest], **rpc_kwargs) -> List[DataEntr raise VSSClientError.from_grpc_error(exc) from exc return self._process_get_response(resp) + @check_connected def set(self, *, updates: Collection[EntryUpdate], **rpc_kwargs) -> None: """ Parameters: @@ -818,23 +831,29 @@ def set(self, *, updates: Collection[EntryUpdate], **rpc_kwargs) -> None: raise VSSClientError.from_grpc_error(exc) from exc self._process_set_response(resp) + # needs to be handled differently def subscribe(self, *, entries: Iterable[SubscribeEntry], **rpc_kwargs) -> Iterator[List[EntryUpdate]]: """ Parameters: rpc_kwargs grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. """ - rpc_kwargs["metadata"] = self.generate_metadata_header( - rpc_kwargs.get("metadata")) - req = self._prepare_subscribe_request(entries) - resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs) - try: - for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) - yield [EntryUpdate.from_message(update) for update in resp.updates] - except RpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc + if self.ensure_startup_connection: + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata")) + req = self._prepare_subscribe_request(entries) + resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs) + try: + for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [EntryUpdate.from_message(update) for update in resp.updates] + except RpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + else: + logger.info("Disconnected from server! Try connect.") + + @check_connected def authorize(self, *, token: str, **rpc_kwargs) -> str: rpc_kwargs["metadata"] = self.generate_metadata_header( metadata=rpc_kwargs.get("metadata"), header=self.get_authorization_header(token)) @@ -847,6 +866,7 @@ def authorize(self, *, token: str, **rpc_kwargs) -> str: self.authorization_header = self.get_authorization_header(token) return "Authenticated" + @check_connected def get_server_info(self, **rpc_kwargs) -> ServerInfo: """ Parameters: diff --git a/kuksa-client/kuksa_client/grpc/aio.py b/kuksa-client/kuksa_client/grpc/aio.py index f311dc288..7b6f86aa9 100644 --- a/kuksa-client/kuksa_client/grpc/aio.py +++ b/kuksa-client/kuksa_client/grpc/aio.py @@ -57,28 +57,39 @@ def __init__(self, *args, **kwargs): self.exit_stack = contextlib.AsyncExitStack() async def __aenter__(self): + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self.disconnect() + + async def connect(self, target_host=None): creds = self._load_creds() + if target_host is None: + target_host = self.target_host if creds is not None: - channel = grpc.aio.secure_channel(self.target_host, creds) + channel = grpc.aio.secure_channel(target_host, creds) else: - channel = grpc.aio.insecure_channel(self.target_host) + channel = grpc.aio.insecure_channel(target_host) self.channel = await self.exit_stack.enter_async_context(channel) self.client_stub = val_pb2_grpc.VALStub(self.channel) - if self.authorization_header is None: - logger.debug( - "Can not ensure startup connection without token to authorize") - if self.ensure_startup_connection: - try: - info = await self.get_server_info() - logger.debug("Connected to server: %s", info) - except: - logger.debug("Connection could not be ensured") - return self + self.ensure_startup_connection = True + logger.debug("Connected to server: %s", await self.get_server_info()) - async def __aexit__(self, exc_type, exc_value, traceback): + async def disconnect(self): await self.exit_stack.aclose() self.client_stub = None self.channel = None + self.ensure_startup_connection = False + + def check_connected_async(func): + async def wrapper(self, *args, **kwargs): + if self.ensure_startup_connection: + return await func(self, *args, **kwargs) + else: + logger.info( + "Disconnected from server! Try cli command connect.") + return wrapper async def get_current_values(self, paths: Iterable[str], **rpc_kwargs) -> Dict[str, Datapoint]: """ @@ -244,6 +255,7 @@ async def subscribe_metadata( ): yield {update.entry.path: update.entry.metadata for update in updates} + @check_connected_async async def get(self, *, entries: Iterable[EntryRequest], **rpc_kwargs) -> List[DataEntry]: """ Parameters: @@ -259,6 +271,7 @@ async def get(self, *, entries: Iterable[EntryRequest], **rpc_kwargs) -> List[Da raise VSSClientError.from_grpc_error(exc) from exc return self._process_get_response(resp) + @check_connected_async async def set(self, *, updates: Collection[EntryUpdate], **rpc_kwargs) -> None: """ Parameters: @@ -288,17 +301,21 @@ async def subscribe(self, *, rpc_kwargs grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. """ - rpc_kwargs["metadata"] = self.generate_metadata_header( - rpc_kwargs.get("metadata")) - req = self._prepare_subscribe_request(entries) - resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs) - try: - async for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) - yield [EntryUpdate.from_message(update) for update in resp.updates] - except AioRpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc + if self.ensure_startup_connection: + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata")) + req = self._prepare_subscribe_request(entries) + resp_stream = self.client_stub.Subscribe(req, **rpc_kwargs) + try: + async for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [EntryUpdate.from_message(update) for update in resp.updates] + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + else: + logger.info("Disconnected from server! Try connect.") + @check_connected_async async def authorize(self, token: str, **rpc_kwargs) -> str: rpc_kwargs["metadata"] = self.generate_metadata_header( metadata=rpc_kwargs.get("metadata"), header=self.get_authorization_header(token)) @@ -311,6 +328,7 @@ async def authorize(self, token: str, **rpc_kwargs) -> str: self.authorization_header = self.get_authorization_header(token) return "Authenticated" + @check_connected_async async def get_server_info(self, **rpc_kwargs) -> ServerInfo: """ Parameters: From 5696664e721c4a95c6f669d0182cde4830a71e31 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Tue, 11 Apr 2023 10:39:19 +0200 Subject: [PATCH 2/4] Refactor ensure_startup_connection to connected (keep ensure_startup_connection as used before) --- kuksa-client/kuksa_client/grpc/__init__.py | 13 ++++++++----- kuksa-client/kuksa_client/grpc/aio.py | 11 ++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/kuksa-client/kuksa_client/grpc/__init__.py b/kuksa-client/kuksa_client/grpc/__init__.py index 9df8cf9dc..c11412ee7 100644 --- a/kuksa-client/kuksa_client/grpc/__init__.py +++ b/kuksa-client/kuksa_client/grpc/__init__.py @@ -480,6 +480,7 @@ def __init__( certificate_chain: Optional[Path] = None, *, ensure_startup_connection: bool = True, + connected: bool = False, ): self.authorization_header = self.get_authorization_header(token) self.target_host = f'{host}:{port}' @@ -487,6 +488,7 @@ def __init__( self.private_key = private_key self.certificate_chain = certificate_chain self.ensure_startup_connection = ensure_startup_connection + self.connected = connected self.client_stub = None def _load_creds(self) -> Optional[grpc.ChannelCredentials]: @@ -607,7 +609,7 @@ def __exit__(self, exc_type, exc_value, traceback): def check_connected(func): def wrapper(self, *args, **kwargs): - if self.ensure_startup_connection: + if self.connected: return func(self, *args, **kwargs) else: logger.info( @@ -624,14 +626,15 @@ def connect(self, target_host=None): channel = grpc.insecure_channel(target_host) self.channel = self.exit_stack.enter_context(channel) self.client_stub = val_pb2_grpc.VALStub(self.channel) - self.ensure_startup_connection = True - logger.debug("Connected to server: %s", self.get_server_info()) + self.connected = True + if self.ensure_startup_connection: + logger.debug("Connected to server: %s", self.get_server_info()) def disconnect(self): self.exit_stack.close() self.client_stub = None self.channel = None - self.ensure_startup_connection = False + self.connected = False def get_current_values(self, paths: Iterable[str], **rpc_kwargs) -> Dict[str, Datapoint]: """ @@ -839,7 +842,7 @@ def subscribe(self, *, entries: Iterable[SubscribeEntry], **rpc_kwargs) -> Itera grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. """ - if self.ensure_startup_connection: + if self.connected: rpc_kwargs["metadata"] = self.generate_metadata_header( rpc_kwargs.get("metadata")) req = self._prepare_subscribe_request(entries) diff --git a/kuksa-client/kuksa_client/grpc/aio.py b/kuksa-client/kuksa_client/grpc/aio.py index 7b6f86aa9..0b6db4f95 100644 --- a/kuksa-client/kuksa_client/grpc/aio.py +++ b/kuksa-client/kuksa_client/grpc/aio.py @@ -73,18 +73,19 @@ async def connect(self, target_host=None): channel = grpc.aio.insecure_channel(target_host) self.channel = await self.exit_stack.enter_async_context(channel) self.client_stub = val_pb2_grpc.VALStub(self.channel) - self.ensure_startup_connection = True - logger.debug("Connected to server: %s", await self.get_server_info()) + self.connected = True + if self.ensure_startup_connection: + logger.debug("Connected to server: %s", await self.get_server_info()) async def disconnect(self): await self.exit_stack.aclose() self.client_stub = None self.channel = None - self.ensure_startup_connection = False + self.connected = False def check_connected_async(func): async def wrapper(self, *args, **kwargs): - if self.ensure_startup_connection: + if self.connected: return await func(self, *args, **kwargs) else: logger.info( @@ -301,7 +302,7 @@ async def subscribe(self, *, rpc_kwargs grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. """ - if self.ensure_startup_connection: + if self.connected: rpc_kwargs["metadata"] = self.generate_metadata_header( rpc_kwargs.get("metadata")) req = self._prepare_subscribe_request(entries) From c83cae8dc8cf80bf848075cb7c75b867529cf580 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Tue, 11 Apr 2023 16:49:12 +0200 Subject: [PATCH 3/4] fix minor fault not covered during merge --- kuksa-client/kuksa_client/cli_backend/grpc.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/kuksa-client/kuksa_client/cli_backend/grpc.py b/kuksa-client/kuksa_client/cli_backend/grpc.py index f294e8dab..c0c635403 100644 --- a/kuksa-client/kuksa_client/cli_backend/grpc.py +++ b/kuksa-client/kuksa_client/cli_backend/grpc.py @@ -64,6 +64,7 @@ def checkConnection(self): # Function to stop the communication def stop(self): + self.disconnect() self.run = False print("gRPC channel disconnected.") @@ -126,11 +127,8 @@ def authorize(self, tokenfile=None, timeout=5): tokenfile = self.tokenfile tokenfile = pathlib.Path(tokenfile) requestArgs = { -<<<<<<< HEAD - 'token': tokenfile.expanduser().read_text(encoding='utf-8').rstrip('\n')} -======= - 'token': tokenfile.expanduser().read_text(encoding='utf-8')} ->>>>>>> 8de9460 (Add connect/disconnect to allow use of kuksa-client as non context-manager) + 'token': tokenfile.expanduser().read_text(encoding='utf-8').rstrip('\n') + } return self._sendReceiveMsg(("authorize", requestArgs), timeout) From 94c207ca295aadd4785d110883a58c09cb41c4b3 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Wed, 12 Apr 2023 12:51:17 +0200 Subject: [PATCH 4/4] Remove dis/connectThread for better user experience --- kuksa-client/kuksa_client/__main__.py | 218 +++++++++++++++----------- 1 file changed, 130 insertions(+), 88 deletions(-) diff --git a/kuksa-client/kuksa_client/__main__.py b/kuksa-client/kuksa_client/__main__.py index 989be7dc2..d65307490 100755 --- a/kuksa-client/kuksa_client/__main__.py +++ b/kuksa-client/kuksa_client/__main__.py @@ -45,7 +45,7 @@ DEFAULT_SERVER_PROTOCOL = "ws" SUPPORTED_SERVER_PROTOCOLS = ("ws", "grpc") -scriptDir= os.path.dirname(os.path.realpath(__file__)) +scriptDir = os.path.dirname(os.path.realpath(__file__)) def assignment_statement(arg): @@ -74,7 +74,7 @@ def get_childtree(self, pathText): # This else-branch is reached when one of the path components is invalid # In that case stop parsing further and return an empty tree # Autocompletion can't help here. - childVssTree={} + childVssTree = {} break if 'children' in childVssTree: @@ -93,13 +93,13 @@ def path_completer(self, text, line, begidx, endidx): self.pathCompletionItems = [] childTree = self.get_childtree(text) prefix = "" - seperator="/" + seperator = "/" if "/" in text: prefix = text[:text.rfind("/")]+"/" - elif "." in text: + elif "." in text: prefix = text[:text.rfind(".")]+"." - seperator="." + seperator = "." for key in childTree: child = childTree[key] @@ -111,13 +111,15 @@ def path_completer(self, text, line, begidx, endidx): description = child['description'] if 'type' in child: - nodetype=child['type'].capitalize() + nodetype = child['type'].capitalize() - self.pathCompletionItems.append(CompletionItem(prefix + key, nodetype+": "+ description)) + self.pathCompletionItems.append(CompletionItem( + prefix + key, nodetype+": " + description)) if 'children' in child: self.pathCompletionItems.append( - CompletionItem(prefix + key+seperator, "Children of branch "+prefix+key), + CompletionItem(prefix + key+seperator, + "Children of branch "+prefix+key), ) return basic_complete(text, line, begidx, endidx, self.pathCompletionItems) @@ -138,21 +140,22 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx): ap_getServerAddr = argparse.ArgumentParser() ap_connect = argparse.ArgumentParser() + ap_connect.add_argument( + '-i', "--insecure", default=False, action="store_true", help='Connect in insecure mode') ap_disconnect = argparse.ArgumentParser() - ap_connectThread = argparse.ArgumentParser() - ap_connectThread.add_argument('-i', "--insecure", default=False, action="store_true", help='Connect in insecure mode') - ap_disconnectThread = argparse.ArgumentParser() ap_authorize = argparse.ArgumentParser() tokenfile_completer_method = functools.partial(Cmd.path_complete, - path_filter=lambda path: (os.path.isdir(path) or path.endswith(".token"))) + path_filter=lambda path: (os.path.isdir(path) or path.endswith(".token"))) ap_authorize.add_argument( 'Token', help='JWT(or the file storing the token) for authorizing the client.', completer_method=tokenfile_completer_method, ) ap_setServerAddr = argparse.ArgumentParser() - ap_setServerAddr.add_argument('IP', help='VISS/gRPC Server IP Address', default=DEFAULT_SERVER_ADDR) - ap_setServerAddr.add_argument('Port', type=int, help='VISS/gRPC Server Port', default=DEFAULT_SERVER_PORT) + ap_setServerAddr.add_argument( + 'IP', help='VISS/gRPC Server IP Address', default=DEFAULT_SERVER_ADDR) + ap_setServerAddr.add_argument( + 'Port', type=int, help='VISS/gRPC Server Port', default=DEFAULT_SERVER_PORT) ap_setServerAddr.add_argument( '-p', "--protocol", @@ -161,9 +164,11 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx): ) ap_setValue = argparse.ArgumentParser() - ap_setValue.add_argument("Path", help="Path to be set", completer_method=path_completer) + ap_setValue.add_argument( + "Path", help="Path to be set", completer_method=path_completer) ap_setValue.add_argument("Value", help="Value to be set") - ap_setValue.add_argument("-a", "--attribute", help="Attribute to be set", default="value") + ap_setValue.add_argument( + "-a", "--attribute", help="Attribute to be set", default="value") ap_setValues = argparse.ArgumentParser() ap_setValues.add_argument( @@ -172,18 +177,24 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx): nargs='+', type=assignment_statement, ) - ap_setValues.add_argument("-a", "--attribute", help="Attribute to be set", default="value") + ap_setValues.add_argument( + "-a", "--attribute", help="Attribute to be set", default="value") ap_getValue = argparse.ArgumentParser() - ap_getValue.add_argument("Path", help="Path to be read", completer_method=path_completer) - ap_getValue.add_argument("-a", "--attribute", help="Attribute to be get", default="value") + ap_getValue.add_argument( + "Path", help="Path to be read", completer_method=path_completer) + ap_getValue.add_argument( + "-a", "--attribute", help="Attribute to be get", default="value") ap_getValues = argparse.ArgumentParser() - ap_getValues.add_argument("Path", help="Path whose value is to be read", nargs='+', completer_method=path_completer) - ap_getValues.add_argument("-a", "--attribute", help="Attribute to be get", default="value") + ap_getValues.add_argument( + "Path", help="Path whose value is to be read", nargs='+', completer_method=path_completer) + ap_getValues.add_argument( + "-a", "--attribute", help="Attribute to be get", default="value") ap_setTargetValue = argparse.ArgumentParser() - ap_setTargetValue.add_argument("Path", help="Path whose target value to be set", completer_method=path_completer) + ap_setTargetValue.add_argument( + "Path", help="Path whose target value to be set", completer_method=path_completer) ap_setTargetValue.add_argument("Value", help="Value to be set") ap_setTargetValues = argparse.ArgumentParser() @@ -195,18 +206,24 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx): ) ap_getTargetValue = argparse.ArgumentParser() - ap_getTargetValue.add_argument("Path", help="Path whose target value is to be read", completer_method=path_completer) + ap_getTargetValue.add_argument( + "Path", help="Path whose target value is to be read", completer_method=path_completer) ap_getTargetValues = argparse.ArgumentParser() - ap_getTargetValues.add_argument("Path", help="Path whose target value is to be read", nargs='+', completer_method=path_completer) + ap_getTargetValues.add_argument( + "Path", help="Path whose target value is to be read", nargs='+', completer_method=path_completer) ap_subscribe = argparse.ArgumentParser() - ap_subscribe.add_argument("Path", help="Path to subscribe to", completer_method=path_completer) - ap_subscribe.add_argument("-a", "--attribute", help="Attribute to subscribe to", default="value") + ap_subscribe.add_argument( + "Path", help="Path to subscribe to", completer_method=path_completer) + ap_subscribe.add_argument( + "-a", "--attribute", help="Attribute to subscribe to", default="value") ap_subscribeMultiple = argparse.ArgumentParser() - ap_subscribeMultiple.add_argument("Path", help="Path to subscribe to", nargs='+', completer_method=path_completer) - ap_subscribeMultiple.add_argument("-a", "--attribute", help="Attribute to subscribe to", default="value") + ap_subscribeMultiple.add_argument( + "Path", help="Path to subscribe to", nargs='+', completer_method=path_completer) + ap_subscribeMultiple.add_argument( + "-a", "--attribute", help="Attribute to subscribe to", default="value") ap_unsubscribe = argparse.ArgumentParser() ap_unsubscribe.add_argument( @@ -214,9 +231,11 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx): ) ap_getMetaData = argparse.ArgumentParser() - ap_getMetaData.add_argument("Path", help="Path whose metadata is to be read", completer_method=path_completer) + ap_getMetaData.add_argument( + "Path", help="Path whose metadata is to be read", completer_method=path_completer) ap_updateMetaData = argparse.ArgumentParser() - ap_updateMetaData.add_argument("Path", help="Path whose MetaData is to update", completer_method=path_completer) + ap_updateMetaData.add_argument( + "Path", help="Path whose MetaData is to update", completer_method=path_completer) ap_updateMetaData.add_argument( "Json", help="MetaData to update. Note, only attributes can be update, if update children or the whole vss tree, use" @@ -225,8 +244,9 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx): ap_updateVSSTree = argparse.ArgumentParser() jsonfile_completer_method = functools.partial(Cmd.path_complete, - path_filter=lambda path: (os.path.isdir(path) or path.endswith(".json"))) - ap_updateVSSTree.add_argument("Json", help="Json tree to update VSS", completer_method=jsonfile_completer_method) + path_filter=lambda path: (os.path.isdir(path) or path.endswith(".json"))) + ap_updateVSSTree.add_argument( + "Json", help="Json tree to update VSS", completer_method=jsonfile_completer_method) # Constructor def __init__(self, server_ip=None, server_port=None, server_protocol=None, insecure=False): @@ -260,15 +280,18 @@ def do_authorize(self, args): """Authorize the client to interact with the server""" if self.checkConnection(): resp = self.commThread.authorize(args.Token) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) @with_category(VSS_COMMANDS) @with_argparser(ap_setValue) def do_setValue(self, args): """Set the value of a path""" if self.checkConnection(): - resp = self.commThread.setValue(args.Path, args.Value, args.attribute) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + resp = self.commThread.setValue( + args.Path, args.Value, args.attribute) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -276,8 +299,10 @@ def do_setValue(self, args): def do_setValues(self, args): """Set the value of given paths""" if self.checkConnection(): - resp = self.commThread.setValues(dict(getattr(args, 'Path=Value')), args.attribute) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + resp = self.commThread.setValues( + dict(getattr(args, 'Path=Value')), args.attribute) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -285,8 +310,10 @@ def do_setValues(self, args): def do_setTargetValue(self, args): """Set the target value of a path""" if self.checkConnection(): - resp = self.commThread.setValue(args.Path, args.Value, "targetValue") - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + resp = self.commThread.setValue( + args.Path, args.Value, "targetValue") + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -294,8 +321,10 @@ def do_setTargetValue(self, args): def do_setTargetValues(self, args): """Set the target value of given paths""" if self.checkConnection(): - resp = self.commThread.setValues(dict(getattr(args, 'Path=Value')), "targetValue") - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + resp = self.commThread.setValues( + dict(getattr(args, 'Path=Value')), "targetValue") + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -304,7 +333,8 @@ def do_getValue(self, args): """Get the value of a path""" if self.checkConnection(): resp = self.commThread.getValue(args.Path, args.attribute) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -313,7 +343,8 @@ def do_getValues(self, args): """Get the value of given paths""" if self.checkConnection(): resp = self.commThread.getValues(args.Path, args.attribute) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -322,7 +353,8 @@ def do_getTargetValue(self, args): """Get the value of a path""" if self.checkConnection(): resp = self.commThread.getValue(args.Path, "targetValue") - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -331,7 +363,8 @@ def do_getTargetValues(self, args): """Get the value of given paths""" if self.checkConnection(): resp = self.commThread.getValues(args.Path, "targetValue") - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -340,16 +373,20 @@ def do_subscribe(self, args): """Subscribe the value of a path""" if self.checkConnection(): - logPath = pathlib.Path.cwd() / f"log_{args.Path.replace('/', '.')}_{args.attribute}_{str(time.time())}" + logPath = pathlib.Path.cwd() / \ + f"log_{args.Path.replace('/', '.')}_{args.attribute}_{str(time.time())}" callback = functools.partial(self.subscribeCallback, logPath) - resp = self.commThread.subscribe(args.Path, callback, args.attribute) - resJson = json.loads(resp) + resp = self.commThread.subscribe( + args.Path, callback, args.attribute) + resJson = json.loads(resp) if "subscriptionId" in resJson: self.subscribeIds.add(resJson["subscriptionId"]) logPath.touch() print(f"Subscription log available at {logPath}") - print(f"Execute tail -f {logPath} on another Terminal instance") - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print( + f"Execute tail -f {logPath} on another Terminal instance") + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -357,16 +394,20 @@ def do_subscribe(self, args): def do_subscribeMultiple(self, args): """Subscribe to updates of given paths""" if self.checkConnection(): - logPath = pathlib.Path.cwd() / f"subscribeMultiple_{args.attribute}_{str(time.time())}.log" + logPath = pathlib.Path.cwd() / \ + f"subscribeMultiple_{args.attribute}_{str(time.time())}.log" callback = functools.partial(self.subscribeCallback, logPath) - resp = self.commThread.subscribeMultiple(args.Path, callback, args.attribute) - resJson = json.loads(resp) + resp = self.commThread.subscribeMultiple( + args.Path, callback, args.attribute) + resJson = json.loads(resp) if "subscriptionId" in resJson: self.subscribeIds.add(resJson["subscriptionId"]) logPath.touch() print(f"Subscription log available at {logPath}") - print(f"Execute tail -f {logPath} on another Terminal instance") - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print( + f"Execute tail -f {logPath} on another Terminal instance") + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.pathCompletionItems = [] @with_category(VSS_COMMANDS) @@ -375,7 +416,8 @@ def do_unsubscribe(self, args): """Unsubscribe an existing subscription""" if self.checkConnection(): resp = self.commThread.unsubscribe(args.SubscribeId) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) self.subscribeIds.discard(args.SubscribeId) self.pathCompletionItems = [] @@ -395,16 +437,18 @@ def getMetaData(self, path): def do_updateVSSTree(self, args): """Update VSS Tree Entry""" if self.checkConnection(): - resp = self.commThread.updateVSSTree(args.Json) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + resp = self.commThread.updateVSSTree(args.Json) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) @with_category(VSS_COMMANDS) @with_argparser(ap_updateMetaData) def do_updateMetaData(self, args): """Update MetaData of a given path""" if self.checkConnection(): - resp = self.commThread.updateMetaData(args.Path, args.Json) - print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) + resp = self.commThread.updateMetaData(args.Path, args.Json) + print(highlight(resp, lexers.JsonLexer(), + formatters.TerminalFormatter())) @with_category(VSS_COMMANDS) @with_argparser(ap_getMetaData) @@ -414,21 +458,14 @@ def do_getMetaData(self, args): print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter())) self.pathCompletionItems = [] - @with_category(COMM_SETUP_COMMANDS) - @with_argparser(ap_disconnectThread) - def do_disconnectThread(self, _args): + @with_argparser(ap_disconnect) + def do_disconnect(self, _args): """Disconnect from the VISS/gRPC Server""" if hasattr(self, "commThread"): if self.commThread is not None: self.commThread.stop() self.commThread = None - - @with_category(COMM_SETUP_COMMANDS) - @with_argparser(ap_disconnect) - def do_disconnect(self, _args): - """Disconnect from the VISS/gRPC Server""" - self.commThread.disconnect() def checkConnection(self): if self.commThread is None or not self.commThread.checkConnection(): @@ -441,11 +478,11 @@ def connect(self, insecure=False): if self.commThread is not None: self.commThread.stop() self.commThread = None - config = {'ip':self.serverIP, - 'port': self.serverPort, - 'insecure' : insecure, - 'protocol': self.serverProtocol - } + config = {'ip': self.serverIP, + 'port': self.serverPort, + 'insecure': insecure, + 'protocol': self.serverProtocol + } self.commThread = KuksaClientThread(config) self.commThread.start() @@ -457,19 +494,15 @@ def connect(self, insecure=False): if self.commThread.checkConnection(): pass else: - print("Error: Websocket could not be connected or the gRPC channel could not be created.") + print( + "Error: Websocket could not be connected or the gRPC channel could not be created.") self.commThread.stop() self.commThread = None - @with_category(COMM_SETUP_COMMANDS) - @with_argparser(ap_connectThread) - def do_connectThread(self, args): - self.connect(args.insecure) - @with_category(COMM_SETUP_COMMANDS) @with_argparser(ap_connect) def do_connect(self, args): - self.commThread.connect() + self.connect(args.insecure) @with_category(COMM_SETUP_COMMANDS) @with_argparser(ap_setServerAddr) @@ -481,9 +514,11 @@ def do_setServerAddress(self, args): if args.protocol not in self.supportedProtocols: raise ValueError self.serverProtocol = args.protocol - print("Setting Server Address to " + args.IP + ":" + str(args.Port) + " with protocol " + args.protocol) + print("Setting Server Address to " + args.IP + ":" + + str(args.Port) + " with protocol " + args.protocol) except ValueError: - print("Error: Please give a valid server Address/Protocol. Only ws and grpc are supported.") + print( + "Error: Please give a valid server Address/Protocol. Only ws and grpc are supported.") @with_category(COMM_SETUP_COMMANDS) @with_argparser(ap_getServerAddr) @@ -498,7 +533,8 @@ def getDefaultTokenDir(self): try: return os.path.join(kuksa_certificates.__certificate_dir__, "jwt") except AttributeError: - guessTokenDir = os.path.join(scriptDir, "../kuksa_certificates/jwt") + guessTokenDir = os.path.join( + scriptDir, "../kuksa_certificates/jwt") if os.path.isdir(guessTokenDir): return guessTokenDir return "Unknown" @@ -525,17 +561,22 @@ def do_printTokenDir(self, _args): # pylint: enable=too-many-instance-attributes # Main Function + + def main(): parser = argparse.ArgumentParser() - parser.add_argument('--ip', help="VISS/gRPC Server IP Address", default=DEFAULT_SERVER_ADDR) - parser.add_argument('--port', type=int, help="VISS/gRPC Server Port", default=DEFAULT_SERVER_PORT) + parser.add_argument( + '--ip', help="VISS/gRPC Server IP Address", default=DEFAULT_SERVER_ADDR) + parser.add_argument( + '--port', type=int, help="VISS/gRPC Server Port", default=DEFAULT_SERVER_PORT) parser.add_argument( '--protocol', help="VISS/gRPC Server Communication Protocol", choices=SUPPORTED_SERVER_PROTOCOLS, default=DEFAULT_SERVER_PROTOCOL, ) - parser.add_argument('--insecure', default=False, action='store_true', help='Connect in insecure mode') + parser.add_argument('--insecure', default=False, + action='store_true', help='Connect in insecure mode') parser.add_argument( '--logging-config', default=os.path.join(scriptDir, 'logging.ini'), help="Path to logging configuration file", ) @@ -549,5 +590,6 @@ def main(): finally: clientApp.stop() -if __name__=="__main__": + +if __name__ == "__main__": sys.exit(main())