diff --git a/control/grpc.py b/control/grpc.py index c8c27381..b10f69eb 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -137,8 +137,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp except ValueError: self.logger.warning(f"Actual huge pages count value {hugepages_val} is not numeric") hugepages_val = "" - if requested_hugepages_val and hugepages_val != "" and requested_hugepages_val != hugepages_val: - self.logger.warning(f"The actual huge page count {hugepages_val} differs from the requested value of {requested_hugepages_val}") + if requested_hugepages_val and hugepages_val != "" and requested_hugepages_val > hugepages_val: + self.logger.warning(f"The actual huge page count {hugepages_val} is smaller than the requested value of {requested_hugepages_val}") else: self.logger.warning(f"Can't read actual huge pages count value from {hugepages_file}") except Exception as ex: @@ -288,7 +288,7 @@ def execute_grpc_function(self, func, request, context): """ return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context) - def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size): + def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, peer_msg = ""): """Creates a bdev from an RBD image.""" if create_image: @@ -298,7 +298,7 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl self.logger.info(f"Received request to create bdev {name} from" f" {rbd_pool_name}/{rbd_image_name} (size {rbd_image_size} MiB)" - f" with block size {block_size}, {cr_img_msg}") + f" with block size {block_size}, {cr_img_msg}{peer_msg}") if block_size == 0: return BdevStatus(status=errno.EINVAL, @@ -335,7 +335,7 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl self.bdev_cluster[name] = cluster_name self.bdev_params[name] = {'uuid':uuid, 'pool_name':rbd_pool_name, 'image_name':rbd_image_name, 'image_size':rbd_image_size, 'block_size': block_size} - self.logger.info(f"bdev_rbd_create: {bdev_name}, cluster_name {cluster_name}") + self.logger.debug(f"bdev_rbd_create: {bdev_name}, cluster_name {cluster_name}") except Exception as ex: errmsg = f"bdev_rbd_create {name} failed" self.logger.exception(errmsg) @@ -358,10 +358,10 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl return BdevStatus(status=0, error_message=os.strerror(0), bdev_name=name) - def resize_bdev(self, bdev_name, new_size): + def resize_bdev(self, bdev_name, new_size, peer_msg = ""): """Resizes a bdev.""" - self.logger.info(f"Received request to resize bdev {bdev_name} to {new_size} MiB") + self.logger.info(f"Received request to resize bdev {bdev_name} to {new_size} MiB{peer_msg}") with self.rpc_lock: try: ret = rpc_bdev.bdev_rbd_resize( @@ -369,7 +369,7 @@ def resize_bdev(self, bdev_name, new_size): name=bdev_name, new_size=new_size, ) - self.logger.info(f"resize_bdev {bdev_name}: {ret}") + self.logger.debug(f"resize_bdev {bdev_name}: {ret}") except Exception as ex: errmsg = f"Failure resizing bdev {bdev_name}" self.logger.exception(errmsg) @@ -388,14 +388,14 @@ def resize_bdev(self, bdev_name, new_size): return pb2.req_status(status=0, error_message=os.strerror(0)) - def delete_bdev(self, bdev_name, recycling_mode=False): + def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""): """Deletes a bdev.""" if not self.rpc_lock.locked(): self.logger.error(f"A call to delete_bdev() without holding the RPC lock") assert self.rpc_lock.locked() - self.logger.info(f"Received request to delete bdev {bdev_name}") + self.logger.info(f"Received request to delete bdev {bdev_name}{peer_msg}") try: ret = rpc_bdev.bdev_rbd_delete( self.spdk_rpc_client, @@ -404,7 +404,7 @@ def delete_bdev(self, bdev_name, recycling_mode=False): if not recycling_mode: del self.bdev_params[bdev_name] self._put_cluster(self.bdev_cluster[bdev_name]) - self.logger.info(f"delete_bdev {bdev_name}: {ret}") + self.logger.debug(f"delete_bdev {bdev_name}: {ret}") except Exception as ex: errmsg = f"Failure deleting bdev {bdev_name}" self.logger.exception(errmsg) @@ -457,13 +457,36 @@ def serial_number_already_used(self, context, serial) -> str: continue return None + def get_peer_message(self, context) -> str: + if not context: + return "" + + try: + peer = context.peer().split(":", 1) + addr_fam = peer[0].lower() + addr = peer[1] + if addr_fam == "ipv6": + addr_fam = "IPv6" + addr = addr.replace("%5B", "[", 1) + addr = addr.replace("%5D", "]", 1) + elif addr_fam == "ipv4": + addr_fam = "IPv4" + else: + addr_fam = "" + return f", client address: {addr_fam} {addr}" + except Exception: + self.logger.exception(f"Got exception trying to get peer's address") + + return "" + def create_subsystem_safe(self, request, context): """Creates a subsystem.""" create_subsystem_error_prefix = f"Failure creating subsystem {request.subsystem_nqn}" + peer_msg = self.get_peer_message(context) self.logger.info( - f"Received request to create subsystem {request.subsystem_nqn}, enable_ha: {request.enable_ha}, context: {context}") + f"Received request to create subsystem {request.subsystem_nqn}, enable_ha: {request.enable_ha}, context: {context}{peer_msg}") if not request.enable_ha: errmsg = f"{create_subsystem_error_prefix}: HA must be enabled for subsystems" @@ -527,7 +550,7 @@ def create_subsystem_safe(self, request, context): ) self.subsys_ha[request.subsystem_nqn] = enable_ha self.subsys_max_ns[request.subsystem_nqn] = request.max_namespaces if request.max_namespaces is not None else 32 - self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") + self.logger.debug(f"create_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: self.logger.exception(create_subsystem_error_prefix) errmsg = f"{create_subsystem_error_prefix}:\n{ex}" @@ -620,7 +643,7 @@ def delete_subsystem_safe(self, request, context): ) self.subsys_ha.pop(request.subsystem_nqn) self.subsys_max_ns.pop(request.subsystem_nqn) - self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") + self.logger.debug(f"delete_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: self.logger.exception(delete_subsystem_error_prefix) errmsg = f"{delete_subsystem_error_prefix}:\n{ex}" @@ -643,8 +666,9 @@ def delete_subsystem_safe(self, request, context): def delete_subsystem(self, request, context=None): """Deletes a subsystem.""" + peer_msg = self.get_peer_message(context) delete_subsystem_error_prefix = f"Failure deleting subsystem {request.subsystem_nqn}" - self.logger.info(f"Received request to delete subsystem {request.subsystem_nqn}, context: {context}") + self.logger.info(f"Received request to delete subsystem {request.subsystem_nqn}, context: {context}{peer_msg}") if GatewayUtils.is_discovery_nqn(request.subsystem_nqn): errmsg = f"{delete_subsystem_error_prefix}: Can't delete a discovery subsystem" @@ -711,7 +735,8 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte add_namespace_error_prefix = f"Failure adding namespace{nsid_msg}to {subsystem_nqn}" - self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}{peer_msg}") if anagrpid > self.subsys_max_ns[subsystem_nqn]: errmsg = f"{add_namespace_error_prefix}: Group ID {anagrpid} is bigger than configured maximum {self.subsys_max_ns[subsystem_nqn]}" @@ -734,7 +759,7 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte ) self.subsystem_nsid_bdev[subsystem_nqn][nsid] = bdev_name self.subsystem_nsid_anagrp[subsystem_nqn][nsid] = anagrpid - self.logger.info(f"subsystem_add_ns: {nsid}") + self.logger.debug(f"subsystem_add_ns: {nsid}") except Exception as ex: self.logger.exception(add_namespace_error_prefix) errmsg = f"{add_namespace_error_prefix}:\n{ex}" @@ -771,8 +796,9 @@ def set_ana_state(self, request, context=None): return self.execute_grpc_function(self.set_ana_state_safe, request, context) def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): + peer_msg = self.get_peer_message(context) """Sets ana state for this gateway.""" - self.logger.info(f"Received request to set ana states {ana_info.states}") + self.logger.info(f"Received request to set ana states {ana_info.states}, {peer_msg}") state = self.gateway_state.local.get_state() inaccessible_ana_groups = {} @@ -791,11 +817,11 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): prefix = GatewayState.build_partial_listener_key(nqn, self.host_name) + GatewayState.OMAP_KEY_DELIMITER listener_keys = [key for key in state.keys() if key.startswith(prefix)] - self.logger.info(f"Iterate over {nqn=} {prefix=} {listener_keys=}") + self.logger.debug(f"Iterate over {nqn=} {prefix=} {listener_keys=}") for listener_key in listener_keys: listener = json.loads(state[listener_key]) - self.logger.info(f"{listener_key=} {listener=}") + self.logger.debug(f"{listener_key=} {listener=}") # Iterate over ana_group_state in nqn_ana_states for gs in nas.states: @@ -813,10 +839,10 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): for cluster in self.clusters[grp_id]: if not rpc_bdev.bdev_rbd_wait_for_latest_osdmap(self.spdk_rpc_client, name=cluster): raise Exception(f"bdev_rbd_wait_for_latest_osdmap({cluster=}) error") - self.logger.info(f"set_ana_state bdev_rbd_wait_for_latest_osdmap {cluster=}") + self.logger.debug(f"set_ana_state bdev_rbd_wait_for_latest_osdmap {cluster=}") optimized_ana_groups.add(grp_id) - self.logger.info(f"set_ana_state nvmf_subsystem_listener_set_ana_state {nqn=} {listener=} {ana_state=} {grp_id=}") + self.logger.debug(f"set_ana_state nvmf_subsystem_listener_set_ana_state {nqn=} {listener=} {ana_state=} {grp_id=}") ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( self.spdk_rpc_client, nqn=nqn, @@ -828,7 +854,7 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): anagrpid=grp_id) if ana_state == "inaccessible" : inaccessible_ana_groups[grp_id] = True - self.logger.info(f"set_ana_state nvmf_subsystem_listener_set_ana_state response {ret=}") + self.logger.debug(f"set_ana_state nvmf_subsystem_listener_set_ana_state response {ret=}") if not ret: raise Exception(f"nvmf_subsystem_listener_set_ana_state({nqn=}, {listener=}, {ana_state=}, {grp_id=}) error") except Exception as ex: @@ -838,7 +864,7 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): context.set_details(f"{ex}") return pb2.req_status() for ana_key in inaccessible_ana_groups : - ret_recycle = self.namespace_recycle_safe(ana_key) + ret_recycle = self.namespace_recycle_safe(ana_key, peer_msg) if ret_recycle != 0: errmsg = f"Failure recycle namespaces of ana group {ana_key} " self.logger.error(errmsg) @@ -848,8 +874,9 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): def namespace_add_safe(self, request, context): """Adds a namespace to a subsystem.""" + peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, context: {context}") + self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, context: {context}{peer_msg}") if not request.uuid: request.uuid = str(uuid.uuid4()) @@ -872,7 +899,7 @@ def namespace_add_safe(self, request, context): create_image = False anagrp = int(request.anagrpid) if request.anagrpid is not None else 0 ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name, - request.rbd_image_name, request.block_size, create_image, request.size) + request.rbd_image_name, request.block_size, create_image, request.size, peer_msg) if ret_bdev.status != 0: errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}" self.logger.error(errmsg) @@ -880,8 +907,8 @@ def namespace_add_safe(self, request, context): ns_bdev = self.get_bdev_info(bdev_name, False) if ns_bdev != None: try: - ret_del = self.delete_bdev(bdev_name) - self.logger.info(f"delete_bdev({bdev_name}): {ret_del.status}") + ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg) + self.logger.debug(f"delete_bdev({bdev_name}): {ret_del.status}") except AssertionError: self.logger.exception(f"Got an assert while trying to delete bdev {bdev_name}") raise @@ -902,7 +929,7 @@ def namespace_add_safe(self, request, context): if ret_ns.status != 0: try: - ret_del = self.delete_bdev(bdev_name) + ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg) if ret_del.status != 0: self.logger.warning(f"Failure {ret_del.status} deleting bdev {bdev_name}: {ret_del.error_message}") except AssertionError: @@ -936,8 +963,9 @@ def namespace_add(self, request, context=None): def namespace_change_load_balancing_group_safe(self, request, context): """Changes a namespace load balancing group.""" + peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}") + self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}") with self.omap_lock(context=context): find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False, @@ -1075,8 +1103,9 @@ def remove_namespace(self, subsystem_nqn, nsid, context): if context: assert self.omap_lock.locked() + peer_msg = self.get_peer_message(context) namespace_failure_prefix = f"Failure removing namespace {nsid} from {subsystem_nqn}" - self.logger.info(f"Received request to remove namespace {nsid} from {subsystem_nqn}") + self.logger.info(f"Received request to remove namespace {nsid} from {subsystem_nqn}{peer_msg}") if GatewayUtils.is_discovery_nqn(subsystem_nqn): errmsg=f"{namespace_failure_prefix}: Can't remove a namespace from a discovery subsystem" @@ -1089,7 +1118,7 @@ def remove_namespace(self, subsystem_nqn, nsid, context): nqn=subsystem_nqn, nsid=nsid, ) - self.logger.info(f"remove_namespace {nsid}: {ret}") + self.logger.debug(f"remove_namespace {nsid}: {ret}") except Exception as ex: self.logger.exception(namespace_failure_prefix) errmsg = f"{namespace_failure_prefix}:\n{ex}" @@ -1130,6 +1159,7 @@ def get_bdev_info(self, bdev_name, need_to_lock): def list_namespaces(self, request, context=None): """List namespaces.""" + peer_msg = self.get_peer_message(context) if request.nsid == None or request.nsid == 0: if request.uuid: nsid_msg = f"namespace with UUID {request.uuid}" @@ -1140,12 +1170,12 @@ def list_namespaces(self, request, context=None): nsid_msg = f"namespace with NSID {request.nsid} and UUID {request.uuid}" else: nsid_msg = f"namespace with NSID {request.nsid}" - self.logger.info(f"Received request to list {nsid_msg} for {request.subsystem}, context: {context}") + self.logger.info(f"Received request to list {nsid_msg} for {request.subsystem}, context: {context}{peer_msg}") with self.rpc_lock: try: ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client, nqn=request.subsystem) - self.logger.info(f"list_namespaces: {ret}") + self.logger.debug(f"list_namespaces: {ret}") except Exception as ex: errmsg = f"Failure listing namespaces" self.logger.exception(errmsg) @@ -1218,8 +1248,9 @@ def list_namespaces(self, request, context=None): def namespace_get_io_stats(self, request, context=None): """Get namespace's IO stats.""" + peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to get IO stats for namespace {nsid_msg}on {request.subsystem_nqn}, context: {context}") + self.logger.info(f"Received request to get IO stats for namespace {nsid_msg}on {request.subsystem_nqn}, context: {context}{peer_msg}") with self.rpc_lock: find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False, @@ -1240,7 +1271,7 @@ def namespace_get_io_stats(self, request, context=None): self.spdk_rpc_client, name=bdev_name, ) - self.logger.info(f"get_bdev_iostat {bdev_name}: {ret}") + self.logger.debug(f"get_bdev_iostat {bdev_name}: {ret}") except Exception as ex: errmsg = f"Failure getting IO stats for namespace {nsid_msg}on {request.subsystem_nqn}" self.logger.exception(errmsg) @@ -1319,9 +1350,10 @@ def get_qos_limits_string(self, request): def namespace_set_qos_limits_safe(self, request, context): """Set namespace's qos limits.""" + peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) limits_to_set = self.get_qos_limits_string(request) - self.logger.info(f"Received request to set QOS limits for namespace {nsid_msg}on {request.subsystem_nqn},{limits_to_set}, context: {context}") + self.logger.info(f"Received request to set QOS limits for namespace {nsid_msg}on {request.subsystem_nqn},{limits_to_set}, context: {context}{peer_msg}") find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False, "Failure setting namespace's QOS limits") @@ -1369,14 +1401,14 @@ def namespace_set_qos_limits_safe(self, request, context): request.w_mbytes_per_second = int(ns_qos_entry["w_mbytes_per_second"]) limits_to_set = self.get_qos_limits_string(request) - self.logger.info(f"After merging current QOS limits with previous ones for namespace {nsid_msg}on {request.subsystem_nqn},{limits_to_set}") + self.logger.debug(f"After merging current QOS limits with previous ones for namespace {nsid_msg}on {request.subsystem_nqn},{limits_to_set}") with self.omap_lock(context=context): try: ret = rpc_bdev.bdev_set_qos_limit( self.spdk_rpc_client, **set_qos_limits_args) - self.logger.info(f"bdev_set_qos_limit {bdev_name}: {ret}") + self.logger.debug(f"bdev_set_qos_limit {bdev_name}: {ret}") except Exception as ex: errmsg = f"Failure setting QOS limits for namespace {nsid_msg}on {request.subsystem_nqn}" self.logger.exception(errmsg) @@ -1428,7 +1460,7 @@ def find_namespace_and_bdev_name(self, nqn, nsid, uuid, needs_lock, err_prefix): with lock_to_use: try: ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client, nqn=nqn) - self.logger.info(f"find_namespace_and_bdev_name: {ret}") + self.logger.debug(f"find_namespace_and_bdev_name: {ret}") except Exception as ex: self.logger.exception(err_prefix) errmsg = f"{err_prefix}:\n{ex}" @@ -1466,8 +1498,9 @@ def find_namespace_and_bdev_name(self, nqn, nsid, uuid, needs_lock, err_prefix): def namespace_resize(self, request, context=None): """Resize a namespace.""" + peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to resize namespace {nsid_msg}on {request.subsystem_nqn} to {request.new_size} MiB, context: {context}") + self.logger.info(f"Received request to resize namespace {nsid_msg}on {request.subsystem_nqn} to {request.new_size} MiB, context: {context}{peer_msg}") find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, True, "Failure resizing namespace") if not find_ret[0]: @@ -1480,7 +1513,7 @@ def namespace_resize(self, request, context=None): self.logger.error(errmsg) return pb2.req_status(status=errno.ENODEV, error_message=errmsg) - ret = self.resize_bdev(bdev_name, request.new_size) + ret = self.resize_bdev(bdev_name, request.new_size, peer_msg) if ret.status == 0: errmsg = os.strerror(0) @@ -1490,10 +1523,10 @@ def namespace_resize(self, request, context=None): return pb2.req_status(status=ret.status, error_message=errmsg) - def namespace_recycle_safe(self, ana_id) ->int: + def namespace_recycle_safe(self, ana_id, peer_msg = "") ->int: """Recycle namespaces.""" now = time.time() - self.logger.info(f"== recycle_safe started == for anagrp{ana_id} time {now} ") + self.logger.info(f"== recycle_safe started == for anagrp{ana_id} time {now}{peer_msg} ") self.logger.info(f"Doing loop on {ana_id} map; subsystem_nsid_anagrp:") list_ns_params = [] @@ -1512,19 +1545,19 @@ def namespace_recycle_safe(self, ana_id) ->int: ret = self.remove_namespace(subsys, nsid, None) if ret.status != 0: return -1 - ret_del = self.delete_bdev(bdev_name, True) + ret_del = self.delete_bdev(bdev_name, True, peer_msg) if ret_del.status != 0: errmsg = f"Failure deleting namespace {nsid} from {subsys}: {ret_del.error_message}" self.logger.error(errmsg) return -1 # recreate: loop on the list of dict 'list_ns_params' - for ns_params in list_ns_params: + for ns_params in list_ns_params: bdev_name = ns_params['bdev_name'] self.logger.info(f" Recreate nsid: {ns_params['nsid']} ") self.logger.info(f"ns params: {ns_params} ") ret_bdev = self.create_bdev( ana_id, bdev_name, self.bdev_params[bdev_name]['uuid'], self.bdev_params[bdev_name]['pool_name'], self.bdev_params[bdev_name]['image_name'], self.bdev_params[bdev_name]['block_size'], False, - self.bdev_params[bdev_name]['image_size']) + self.bdev_params[bdev_name]['image_size'], peer_msg) self.logger.info(f"bdev_rbd_create: {bdev_name}") if ret_bdev.status != 0: errmsg = f"Failure adding bdev {bdev_name} " @@ -1534,7 +1567,7 @@ def namespace_recycle_safe(self, ana_id) ->int: ret_ns = self.create_namespace(ns_params['subsys'], bdev_name, ns_params['nsid'], ana_id, self.bdev_params[bdev_name]['uuid'], None) if ret_ns.status != 0: try: - ret_del = self.delete_bdev(bdev_name) + ret_del = self.delete_bdev(bdev_name, peer_msg=peer_msg) if ret_del.status != 0: self.logger.warning(f"Failure {ret_del.status} deleting bdev {bdev_name}: {ret_del.error_message}") except Exception: @@ -1548,8 +1581,9 @@ def namespace_recycle_safe(self, ana_id) ->int: def namespace_delete_safe(self, request, context): """Delete a namespace.""" + peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn}, context: {context}") + self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn}, context: {context}{peer_msg}") with self.omap_lock(context=context): find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False, @@ -1570,7 +1604,7 @@ def namespace_delete_safe(self, request, context): self.remove_namespace_from_state(request.subsystem_nqn, nsid, context) if bdev_name: - ret_del = self.delete_bdev(bdev_name) + ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg) if ret_del.status != 0: errmsg = f"Failure deleting namespace {nsid_msg}from {request.subsystem_nqn}: {ret_del.error_message}" self.logger.error(errmsg) @@ -1595,6 +1629,7 @@ def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool: def add_host_safe(self, request, context): """Adds a host to a subsystem.""" + peer_msg = self.get_peer_message(context) all_host_failure_prefix=f"Failure allowing open host access to {request.subsystem_nqn}" host_failure_prefix=f"Failure adding host {request.host_nqn} to {request.subsystem_nqn}" @@ -1642,22 +1677,22 @@ def add_host_safe(self, request, context): return pb2.req_status(status=errno.EEXIST, error_message=errmsg) if request.host_nqn == "*": # Allow any host access to subsystem - self.logger.info(f"Received request to allow any host access for {request.subsystem_nqn}, context: {context}") + self.logger.info(f"Received request to allow any host access for {request.subsystem_nqn}, context: {context}{peer_msg}") ret = rpc_nvmf.nvmf_subsystem_allow_any_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, disable=False, ) - self.logger.info(f"add_host *: {ret}") + self.logger.debug(f"add_host *: {ret}") else: # Allow single host access to subsystem self.logger.info( - f"Received request to add host {request.host_nqn} to {request.subsystem_nqn}, context: {context}") + f"Received request to add host {request.host_nqn} to {request.subsystem_nqn}, context: {context}{peer_msg}") ret = rpc_nvmf.nvmf_subsystem_add_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, host=request.host_nqn, ) - self.logger.info(f"add_host {request.host_nqn}: {ret}") + self.logger.debug(f"add_host {request.host_nqn}: {ret}") except Exception as ex: if request.host_nqn == "*": self.logger.exception(all_host_failure_prefix) @@ -1721,6 +1756,7 @@ def remove_host_from_state(self, subsystem_nqn, host_nqn, context): def remove_host_safe(self, request, context): """Removes a host from a subsystem.""" + peer_msg = self.get_peer_message(context) all_host_failure_prefix=f"Failure disabling open host access to {request.subsystem_nqn}" host_failure_prefix=f"Failure removing host {request.host_nqn} access from {request.subsystem_nqn}" @@ -1745,23 +1781,23 @@ def remove_host_safe(self, request, context): if request.host_nqn == "*": # Disable allow any host access self.logger.info( f"Received request to disable open host access to" - f" {request.subsystem_nqn}, context: {context}") + f" {request.subsystem_nqn}, context: {context}{peer_msg}") ret = rpc_nvmf.nvmf_subsystem_allow_any_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, disable=True, ) - self.logger.info(f"remove_host *: {ret}") + self.logger.debug(f"remove_host *: {ret}") else: # Remove single host access to subsystem self.logger.info( f"Received request to remove host {request.host_nqn} access from" - f" {request.subsystem_nqn}, context: {context}") + f" {request.subsystem_nqn}, context: {context}{peer_msg}") ret = rpc_nvmf.nvmf_subsystem_remove_host( self.spdk_rpc_client, nqn=request.subsystem_nqn, host=request.host_nqn, ) - self.logger.info(f"remove_host {request.host_nqn}: {ret}") + self.logger.debug(f"remove_host {request.host_nqn}: {ret}") except Exception as ex: if request.host_nqn == "*": self.logger.exception(all_host_failure_prefix) @@ -1799,10 +1835,11 @@ def remove_host(self, request, context=None): def list_hosts_safe(self, request, context): """List hosts.""" - self.logger.info(f"Received request to list hosts for {request.subsystem}, context: {context}") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to list hosts for {request.subsystem}, context: {context}{peer_msg}") try: ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client, nqn=request.subsystem) - self.logger.info(f"list_hosts: {ret}") + self.logger.debug(f"list_hosts: {ret}") except Exception as ex: errmsg = f"Failure listing hosts, can't get subsystems" self.logger.exception(errmsg) @@ -1844,10 +1881,11 @@ def list_hosts(self, request, context=None): def list_connections_safe(self, request, context): """List connections.""" - self.logger.info(f"Received request to list connections for {request.subsystem}, context: {context}") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to list connections for {request.subsystem}, context: {context}{peer_msg}") try: qpair_ret = rpc_nvmf.nvmf_subsystem_get_qpairs(self.spdk_rpc_client, nqn=request.subsystem) - self.logger.info(f"list_connections get_qpairs: {qpair_ret}") + self.logger.debug(f"list_connections get_qpairs: {qpair_ret}") except Exception as ex: errmsg = f"Failure listing connections, can't get qpairs" self.logger.exception(errmsg) @@ -1861,7 +1899,7 @@ def list_connections_safe(self, request, context): try: ctrl_ret = rpc_nvmf.nvmf_subsystem_get_controllers(self.spdk_rpc_client, nqn=request.subsystem) - self.logger.info(f"list_connections get_controllers: {ctrl_ret}") + self.logger.debug(f"list_connections get_controllers: {ctrl_ret}") except Exception as ex: errmsg = f"Failure listing connections, can't get controllers" self.logger.exception(errmsg) @@ -1875,7 +1913,7 @@ def list_connections_safe(self, request, context): try: subsys_ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client, nqn=request.subsystem) - self.logger.info(f"list_connections subsystems: {subsys_ret}") + self.logger.debug(f"list_connections subsystems: {subsys_ret}") except Exception as ex: errmsg = f"Failure listing connections, can't get subsystems" self.logger.exception(errmsg) @@ -1965,7 +2003,7 @@ def get_subsystem_ha_status(self, nqn) -> bool: return False enable_ha = self.subsys_ha[nqn] - self.logger.info(f"Subsystem {nqn} enable_ha: {enable_ha}") + self.logger.debug(f"Subsystem {nqn} enable_ha: {enable_ha}") return enable_ha def matching_listener_exists(self, context, nqn, traddr, trsvcid) -> bool: @@ -1999,9 +2037,10 @@ def create_listener_safe(self, request, context): self.logger.error(f"{errmsg}") return pb2.req_status(status=errno.ENOKEY, error_message=errmsg) + peer_msg = self.get_peer_message(context) self.logger.info(f"Received request to create {request.host_name}" f" TCP {adrfam} listener for {request.nqn} at" - f" {traddr}:{request.trsvcid}, context: {context}") + f" {traddr}:{request.trsvcid}, context: {context}{peer_msg}") if GatewayUtils.is_discovery_nqn(request.nqn): errmsg=f"{create_listener_error_prefix}: Can't create a listener for a discovery subsystem" @@ -2030,7 +2069,7 @@ def create_listener_safe(self, request, context): trsvcid=str(request.trsvcid), adrfam=adrfam, ) - self.logger.info(f"create_listener: {ret}") + self.logger.debug(f"create_listener: {ret}") else: if context: errmsg=f"{create_listener_error_prefix}: Gateway's host name must match current host ({self.host_name})" @@ -2038,7 +2077,7 @@ def create_listener_safe(self, request, context): return pb2.req_status(status=errno.ENODEV, error_message=errmsg) else: errmsg=f"Listener not created as gateway's host name {self.host_name} differs from requested host {request.host_name}" - self.logger.info(f"{errmsg}") + self.logger.debug(f"{errmsg}") return pb2.req_status(status=0, error_message=errmsg) except Exception as ex: self.logger.exception(create_listener_error_prefix) @@ -2059,7 +2098,7 @@ def create_listener_safe(self, request, context): if enable_ha: try: - self.logger.info(f"create_listener nvmf_subsystem_listener_set_ana_state {request=} set inaccessible for all ana groups") + self.logger.debug(f"create_listener nvmf_subsystem_listener_set_ana_state {request=} set inaccessible for all ana groups") _ana_state = "inaccessible" ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( self.spdk_rpc_client, @@ -2069,7 +2108,7 @@ def create_listener_safe(self, request, context): traddr=request.traddr, trsvcid=str(request.trsvcid), adrfam=adrfam) - self.logger.info(f"create_listener nvmf_subsystem_listener_set_ana_state response {ret=}") + self.logger.debug(f"create_listener nvmf_subsystem_listener_set_ana_state response {ret=}") # have been provided with ana state for this nqn prior to creation # update optimized ana groups @@ -2078,7 +2117,7 @@ def create_listener_safe(self, request, context): ana_grp = x+1 if ana_grp in self.ana_map[request.nqn] and self.ana_map[request.nqn][ana_grp] == pb2.ana_state.OPTIMIZED: _ana_state = "optimized" - self.logger.info(f"using ana_map: set listener on nqn : {request.nqn} ana state : {_ana_state} for group : {ana_grp}") + self.logger.debug(f"using ana_map: set listener on nqn : {request.nqn} ana state : {_ana_state} for group : {ana_grp}") ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( self.spdk_rpc_client, nqn=request.nqn, @@ -2088,7 +2127,7 @@ def create_listener_safe(self, request, context): trsvcid=str(request.trsvcid), adrfam=adrfam, anagrpid=ana_grp ) - self.logger.info(f"create_listener nvmf_subsystem_listener_set_ana_state response {ret=}") + self.logger.debug(f"create_listener nvmf_subsystem_listener_set_ana_state response {ret=}") except Exception as ex: errmsg=f"{create_listener_error_prefix}: Error setting ANA state" @@ -2182,12 +2221,13 @@ def delete_listener_safe(self, request, context): self.logger.error(errmsg) return pb2.req_status(status=errno.ENOKEY, error_message=errmsg) + peer_msg = self.get_peer_message(context) force_msg = " forcefully" if request.force else "" host_msg = "all hosts" if request.host_name == "*" else f"host {request.host_name}" self.logger.info(f"Received request to delete TCP listener of {host_msg}" f" for subsystem {request.nqn} at" - f" {traddr}:{request.trsvcid}{force_msg}, context: {context}") + f" {traddr}:{request.trsvcid}{force_msg}, context: {context}{peer_msg}") if request.host_name == "*" and not request.force: errmsg=f"{delete_listener_error_prefix}. Must use the \"--force\" parameter when setting the host name to \"*\"." @@ -2228,7 +2268,7 @@ def delete_listener_safe(self, request, context): trsvcid=str(request.trsvcid), adrfam=adrfam, ) - self.logger.info(f"delete_listener: {ret}") + self.logger.debug(f"delete_listener: {ret}") else: errmsg=f"{delete_listener_error_prefix}. Gateway's host name must match current host ({self.host_name}). You can continue to delete the listener by adding the `--force` parameter." self.logger.error(f"{errmsg}") @@ -2264,7 +2304,8 @@ def delete_listener(self, request, context=None): def list_listeners_safe(self, request, context): """List listeners.""" - self.logger.info(f"Received request to list listeners for {request.subsystem}, context: {context}") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to list listeners for {request.subsystem}, context: {context}{peer_msg}") listeners = [] with self.omap_lock(context=context): @@ -2297,13 +2338,14 @@ def list_listeners(self, request, context=None): def list_subsystems_safe(self, request, context): """List subsystems.""" + peer_msg = self.get_peer_message(context) ser_msg = "" if request.serial_number: ser_msg = f" with serial number {request.serial_number}" if request.subsystem_nqn: - self.logger.info(f"Received request to list subsystem {request.subsystem_nqn}, context: {context}") + self.logger.info(f"Received request to list subsystem {request.subsystem_nqn}, context: {context}{peer_msg}") else: - self.logger.info(f"Received request to list the subsystem{ser_msg}, context: {context}") + self.logger.info(f"Received request to list the subsystem{ser_msg}, context: {context}{peer_msg}") subsystems = [] try: @@ -2311,7 +2353,7 @@ def list_subsystems_safe(self, request, context): ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client, nqn=request.subsystem_nqn) else: ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) - self.logger.info(f"list_subsystems: {ret}") + self.logger.debug(f"list_subsystems: {ret}") except Exception as ex: errmsg = f"Failure listing subsystems" self.logger.exception(errmsg) @@ -2347,11 +2389,12 @@ def list_subsystems_safe(self, request, context): def get_subsystems_safe(self, request, context): """Gets subsystems.""" - self.logger.info(f"Received request to get subsystems, context: {context}") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to get subsystems, context: {context}{peer_msg}") subsystems = [] try: ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) - self.logger.info(f"get_subsystems: {ret}") + self.logger.debug(f"get_subsystems: {ret}") except Exception as ex: self.logger.exception(f"get_subsystems failed") context.set_code(grpc.StatusCode.INTERNAL) @@ -2388,7 +2431,8 @@ def list_subsystems(self, request, context=None): def get_spdk_nvmf_log_flags_and_level_safe(self, request, context): """Gets spdk nvmf log flags, log level and log print level""" - self.logger.info(f"Received request to get SPDK nvmf log flags and level") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to get SPDK nvmf log flags and level{peer_msg}") log_flags = [] with self.omap_lock(context=context): try: @@ -2399,7 +2443,7 @@ def get_spdk_nvmf_log_flags_and_level_safe(self, request, context): log_flags.append(pb2_log_flag) spdk_log_level = rpc_log.log_get_level(self.spdk_rpc_client) spdk_log_print_level = rpc_log.log_get_print_level(self.spdk_rpc_client) - self.logger.info(f"spdk log flags: {nvmf_log_flags}, " + self.logger.debug(f"spdk log flags: {nvmf_log_flags}, " f"spdk log level: {spdk_log_level}, " f"spdk log print level: {spdk_log_print_level}") except Exception as ex: @@ -2430,6 +2474,7 @@ def set_spdk_nvmf_logs_safe(self, request, context): ret_log = False ret_print = False + peer_msg = self.get_peer_message(context) if request.HasField("log_level"): log_level = GatewayEnumUtils.get_key_from_value(pb2.LogLevel, request.log_level) if log_level == None: @@ -2444,7 +2489,7 @@ def set_spdk_nvmf_logs_safe(self, request, context): self.logger.error(f"{errmsg}") return pb2.req_status(status=errno.ENOKEY, error_message=errmsg) - self.logger.info(f"Received request to set SPDK nvmf logs: log_level: {log_level}, print_level: {print_level}") + self.logger.info(f"Received request to set SPDK nvmf logs: log_level: {log_level}, print_level: {print_level}{peer_msg}") with self.omap_lock(context=context): try: @@ -2452,14 +2497,14 @@ def set_spdk_nvmf_logs_safe(self, request, context): if key.startswith('nvmf')] ret = [rpc_log.log_set_flag( self.spdk_rpc_client, flag=flag) for flag in nvmf_log_flags] - self.logger.info(f"Set SPDK nvmf log flags {nvmf_log_flags} to TRUE: {ret}") + self.logger.debug(f"Set SPDK nvmf log flags {nvmf_log_flags} to TRUE: {ret}") if log_level != None: ret_log = rpc_log.log_set_level(self.spdk_rpc_client, level=log_level) - self.logger.info(f"Set log level to {log_level}: {ret_log}") + self.logger.debug(f"Set log level to {log_level}: {ret_log}") if print_level != None: ret_print = rpc_log.log_set_print_level( self.spdk_rpc_client, level=print_level) - self.logger.info(f"Set log print level to {print_level}: {ret_print}") + self.logger.debug(f"Set log print level to {print_level}: {ret_print}") except Exception as ex: errmsg="Failure setting SPDK log levels" self.logger.exception(errmsg) @@ -2491,7 +2536,8 @@ def set_spdk_nvmf_logs(self, request, context=None): def disable_spdk_nvmf_logs_safe(self, request, context): """Disables spdk nvmf logs""" - self.logger.info(f"Received request to disable SPDK nvmf logs") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to disable SPDK nvmf logs{peer_msg}") with self.omap_lock(context=context): try: @@ -2540,7 +2586,8 @@ def parse_version(self, version): def get_gateway_info_safe(self, request, context): """Get gateway's info""" - self.logger.info(f"Received request to get gateway's info") + peer_msg = self.get_peer_message(context) + self.logger.info(f"Received request to get gateway's info{peer_msg}") gw_version_string = os.getenv("NVMEOF_VERSION") spdk_version_string = os.getenv("NVMEOF_SPDK_VERSION") cli_version_string = request.cli_version @@ -2577,7 +2624,7 @@ def get_gateway_info_safe(self, request, context): elif not cli_ver: self.logger.warning(f"Invalid CLI version {cli_version_string}, can't check version compatibility") if ret.status == 0: - log_func = self.logger.info + log_func = self.logger.debug else: log_func = self.logger.error log_func(f"Gateway's info:\n{ret}") @@ -2589,18 +2636,20 @@ def get_gateway_info(self, request, context=None): def get_gateway_log_level(self, request, context=None): """Get gateway's log level""" + peer_msg = self.get_peer_message(context) try: log_level = GatewayEnumUtils.get_key_from_value(pb2.GwLogLevel, self.logger.level) except Exception: self.logger.exception(f"Can't get string value for log level {self.logger.level}") return pb2.gateway_log_level_info(status = errno.ENOKEY, error_message=f"Invalid gateway log level") - self.logger.info(f"Received request to get gateway's log level. Level is {log_level}") + self.logger.info(f"Received request to get gateway's log level. Level is {log_level}{peer_msg}") return pb2.gateway_log_level_info(status = 0, error_message=os.strerror(0), log_level=log_level) def set_gateway_log_level(self, request, context=None): """Set gateway's log level""" + peer_msg = self.get_peer_message(context) log_level = GatewayEnumUtils.get_key_from_value(pb2.GwLogLevel, request.log_level) if log_level == None: errmsg=f"Unknown log level {request.log_level}" @@ -2608,7 +2657,7 @@ def set_gateway_log_level(self, request, context=None): return pb2.req_status(status=errno.ENOKEY, error_message=errmsg) log_level = log_level.upper() - self.logger.info(f"Received request to set gateway's log level to {log_level}") + self.logger.info(f"Received request to set gateway's log level to {log_level}{peer_msg}") self.gw_logger_object.set_log_level(request.log_level) try: