Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

Use kuksa-client as non context-manager #526

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions kuksa-client/kuksa_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

from . import cli_backend


class KuksaClientThread(threading.Thread):

# Constructor
Expand Down Expand Up @@ -86,17 +87,23 @@ def getValues(self, paths: Iterable[str], attribute="value", timeout=5):
# The given callback function will be called then, if the given path is updated:
# updateMessage = await webSocket.recv()
# callback(updateMessage)
def subscribe(self, path, callback, attribute = "value", timeout=5):
def subscribe(self, path, callback, attribute="value", timeout=5):
return self.backend.subscribe(path, callback, attribute, timeout)

def subscribeMultiple(self, paths, callback, attribute = "value", timeout=5):
def subscribeMultiple(self, paths, callback, attribute="value", timeout=5):
return self.backend.subscribeMultiple(paths, callback, attribute, timeout)

# Unsubscribe value changes of to a given path.
# The subscription id from the response of the corresponding subscription request will be required
def unsubscribe(self, sub_id, timeout=5):
return self.backend.unsubscribe(sub_id, timeout)

def disconnect(self, timeout=5):
return self.backend.disconnect(timeout)

def connect(self, timeout=5):
SebastianSchildt marked this conversation as resolved.
Show resolved Hide resolved
return self.backend.connect(timeout)

# Thread function: Start the asyncio loop
def run(self):
self.loop = asyncio.new_event_loop()
Expand Down
21 changes: 17 additions & 4 deletions kuksa-client/kuksa_client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ def subscriptionIdCompleter(self, text, line, begidx, endidx):

ap_getServerAddr = argparse.ArgumentParser()
ap_connect = argparse.ArgumentParser()
ap_connect.add_argument('-i', "--insecure", default=False, action="store_true", help='Connect in insecure mode')
ap_disconnect = argparse.ArgumentParser()
ap_connectThread = argparse.ArgumentParser()
ap_connectThread.add_argument('-i', "--insecure", default=False, action="store_true", help='Connect in insecure mode')
ap_disconnectThread = argparse.ArgumentParser()
ap_authorize = argparse.ArgumentParser()
tokenfile_completer_method = functools.partial(Cmd.path_complete,
path_filter=lambda path: (os.path.isdir(path) or path.endswith(".token")))
Expand Down Expand Up @@ -414,13 +416,19 @@ def do_getMetaData(self, args):


@with_category(COMM_SETUP_COMMANDS)
@with_argparser(ap_disconnect)
def do_disconnect(self, _args):
@with_argparser(ap_disconnectThread)
def do_disconnectThread(self, _args):
"""Disconnect from the VISS/gRPC Server"""
if hasattr(self, "commThread"):
if self.commThread is not None:
self.commThread.stop()
self.commThread = None

@with_category(COMM_SETUP_COMMANDS)
@with_argparser(ap_disconnect)
def do_disconnect(self, _args):
"""Disconnect from the VISS/gRPC Server"""
self.commThread.disconnect()

def checkConnection(self):
if self.commThread is None or not self.commThread.checkConnection():
Expand Down Expand Up @@ -453,10 +461,15 @@ def connect(self, insecure=False):
self.commThread.stop()
self.commThread = None

@with_category(COMM_SETUP_COMMANDS)
@with_argparser(ap_connectThread)
def do_connectThread(self, args):
self.connect(args.insecure)

@with_category(COMM_SETUP_COMMANDS)
@with_argparser(ap_connect)
def do_connect(self, args):
self.connect(args.insecure)
self.commThread.connect()

@with_category(COMM_SETUP_COMMANDS)
@with_argparser(ap_setServerAddr)
Expand Down
21 changes: 18 additions & 3 deletions kuksa-client/kuksa_client/cli_backend/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def checkConnection(self):

# Function to stop the communication
def stop(self):
self.disconnect()
self.run = False
print("gRPC channel disconnected.")

Expand Down Expand Up @@ -126,7 +127,8 @@ def authorize(self, tokenfile=None, timeout=5):
tokenfile = self.tokenfile
tokenfile = pathlib.Path(tokenfile)
requestArgs = {
'token': tokenfile.expanduser().read_text(encoding='utf-8').rstrip('\n')}
'token': tokenfile.expanduser().read_text(encoding='utf-8').rstrip('\n')
}

return self._sendReceiveMsg(("authorize", requestArgs), timeout)

Expand Down Expand Up @@ -158,6 +160,14 @@ def unsubscribe(self, sub_id, timeout=5):
requestArgs = {'subscription_id': sub_id}
return self._sendReceiveMsg(("unsubscribe", requestArgs), timeout)

def connect(self, timeout=5):
requestArgs = {}
return self._sendReceiveMsg(("connect", requestArgs), timeout)

def disconnect(self, timeout=5):
requestArgs = {}
return self._sendReceiveMsg(("disconnect", requestArgs), timeout)

def _sendReceiveMsg(self, req, timeout):
(call, requestArgs) = req
recvQueue = queue.Queue(maxsize=1)
Expand Down Expand Up @@ -189,8 +199,9 @@ async def _grpcHandler(self, vss_client):
try:
if call == "get":
resp = await vss_client.get(**requestArgs)
resp = [entry.to_dict() for entry in resp]
resp = resp[0] if len(resp) == 1 else resp
if resp != None:
resp = [entry.to_dict() for entry in resp]
resp = resp[0] if len(resp) == 1 else resp
elif call == "set":
resp = await vss_client.set(**requestArgs)
elif call == "authorize":
Expand All @@ -203,6 +214,10 @@ async def _grpcHandler(self, vss_client):
resp = {"subscriptionId": str(resp)}
elif call == "unsubscribe":
resp = await subscriber_manager.remove_subscriber(**requestArgs)
elif call == "connect":
resp = await vss_client.connect()
elif call == "disconnect":
resp = await vss_client.disconnect()
else:
raise Exception("Not Implemented.")

