diff --git a/.env b/.env index 74e76046..b04f7f6e 100644 --- a/.env +++ b/.env @@ -58,8 +58,8 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9" # Ceph Cluster CEPH_CLUSTER_VERSION="${CEPH_VERSION}" -CEPH_BRANCH=wip-baum-20240725-00-centos9-only -CEPH_SHA=0ec90b1e61a7489b13d6d8432156a0417f35db7f +CEPH_BRANCH=wip-baum-main-202258fa-20240805-nvmeof-enabled +CEPH_SHA=2ef08fc34dbdd6cc5a59f45582bfe959b1940161 CEPH_DEVEL_MGR_PATH=../ceph diff --git a/control/grpc.py b/control/grpc.py index 55ea9303..6d485ad1 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -15,6 +15,7 @@ import os import errno import contextlib +import threading import time from typing import Callable from collections import defaultdict @@ -113,9 +114,14 @@ class GatewayService(pb2_grpc.GatewayServicer): gateway_name: Gateway identifier gateway_state: Methods for target state persistence spdk_rpc_client: Client of SPDK RPC server + spdk_rpc_subsystems_client: Client of SPDK RPC server for get_subsystems + spdk_rpc_subsystems_lock: Mutex to hold while using get subsystems SPDK client + shared_state_lock: guard mutex for bdev_cluster and cluster_nonce + subsystem_nsid_bdev_and_uuid: map of nsid to bdev + cluster_nonce: cluster context nonce map """ - def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, ceph_utils: CephUtils) -> None: + def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, spdk_rpc_subsystems_client, ceph_utils: CephUtils) -> None: """Constructor""" self.gw_logger_object = GatewayLogger(config) self.logger = self.gw_logger_object.logger @@ -167,6 +173,9 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp self.omap_lock = omap_lock self.group_id = group_id self.spdk_rpc_client = spdk_rpc_client + self.spdk_rpc_subsystems_client = spdk_rpc_subsystems_client + self.spdk_rpc_subsystems_lock = threading.Lock() + self.shared_state_lock = threading.Lock() self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: self.gateway_name = socket.gethostname() @@ -291,8 +300,9 @@ def _alloc_cluster(self, anagrp: int) -> str: user = self.rados_id, core_mask = self.librbd_core_mask, ) - self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}") - self.cluster_nonce[name] = nonce + with self.shared_state_lock: + self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}") + self.cluster_nonce[name] = nonce return name def _grpc_function_with_lock(self, func, request, context): @@ -370,7 +380,8 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl block_size=block_size, uuid=uuid, ) - self.bdev_cluster[name] = cluster_name + with self.shared_state_lock: + self.bdev_cluster[name] = cluster_name self.bdev_params[name] = {'uuid':uuid, 'pool_name':rbd_pool_name, 'image_name':rbd_image_name, 'image_size':rbd_image_size, 'block_size': block_size} self.logger.debug(f"bdev_rbd_create: {bdev_name}, cluster_name {cluster_name}") @@ -464,8 +475,10 @@ def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""): ) if not recycling_mode: del self.bdev_params[bdev_name] - self.logger.debug(f"to delete_bdev {bdev_name} cluster {self.bdev_cluster[bdev_name]} ") - self._put_cluster(self.bdev_cluster[bdev_name]) + with self.shared_state_lock: + cluster = self.bdev_cluster[bdev_name] + self.logger.debug(f"to delete_bdev {bdev_name} cluster {cluster} ") + self._put_cluster(cluster) self.logger.debug(f"delete_bdev {bdev_name}: {ret}") except Exception as ex: errmsg = f"Failure deleting bdev {bdev_name}" @@ -2439,7 +2452,7 @@ def get_subsystems_safe(self, request, context): self.logger.debug(f"Received request to get subsystems, context: {context}{peer_msg}") subsystems = [] try: - ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) + ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_subsystems_client) except Exception as ex: self.logger.exception(f"get_subsystems failed") context.set_code(grpc.StatusCode.INTERNAL) @@ -2448,19 +2461,13 @@ def get_subsystems_safe(self, request, context): for s in ret: try: - nqn = s["nqn"] ns_key = "namespaces" - ns_list = [] if ns_key in s: - ns_list = s[ns_key] - if not ns_list: - self.subsystem_nsid_bdev_and_uuid.remove_namespace(nqn) - for n in ns_list: - nsid = n["nsid"] - uuid = n["uuid"] - bdev = n["bdev_name"] - nonce = self.cluster_nonce[self.bdev_cluster[bdev]] - n["nonce"] = nonce + for n in s[ns_key]: + bdev = n["bdev_name"] + with self.shared_state_lock: + nonce = self.cluster_nonce[self.bdev_cluster[bdev]] + n["nonce"] = nonce # Parse the JSON dictionary into the protobuf message subsystem = pb2.subsystem() json_format.Parse(json.dumps(s), subsystem, ignore_unknown_fields=True) @@ -2472,7 +2479,7 @@ def get_subsystems_safe(self, request, context): return pb2.subsystems_info(subsystems=subsystems) def get_subsystems(self, request, context): - with self.rpc_lock: + with self.spdk_rpc_subsystems_lock: return self.get_subsystems_safe(request, context) def list_subsystems(self, request, context=None): diff --git a/control/server.py b/control/server.py index 6a30536c..a9836c98 100644 --- a/control/server.py +++ b/control/server.py @@ -64,6 +64,7 @@ class GatewayServer: server: gRPC server instance to receive gateway client requests spdk_rpc_client: Client of SPDK RPC server spdk_rpc_ping_client: Ping client of SPDK RPC server + spdk_rpc_subsystems_client: subsystems client of SPDK RPC server spdk_process: Subprocess running SPDK NVMEoF target application discovery_pid: Subprocess running Ceph nvmeof discovery service """ @@ -171,7 +172,7 @@ def serve(self): # Register service implementation with server gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller, f"gateway-{self.name}") omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock) - self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils) + self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.spdk_rpc_subsystems_client, self.ceph_utils) self.server = self._grpc_server(self._gateway_address()) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) @@ -400,6 +401,13 @@ def _start_spdk(self, omap_state): log_level=protocol_log_level, conn_retries=conn_retries, ) + self.spdk_rpc_subsystems_client = rpc_client.JSONRPCClient( + self.spdk_rpc_socket_path, + None, + timeout, + log_level=protocol_log_level, + conn_retries=conn_retries, + ) except Exception: self.logger.exception(f"Unable to initialize SPDK") raise