diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index 07666a58..41e6f6b2 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -15,8 +15,6 @@ port = 5500 enable_auth = False state_update_notify = True state_update_interval_sec = 5 -#min_controller_id = 1 -#max_controller_id = 65519 enable_spdk_discovery_controller = False #omap_file_lock_duration = 60 #omap_file_lock_retries = 15 diff --git a/control/grpc.py b/control/grpc.py index 732c7cda..49c63686 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -16,19 +16,31 @@ import os import threading import errno +from typing import Callable import spdk.rpc.bdev as rpc_bdev import spdk.rpc.nvmf as rpc_nvmf import spdk.rpc.log as rpc_log from google.protobuf import json_format +from google.protobuf.empty_pb2 import Empty from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc +from .proto import monitor_pb2 +from .proto import monitor_pb2_grpc from .config import GatewayConfig from .discovery import DiscoveryService from .state import GatewayState, GatewayStateHandler, OmapLock -MAX_ANA_GROUPS = 4 +MAX_ANA_GROUPS = 32 # should match nvmeof gateway monitor ceph c++ code + +class MonitorService(monitor_pb2_grpc.MonitorServicer): + def __init__(self, set_group_id: Callable[[int], None]) -> None: + self.set_group_id = set_group_id + + def group_id(self, request: monitor_pb2.group_id_req, context = None) -> Empty: + self.set_group_id(request.id) + return Empty() class GatewayService(pb2_grpc.GatewayServicer): """Implements gateway service interface. @@ -43,7 +55,7 @@ class GatewayService(pb2_grpc.GatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ - def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, omap_lock: OmapLock, spdk_rpc_client) -> None: + def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, omap_lock: OmapLock, group_id: int, spdk_rpc_client) -> None: """Constructor""" self.logger = logging.getLogger(__name__) ver = os.getenv("NVMEOF_VERSION") @@ -75,6 +87,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, om self.rpc_lock = threading.Lock() self.gateway_state = gateway_state self.omap_lock = omap_lock + self.group_id = group_id self.spdk_rpc_client = spdk_rpc_client self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: @@ -309,8 +322,10 @@ def create_subsystem_safe(self, request, context=None): if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't create a discovery subsystem") - min_cntlid = self.config.getint_with_default("gateway", "min_controller_id", 1) - max_cntlid = self.config.getint_with_default("gateway", "max_controller_id", 65519) + # Assuming max of 32 gateways and protocol min 1 max 65519 + offset = (self.group_id - 1) * 2040 + min_cntlid = offset + 1 + max_cntlid = offset + 2040 if not request.serial_number: random.seed() randser = random.randint(2, 99999999999999) diff --git a/control/server.py b/control/server.py index 9762a9e8..c1b21253 100644 --- a/control/server.py +++ b/control/server.py @@ -16,6 +16,7 @@ import json import logging import signal +import threading from concurrent import futures from google.protobuf import json_format @@ -25,10 +26,14 @@ from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc +from .proto import monitor_pb2 +from .proto import monitor_pb2_grpc from .state import GatewayState, LocalGatewayState, OmapLock, OmapGatewayState, GatewayStateHandler -from .grpc import GatewayService +from .grpc import GatewayService, MonitorService from .discovery import DiscoveryService from .config import GatewayConfig +from .grpc import MAX_ANA_GROUPS +from typing import Callable def sigchld_handler(signum, frame): """Handle SIGCHLD, runs when a spdk process terminates.""" @@ -68,6 +73,7 @@ def __init__(self, config: GatewayConfig): self.server = None self.discovery_pid = None self.spdk_rpc_socket_path = None + self.monitor_event = threading.Event() self.name = self.config.get("gateway", "name") if not self.name: @@ -99,6 +105,25 @@ def __exit__(self, exc_type, exc_value, traceback): self.logger.info("Exiting the gateway process.") + def set_group_id(self, id: int): + self.logger.info(f"Gateway {self.name} group {id=}") + assert id >= 1 and id <= MAX_ANA_GROUPS + self.group_id = id + self.monitor_event.set() + + def _wait_for_group_id(self): + """Waits for the monitor notification of this gatway's group id""" + self.monitor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) + monitor_pb2_grpc.add_MonitorServicer_to_server(MonitorService(self.set_group_id), self.monitor_server) + self.monitor_server.add_insecure_port(self._monitor_address()) + self.monitor_server.start() + self.logger.info(f"Monitor server is listening on {self._monitor_address()} for group id") + self.monitor_event.wait() + self.monitor_event = None + self.logger.info("Stopping the monitor server...") + self.monitor_server.stop(None) + self.monitor_server = None + def serve(self): """Starts gateway server.""" self.logger.debug("Starting serve") @@ -109,6 +134,9 @@ def serve(self): # install SIGCHLD handler signal.signal(signal.SIGCHLD, sigchld_handler) + # wait for monitor notification of the group id + self._wait_for_group_id() + # Start SPDK self._start_spdk(omap_state) @@ -118,7 +146,7 @@ def serve(self): # Register service implementation with server gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller) omap_lock = OmapLock(omap_state, gateway_state) - self.gateway_rpc = GatewayService(self.config, gateway_state, omap_lock, self.spdk_rpc_client) + self.gateway_rpc = GatewayService(self.config, gateway_state, omap_lock, self.group_id, self.spdk_rpc_client) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) @@ -190,6 +218,14 @@ def _gateway_address(self): gateway_addr = GatewayConfig.escape_address_if_ipv6(gateway_addr) return "{}:{}".format(gateway_addr, gateway_port) + def _monitor_address(self): + """Calculate gateway gRPC address string.""" + monitor_addr = self.config.get("gateway", "addr") + monitor_port = self.config.get("gateway", "port") - 1 + # We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it + monitor_addr = GatewayConfig.escape_address_if_ipv6(monitor_addr) + return "{}:{}".format(monitor_addr, monitor_port) + def _add_server_listener(self): """Adds listener port to server.""" diff --git a/tests/__init__.py b/tests/__init__.py index e69de29b..cb9e2a04 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,12 @@ +from control.proto import monitor_pb2 +from control.proto.monitor_pb2_grpc import MonitorStub +from control.server import GatewayServer +import grpc + +def set_group_id(group_id: int, server: GatewayServer): + """Set group ID od the gateway, mocking nvmeof gateway monitor client""" + channel = grpc.insecure_channel(server._monitor_address()) + stub = MonitorStub(channel) + request = monitor_pb2.group_id_req(id=group_id) + stub.group_id(request) + print(f"Set group id {group_id=} using address {server._monitor_address()} successfully.") diff --git a/tests/test_multi_gateway.py b/tests/test_multi_gateway.py index e2b42330..bea0dfd2 100644 --- a/tests/test_multi_gateway.py +++ b/tests/test_multi_gateway.py @@ -7,6 +7,7 @@ from control.server import GatewayServer from control.proto import gateway_pb2 as pb2 from control.proto import gateway_pb2_grpc as pb2_grpc +from . import set_group_id update_notify = True update_interval_sec = 5 @@ -19,18 +20,14 @@ def conn(config): configA.config["gateway"]["name"] = "GatewayA" configA.config["gateway"]["group"] = "Group1" configA.config["gateway"]["state_update_notify"] = str(update_notify) - configA.config["gateway"]["min_controller_id"] = "1" - configA.config["gateway"]["max_controller_id"] = "20000" configA.config["gateway"]["enable_spdk_discovery_controller"] = "True" configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock" configB = copy.deepcopy(configA) addr = configA.get("gateway", "addr") portA = configA.getint("gateway", "port") - portB = portA + 1 + portB = portA + 2 configB.config["gateway"]["name"] = "GatewayB" configB.config["gateway"]["port"] = str(portB) - configA.config["gateway"]["min_controller_id"] = "20001" - configA.config["gateway"]["max_controller_id"] = "40000" configB.config["gateway"]["state_update_interval_sec"] = str( update_interval_sec) configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" @@ -42,10 +39,12 @@ def conn(config): GatewayServer(configB) as gatewayB, ): gatewayA.serve() + set_group_id(1, gatewayA) # Delete existing OMAP state gatewayA.gateway_rpc.gateway_state.delete_state() # Create new gatewayB.serve() + set_group_id(2, gatewayB) # Bind the client and Gateways A & B channelA = grpc.insecure_channel(f"{addr}:{portA}") diff --git a/tests/test_omap_lock.py b/tests/test_omap_lock.py index cd48e08b..1e0a32eb 100644 --- a/tests/test_omap_lock.py +++ b/tests/test_omap_lock.py @@ -7,6 +7,7 @@ from control.server import GatewayServer from control.proto import gateway_pb2 as pb2 from control.proto import gateway_pb2_grpc as pb2_grpc +from . import set_group_id import spdk.rpc.bdev as rpc_bdev image = "mytestdevimage" @@ -37,18 +38,14 @@ def conn(config, request): configA.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec) configA.config["gateway"]["omap_file_disable_unlock"] = str(disable_unlock) configA.config["gateway"]["omap_file_lock_duration"] = str(lock_duration) - configA.config["gateway"]["min_controller_id"] = "1" - configA.config["gateway"]["max_controller_id"] = "20000" configA.config["gateway"]["enable_spdk_discovery_controller"] = "True" configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock" configB = copy.deepcopy(configA) addr = configA.get("gateway", "addr") portA = configA.getint("gateway", "port") - portB = portA + 1 + portB = portA + 2 configB.config["gateway"]["name"] = "GatewayB" configB.config["gateway"]["port"] = str(portB) - configB.config["gateway"]["min_controller_id"] = "20001" - configB.config["gateway"]["max_controller_id"] = "40000" configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02" @@ -58,10 +55,12 @@ def conn(config, request): GatewayServer(configB) as gatewayB, ): gatewayA.serve() + set_group_id(1, gatewayA) # Delete existing OMAP state gatewayA.gateway_rpc.gateway_state.delete_state() # Create new gatewayB.serve() + set_group_id(2, gatewayB) # Bind the client and Gateways A & B channelA = grpc.insecure_channel(f"{addr}:{portA}")