Skip to content

Commit

Permalink
Thread + sigint for termination
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed Oct 5, 2023
1 parent 6770c39 commit e8646cd
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging
import signal
import time
import threading
from concurrent import futures
from google.protobuf import json_format

Expand Down Expand Up @@ -66,7 +67,7 @@ def __init__(self, config):
self.spdk_process = None
self.gateway_rpc = None
self.server = None
self.discovery_pid = None
self.discovery_thread = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand All @@ -89,11 +90,17 @@ def __exit__(self, exc_type, exc_value, traceback):
self.logger.info("Stopping the server...")
self.server.stop(None)

if self.discovery_pid:
if self.discovery_thread:
self.logger.info("Terminating discovery service...")
os.kill(self.discovery_pid, signal.SIGTERM)
time.sleep(0.1) # a bit of grace period before final blow
os.kill(self.discovery_pid, signal.SIGKILL)
# discovery service selector loop should exit due to KeyboardInterrupt exception
try:
os.kill(os.getpid(), signal.SIGINT)
self.discovery_thread.join()
except KeyboardInterrupt:
self.logger.info("Ignore KeyboardInterrupt in the main thread")

self.discovery_thread = None
self.logger.info("Discovery service terminated")

self.logger.info("Exiting the gateway process.")

Expand Down Expand Up @@ -139,12 +146,19 @@ def _start_discovery_service(self):
self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}")
raise

# run ceph nvmeof discovery service in sub-process
self.discovery_pid = os.fork()
if self.discovery_pid == 0:
self.logger.info("Starting ceph nvmeof discovery service")
DiscoveryService(self.config).start_service()
raise SystemExit
def discovery_thread():
while True:
try:
DiscoveryService(self.config).start_service()
except:
self.logger.exception("Discovery service exception:")
self.logger.error("Discovery service restarting")

# Start discovery service thread
self.logger.info("Starting ceph nvmeof discovery service thread")
assert self.discovery_thread is None
self.discovery_thread = threading.Thread(target=discovery_thread)
self.discovery_thread.start()

def _add_server_listener(self):
"""Adds listener port to server."""
Expand Down

0 comments on commit e8646cd

Please sign in to comment.