Skip to content

Commit

Permalink
grpc: allow get_subsystem concurrent to other grpc requests
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed Aug 29, 2024
1 parent de77dac commit 54a8030
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 61 deletions.
87 changes: 27 additions & 60 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
import errno
import contextlib
import threading
import time
from typing import Callable
from collections import defaultdict
Expand Down Expand Up @@ -150,9 +151,14 @@ class GatewayService(pb2_grpc.GatewayServicer):
gateway_name: Gateway identifier
gateway_state: Methods for target state persistence
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_subsystems_client: Client of SPDK RPC server for get_subsystems
spdk_rpc_subsystems_lock: Mutex to hold while using get subsystems SPDK client
shared_state_lock: guard mutex for bdev_cluster and cluster_nonce
subsystem_nsid_bdev_and_uuid: map of nsid to bdev
cluster_nonce: cluster context nonce map
"""

def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, ceph_utils: CephUtils) -> None:
def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, spdk_rpc_subsystems_client, ceph_utils: CephUtils) -> None:
"""Constructor"""
self.gw_logger_object = GatewayLogger(config)
self.logger = self.gw_logger_object.logger
Expand Down Expand Up @@ -205,6 +211,9 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.omap_lock = omap_lock
self.group_id = group_id
self.spdk_rpc_client = spdk_rpc_client
self.spdk_rpc_subsystems_client = spdk_rpc_subsystems_client
self.spdk_rpc_subsystems_lock = threading.Lock()
self.shared_state_lock = threading.Lock()
self.gateway_name = self.config.get("gateway", "name")
if not self.gateway_name:
self.gateway_name = socket.gethostname()
Expand Down Expand Up @@ -367,8 +376,9 @@ def _alloc_cluster(self, anagrp: int) -> str:
user = self.rados_id,
core_mask = self.librbd_core_mask,
)
self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}")
self.cluster_nonce[name] = nonce
with self.shared_state_lock:
self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}")
self.cluster_nonce[name] = nonce
return name

def _grpc_function_with_lock(self, func, request, context):
Expand Down Expand Up @@ -446,7 +456,8 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl
block_size=block_size,
uuid=uuid,
)
self.bdev_cluster[name] = cluster_name
with self.shared_state_lock:
self.bdev_cluster[name] = cluster_name
self.bdev_params[name] = {'uuid':uuid, 'pool_name':rbd_pool_name, 'image_name':rbd_image_name, 'image_size':rbd_image_size, 'block_size': block_size}

self.logger.debug(f"bdev_rbd_create: {bdev_name}, cluster_name {cluster_name}")
Expand Down Expand Up @@ -540,8 +551,10 @@ def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""):
)
if not recycling_mode:
del self.bdev_params[bdev_name]
self.logger.debug(f"to delete_bdev {bdev_name} cluster {self.bdev_cluster[bdev_name]} ")
self._put_cluster(self.bdev_cluster[bdev_name])
with self.shared_state_lock:
cluster = self.bdev_cluster[bdev_name]
self.logger.debug(f"to delete_bdev {bdev_name} cluster {cluster} ")
self._put_cluster(cluster)
self.logger.debug(f"delete_bdev {bdev_name}: {ret}")
except Exception as ex:
errmsg = f"Failure deleting bdev {bdev_name}"
Expand Down Expand Up @@ -2582,7 +2595,7 @@ def get_subsystems_safe(self, request, context):
self.logger.debug(f"Received request to get subsystems, context: {context}{peer_msg}")
subsystems = []
try:
ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client)
ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_subsystems_client)
except Exception as ex:
self.logger.exception(f"get_subsystems failed")
context.set_code(grpc.StatusCode.INTERNAL)
Expand All @@ -2591,71 +2604,25 @@ def get_subsystems_safe(self, request, context):

for s in ret:
try:
nqn = s["nqn"]
ns_key = "namespaces"
ns_list = []
if ns_key in s:
ns_list = s[ns_key]
if not ns_list:
self.subsystem_nsid_bdev_and_uuid.remove_namespace(nqn)
for n in ns_list:
nsid = n["nsid"]
uuid = n["uuid"]
bdev = n["bdev_name"]
nonce = self.cluster_nonce[self.bdev_cluster[bdev]]
n["nonce"] = nonce
for n in s[ns_key]:
bdev = n["bdev_name"]
with self.shared_state_lock:
nonce = self.cluster_nonce[self.bdev_cluster[bdev]]
n["nonce"] = nonce
# Parse the JSON dictionary into the protobuf message
subsystem = pb2.subsystem()
json_format.Parse(json.dumps(s), subsystem, ignore_unknown_fields=True)
psk_hosts = []
saw_psk = False
# if now host nqn is passed, just check if there is any psk host in subsystem
if self.host_info.is_psk_host(nqn):
for h in subsystem.hosts:
psk_val = False
if self.host_info.is_psk_host(nqn, h.nqn):
psk_val = True
saw_psk = True
psk_hosts.append(pb2.host(nqn=h.nqn, use_psk = psk_val))

secure_listeners = []
saw_secure = False
if nqn in self.subsystem_listeners:
for lstnr in subsystem.listen_addresses:
secure_val = False
# We get the address family as IPv4 and the port as a string, we need to adjust to internal form
if (lstnr.adrfam.lower(), lstnr.traddr, int(lstnr.trsvcid), True) in self.subsystem_listeners[nqn]:
saw_secure = True
secure_val = True
secure_listeners.append(pb2.listen_address(trtype=lstnr.trtype,
adrfam=lstnr.adrfam.lower(),
traddr=lstnr.traddr,
trsvcid=lstnr.trsvcid,
transport=lstnr.transport,
secure=secure_val))

if saw_psk or saw_secure:
secure_subsystem = pb2.subsystem(nqn = subsystem.nqn,
subtype = subsystem.subtype,
listen_addresses = secure_listeners,
hosts = psk_hosts,
allow_any_host = subsystem.allow_any_host,
serial_number = subsystem.serial_number,
max_namespaces = subsystem.max_namespaces,
min_cntlid = subsystem.min_cntlid,
max_cntlid = subsystem.max_cntlid,
namespaces = subsystem.namespaces)
subsystems.append(secure_subsystem)
else:
subsystems.append(subsystem)
subsystems.append(subsystem)
except Exception:
self.logger.exception(f"{s=} parse error")
pass

return pb2.subsystems_info(subsystems=subsystems)

def get_subsystems(self, request, context):
with self.rpc_lock:
with self.spdk_rpc_subsystems_lock:
return self.get_subsystems_safe(request, context)

def list_subsystems(self, request, context=None):
Expand Down
10 changes: 9 additions & 1 deletion control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class GatewayServer:
server: gRPC server instance to receive gateway client requests
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_ping_client: Ping client of SPDK RPC server
spdk_rpc_subsystems_client: subsystems client of SPDK RPC server
spdk_process: Subprocess running SPDK NVMEoF target application
discovery_pid: Subprocess running Ceph nvmeof discovery service
"""
Expand Down Expand Up @@ -174,7 +175,7 @@ def serve(self):
# Register service implementation with server
gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller, f"gateway-{self.name}")
self.omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, self.omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, self.omap_lock, self.group_id, self.spdk_rpc_client, self.spdk_rpc_subsystems_client, self.ceph_utils)
self.server = self._grpc_server(self._gateway_address())
pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)

Expand Down Expand Up @@ -389,6 +390,13 @@ def _start_spdk(self, omap_state):
log_level=protocol_log_level,
conn_retries=conn_retries,
)
self.spdk_rpc_subsystems_client = rpc_client.JSONRPCClient(
self.spdk_rpc_socket_path,
None,
timeout,
log_level=protocol_log_level,
conn_retries=conn_retries,
)
except Exception:
self.logger.exception(f"Unable to initialize SPDK")
raise
Expand Down

0 comments on commit 54a8030

Please sign in to comment.