From e8646cdf1fa84467587b88f7dca1ceb9d5932408 Mon Sep 17 00:00:00 2001 From: Alexander Indenbaum Date: Thu, 5 Oct 2023 10:23:04 +0300 Subject: [PATCH] Thread + sigint for termination Signed-off-by: Alexander Indenbaum --- control/server.py | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/control/server.py b/control/server.py index ce7e4ff1..faba5109 100644 --- a/control/server.py +++ b/control/server.py @@ -17,6 +17,7 @@ import logging import signal import time +import threading from concurrent import futures from google.protobuf import json_format @@ -66,7 +67,7 @@ def __init__(self, config): self.spdk_process = None self.gateway_rpc = None self.server = None - self.discovery_pid = None + self.discovery_thread = None self.name = self.config.get("gateway", "name") if not self.name: @@ -89,11 +90,17 @@ def __exit__(self, exc_type, exc_value, traceback): self.logger.info("Stopping the server...") self.server.stop(None) - if self.discovery_pid: + if self.discovery_thread: self.logger.info("Terminating discovery service...") - os.kill(self.discovery_pid, signal.SIGTERM) - time.sleep(0.1) # a bit of grace period before final blow - os.kill(self.discovery_pid, signal.SIGKILL) + # discovery service selector loop should exit due to KeyboardInterrupt exception + try: + os.kill(os.getpid(), signal.SIGINT) + self.discovery_thread.join() + except KeyboardInterrupt: + self.logger.info("Ignore KeyboardInterrupt in the main thread") + + self.discovery_thread = None + self.logger.info("Discovery service terminated") self.logger.info("Exiting the gateway process.") @@ -139,12 +146,19 @@ def _start_discovery_service(self): self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}") raise - # run ceph nvmeof discovery service in sub-process - self.discovery_pid = os.fork() - if self.discovery_pid == 0: - self.logger.info("Starting ceph nvmeof discovery service") - DiscoveryService(self.config).start_service() - raise SystemExit + def discovery_thread(): + while True: + try: + DiscoveryService(self.config).start_service() + except: + self.logger.exception("Discovery service exception:") + self.logger.error("Discovery service restarting") + + # Start discovery service thread + self.logger.info("Starting ceph nvmeof discovery service thread") + assert self.discovery_thread is None + self.discovery_thread = threading.Thread(target=discovery_thread) + self.discovery_thread.start() def _add_server_listener(self): """Adds listener port to server."""