diff --git a/.env b/.env index 74e76046..b04f7f6e 100644 --- a/.env +++ b/.env @@ -58,8 +58,8 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9" # Ceph Cluster CEPH_CLUSTER_VERSION="${CEPH_VERSION}" -CEPH_BRANCH=wip-baum-20240725-00-centos9-only -CEPH_SHA=0ec90b1e61a7489b13d6d8432156a0417f35db7f +CEPH_BRANCH=wip-baum-main-202258fa-20240805-nvmeof-enabled +CEPH_SHA=2ef08fc34dbdd6cc5a59f45582bfe959b1940161 CEPH_DEVEL_MGR_PATH=../ceph diff --git a/control/grpc.py b/control/grpc.py index ee880cd0..9cf0c51f 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -15,6 +15,7 @@ import os import errno import contextlib +import threading import time from typing import Callable from collections import defaultdict @@ -67,9 +68,14 @@ class GatewayService(pb2_grpc.GatewayServicer): gateway_name: Gateway identifier gateway_state: Methods for target state persistence spdk_rpc_client: Client of SPDK RPC server + spdk_rpc_subsystems_client: Client of SPDK RPC server for get_subsystems + spdk_rpc_subsystems_lock: Mutex to hold while using get subsystems SPDK client + shared_state_lock: guard mutex for subsystem_nsid_bdev and cluster_nonce + subsystem_nsid_bdev: map of nsid to bdev + cluster_nonce: cluster context nonce map """ - def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, ceph_utils: CephUtils) -> None: + def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, spdk_rpc_subsystems_client, ceph_utils: CephUtils) -> None: """Constructor""" self.gw_logger_object = GatewayLogger(config) self.logger = self.gw_logger_object.logger @@ -121,6 +127,9 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp self.omap_lock = omap_lock self.group_id = group_id self.spdk_rpc_client = spdk_rpc_client + self.spdk_rpc_subsystems_client = spdk_rpc_subsystems_client + self.spdk_rpc_subsystems_lock = threading.Lock() + self.shared_state_lock = threading.Lock() self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: self.gateway_name = socket.gethostname() @@ -245,8 +254,9 @@ def _alloc_cluster(self, anagrp: int) -> str: user = self.rados_id, core_mask = self.librbd_core_mask, ) - self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}") - self.cluster_nonce[name] = nonce + with self.shared_state_lock: + self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}") + self.cluster_nonce[name] = nonce return name def _grpc_function_with_lock(self, func, request, context): @@ -788,8 +798,9 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte anagrpid=anagrpid, uuid=uuid, ) - self.subsystem_nsid_bdev[subsystem_nqn][nsid] = bdev_name - self.logger.debug(f"subsystem_add_ns: {nsid}") + with self.shared_state_lock: + self.subsystem_nsid_bdev[subsystem_nqn][nsid] = bdev_name + self.logger.debug(f"subsystem_add_ns: {nsid}") except Exception as ex: self.logger.exception(add_namespace_error_prefix) errmsg = f"{add_namespace_error_prefix}:\n{ex}" @@ -2440,7 +2451,7 @@ def get_subsystems_safe(self, request, context): self.logger.debug(f"Received request to get subsystems, context: {context}{peer_msg}") subsystems = [] try: - ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) + ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_subsystems_client) except Exception as ex: self.logger.exception(f"get_subsystems failed") context.set_code(grpc.StatusCode.INTERNAL) @@ -2454,8 +2465,9 @@ def get_subsystems_safe(self, request, context): for n in s[ns_key]: nqn = s["nqn"] nsid = n["nsid"] - bdev = self.subsystem_nsid_bdev[nqn][nsid] - nonce = self.cluster_nonce[self.bdev_cluster[bdev]] + with self.shared_state_lock: + bdev = self.subsystem_nsid_bdev[nqn][nsid] + nonce = self.cluster_nonce[self.bdev_cluster[bdev]] n["nonce"] = nonce # Parse the JSON dictionary into the protobuf message subsystem = pb2.subsystem() @@ -2468,7 +2480,7 @@ def get_subsystems_safe(self, request, context): return pb2.subsystems_info(subsystems=subsystems) def get_subsystems(self, request, context): - with self.rpc_lock: + with self.spdk_rpc_subsystems_lock: return self.get_subsystems_safe(request, context) def list_subsystems(self, request, context=None): diff --git a/control/server.py b/control/server.py index 6a30536c..a9836c98 100644 --- a/control/server.py +++ b/control/server.py @@ -64,6 +64,7 @@ class GatewayServer: server: gRPC server instance to receive gateway client requests spdk_rpc_client: Client of SPDK RPC server spdk_rpc_ping_client: Ping client of SPDK RPC server + spdk_rpc_subsystems_client: subsystems client of SPDK RPC server spdk_process: Subprocess running SPDK NVMEoF target application discovery_pid: Subprocess running Ceph nvmeof discovery service """ @@ -171,7 +172,7 @@ def serve(self): # Register service implementation with server gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller, f"gateway-{self.name}") omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock) - self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils) + self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.spdk_rpc_subsystems_client, self.ceph_utils) self.server = self._grpc_server(self._gateway_address()) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) @@ -400,6 +401,13 @@ def _start_spdk(self, omap_state): log_level=protocol_log_level, conn_retries=conn_retries, ) + self.spdk_rpc_subsystems_client = rpc_client.JSONRPCClient( + self.spdk_rpc_socket_path, + None, + timeout, + log_level=protocol_log_level, + conn_retries=conn_retries, + ) except Exception: self.logger.exception(f"Unable to initialize SPDK") raise