Expand Down
37 changes: 23 additions & 14 deletions kuksa-client/kuksa_client/cli_backend/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from kuksa_client import cli_backend


class Backend(cli_backend.Backend):
def __init__(self, config):
super().__init__(config)
Expand All @@ -53,7 +54,8 @@ async def _receiver_handler(self, webSocket):
else:
if "subscriptionId" in resJson and resJson["subscriptionId"] in self.subscriptionCallbacks:
try:
self.subscriptionCallbacks[resJson["subscriptionId"]](message)
self.subscriptionCallbacks[resJson["subscriptionId"]](
message)
except Exception as e: # pylint: disable=broad-except
print(e)

Expand Down Expand Up @@ -99,11 +101,11 @@ def _sendReceiveMsg(self, req, timeout):
# Wait on the receive queue
try:
res = recvQueue.get(timeout=timeout)
resJson = json.loads(res)
resJson = json.loads(res)
if "requestId" in res and str(req["requestId"]) == str(resJson["requestId"]):
return json.dumps(resJson, indent=2)
except queue.Empty:
req["error"] = "timeout"
req["error"] = "timeout"
return json.dumps(req, indent=2)

# Function to stop the communication
Expand All @@ -112,6 +114,9 @@ def stop(self):
self.run = False
print("Server disconnected.")

def disconnect(self):
self.stop()

# Function to authorize against the kuksa.val server
def authorize(self, tokenfile=None, timeout=2):
if tokenfile is None:
Expand All @@ -120,14 +125,14 @@ def authorize(self, tokenfile=None, timeout=2):
token = tokenfile.expanduser().read_text(encoding='utf-8')

req = {}
req["action"]= "authorize"
req["action"] = "authorize"
req["tokens"] = token
return self._sendReceiveMsg(req, timeout)

# Update VSS Tree Entry
def updateVSSTree(self, jsonStr, timeout=5):
req = {}
req["action"]= "updateVSSTree"
req["action"] = "updateVSSTree"
if os.path.isfile(jsonStr):
with open(jsonStr, "r", encoding="utf-8") as f:
req["metadata"] = json.load(f)
Expand All @@ -138,7 +143,7 @@ def updateVSSTree(self, jsonStr, timeout=5):
# Update Meta Data of a given path
def updateMetaData(self, path, jsonStr, timeout=5):
req = {}
req["action"]= "updateMetaData"
req["action"] = "updateMetaData"
req["path"] = path
req["metadata"] = json.loads(jsonStr)
return self._sendReceiveMsg(req, timeout)
Expand All @@ -147,7 +152,7 @@ def updateMetaData(self, path, jsonStr, timeout=5):
def getMetaData(self, path, timeout=5):
"""Get MetaData of the parameter"""
req = {}
req["action"]= "getMetaData"
req["action"] = "getMetaData"
req["path"] = path
return self._sendReceiveMsg(req, timeout)

Expand All @@ -156,7 +161,7 @@ def setValue(self, path, value, attribute="value", timeout=5):
if 'nan' == value:
return json.dumps({"error": path + " has an invalid value " + str(value)}, indent=2)
req = {}
req["action"]= "set"
req["action"] = "set"
req["path"] = path
req["attribute"] = attribute
try:
Expand Down Expand Up @@ -186,11 +191,11 @@ def getValue(self, path, attribute="value", timeout=5):
# callback(updateMessage)
def subscribe(self, path, callback, attribute="value", timeout=5):
req = {}
req["action"]= "subscribe"
req["action"] = "subscribe"
req["path"] = path
req["attribute"] = attribute
res = self._sendReceiveMsg(req, timeout)
resJson = json.loads(res)
resJson = json.loads(res)
if "subscriptionId" in resJson:
self.subscriptionCallbacks[resJson["subscriptionId"]] = callback
return res
Expand All @@ -199,7 +204,7 @@ def subscribe(self, path, callback, attribute="value", timeout=5):
# The subscription id from the response of the corresponding subscription request will be required
def unsubscribe(self, subId, timeout=5):
req = {}
req["action"]= "unsubscribe"
req["action"] = "unsubscribe"
req["subscriptionId"] = subId

res = {}
Expand All @@ -222,11 +227,11 @@ def unsubscribe(self, subId, timeout=5):
def checkConnection(self):
return self.wsConnected

# Main loop for handling websocket communication
async def mainLoop(self):
async def connect(self):
if not self.insecure:
context = ssl.create_default_context()
context.load_cert_chain(certfile=self.certificate, keyfile=self.keyfile)
context.load_cert_chain(
certfile=self.certificate, keyfile=self.keyfile)
context.load_verify_locations(cafile=self.cacertificate)
# Certificates in ../kuksa_certificates does not contain the IP address used for
# connection to server so hostname check must be disabled
Expand All @@ -246,3 +251,7 @@ async def mainLoop(self):
await self._msgHandler(ws)
except OSError as e:
print("Disconnected!! " + str(e))

# Main loop for handling websocket communication
async def mainLoop(self):
await self.connect()
Loading