diff --git a/.github/workflows/build-container.yml b/.github/workflows/build-container.yml index ebd58bed..f3bfb193 100644 --- a/.github/workflows/build-container.yml +++ b/.github/workflows/build-container.yml @@ -261,6 +261,10 @@ jobs: discovery: needs: build + strategy: + fail-fast: false + matrix: + integration: ["container", "embedded"] runs-on: ubuntu-latest env: HUGEPAGES: 768 # 3 spdk instances @@ -285,10 +289,12 @@ jobs: docker load < bdevperf.tar - name: Start discovery controller + if: matrix.integration == 'container' run: | docker-compose up --detach discovery - name: Wait for discovery controller to be listening + if: matrix.integration == 'container' timeout-minutes: 3 run: | . .env @@ -392,7 +398,14 @@ jobs: echo "ℹ️ bdevperf start up logs" make logs SVC=bdevperf eval $(make run SVC=bdevperf OPTS="--entrypoint=env" | grep BDEVPERF_SOCKET | tr -d '\n\r' ) - ip=$(container_ip $DISC1) + + if [ "${{ matrix.integration }}" == "embedded" ]; then + ip=$(container_ip $GW1) + echo "ℹ️ Using discovery service in gateway $GW1 ip $ip" + else + ip=$(container_ip $DISC1) + echo "ℹ️ Using standalone discovery container $DISC1 ip $ip" + fi rpc="/usr/libexec/spdk/scripts/rpc.py" echo "ℹ️ bdevperf bdev_nvme_set_options" make exec SVC=bdevperf OPTS=-T CMD="$rpc -v -s $BDEVPERF_SOCKET bdev_nvme_set_options -r -1" diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index f05cd065..449eb089 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -17,7 +17,7 @@ state_update_notify = True state_update_interval_sec = 5 #min_controller_id = 1 #max_controller_id = 65519 -enable_discovery_controller = false +enable_spdk_discovery_controller = false [discovery] addr = 0.0.0.0 diff --git a/control/server.py b/control/server.py index bc740fc7..6250dd0f 100644 --- a/control/server.py +++ b/control/server.py @@ -16,7 +16,6 @@ import json import logging import signal -import traceback from concurrent import futures from google.protobuf import json_format @@ -28,6 +27,7 @@ from .proto import gateway_pb2_grpc as pb2_grpc from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler from .grpc import GatewayService +from .discovery import DiscoveryService def sigchld_handler(signum, frame): """Handle SIGCHLD, runs when a spdk process terminates.""" @@ -43,7 +43,7 @@ def sigchld_handler(signum, frame): exit_code = os.waitstatus_to_exitcode(wait_status) # GW process should exit now - raise SystemExit(f"spdk subprocess terminated {pid=} {exit_code=}") + raise SystemExit(f"Gateway subprocess terminated {pid=} {exit_code=}") class GatewayServer: """Runs SPDK and receives client requests for the gateway service. @@ -56,6 +56,7 @@ class GatewayServer: spdk_rpc_client: Client of SPDK RPC server spdk_rpc_ping_client: Ping client of SPDK RPC server spdk_process: Subprocess running SPDK NVMEoF target application + discovery_pid: Subprocess running Ceph nvmeof discovery service """ def __init__(self, config): @@ -64,6 +65,7 @@ def __init__(self, config): self.spdk_process = None self.gateway_rpc = None self.server = None + self.discovery_pid = None self.name = self.config.get("gateway", "name") if not self.name: @@ -79,6 +81,7 @@ def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None: self.logger.exception("GatewayServer exception occurred:") + signal.signal(signal.SIGCHLD, signal.SIG_IGN) if self.spdk_process is not None: self._stop_spdk() @@ -86,15 +89,24 @@ def __exit__(self, exc_type, exc_value, traceback): self.logger.info("Stopping the server...") self.server.stop(None) + if self.discovery_pid: + self._stop_discovery() + self.logger.info("Exiting the gateway process.") def serve(self): """Starts gateway server.""" self.logger.debug("Starting serve") + # install SIGCHLD handler + signal.signal(signal.SIGCHLD, sigchld_handler) + # Start SPDK self._start_spdk() + # Start discovery service + self._start_discovery_service() + # Register service implementation with server omap_state = OmapGatewayState(self.config) local_state = LocalGatewayState() @@ -113,13 +125,28 @@ def serve(self): # Start server self.server.start() - enable_discovery_controller = self.config.getboolean_with_default("gateway", "enable_discovery_controller", False) - if not enable_discovery_controller: - try: - rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery") - except Exception as ex: - self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}") - raise + + + def _start_discovery_service(self): + """Runs either SPDK on CEPH NVMEOF Discovery Service.""" + enable_spdk_discovery_controller = self.config.getboolean_with_default("gateway", "enable_spdk_discpovery_controller", False) + if enable_spdk_discovery_controller: + self.logger.info("Using SPDK discovery service") + return + + try: + rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery") + except Exception as ex: + self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}") + raise + + # run ceph nvmeof discovery service in sub-process + assert self.discovery_pid is None + self.discovery_pid = os.fork() + if self.discovery_pid == 0: + self.logger.info("Starting ceph nvmeof discovery service") + DiscoveryService(self.config).start_service() + os._exit(0) def _add_server_listener(self): """Adds listener port to server.""" @@ -170,9 +197,6 @@ def _start_spdk(self): cmd += shlex.split(spdk_tgt_cmd_extra_args) self.logger.info(f"Starting {' '.join(cmd)}") try: - # install SIGCHLD handler - signal.signal(signal.SIGCHLD, sigchld_handler) - # start spdk process self.spdk_process = subprocess.Popen(cmd) except Exception as ex: @@ -221,7 +245,6 @@ def _stop_spdk(self): rpc_socket = self.config.get("spdk", "rpc_socket") # Terminate spdk process - signal.signal(signal.SIGCHLD, signal.SIG_DFL) if return_code is not None: self.logger.error(f"SPDK({self.name}) pid {self.spdk_process.pid} " f"already terminated, exit code: {return_code}") @@ -245,6 +268,21 @@ def _stop_spdk(self): self.logger.exception(f"An error occurred while removing " f"rpc socket {rpc_socket}:") + def _stop_discovery(self): + """Stops Discovery service process.""" + assert self.discovery_pid is not None # should be verified by the caller + + self.logger.info("Terminating discovery service...") + # discovery service selector loop should exit due to KeyboardInterrupt exception + try: + os.kill(self.discovery_pid, signal.SIGINT) + os.waitpid(self.discovery_pid, 0) + except ChildProcessError: + pass # ignore + self.logger.info("Discovery service terminated") + + self.discovery_pid = None + def _create_transport(self, trtype): """Initializes a transport type.""" args = {'trtype': trtype} diff --git a/tests/test_multi_gateway.py b/tests/test_multi_gateway.py index 8b57d6ec..a96eaf57 100644 --- a/tests/test_multi_gateway.py +++ b/tests/test_multi_gateway.py @@ -20,6 +20,7 @@ def conn(config): configA.config["gateway"]["state_update_notify"] = str(update_notify) configA.config["gateway"]["min_controller_id"] = "1" configA.config["gateway"]["max_controller_id"] = "20000" + configA.config["gateway"]["enable_spdk_discovery_controller"] = "true" configB = copy.deepcopy(configA) addr = configA.get("gateway", "addr") portA = configA.getint("gateway", "port") diff --git a/tests/test_server.py b/tests/test_server.py index 42a38797..1bddc93a 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -12,7 +12,7 @@ def _config(self, config): self.config = config def validate_exception(self, e): - pattern = r'spdk subprocess terminated pid=(\d+) exit_code=(\d+)' + pattern = r'Gateway subprocess terminated pid=(\d+) exit_code=(\d+)' m = re.match(pattern, e.code) assert(m) pid = int(m.group(1))