Skip to content

Commit

Permalink
Merge pull request #751 from gbregman/devel
Browse files Browse the repository at this point in the history
Close discovery service socket before exiting

Signed-off-by: barakda <[email protected]>
  • Loading branch information
gbregman authored and barakda committed Jul 10, 2024
1 parent ff28802 commit f892617
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 12 deletions.
56 changes: 46 additions & 10 deletions control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class DiscoveryLogEntry(AutoSerializableStructure):
class DiscoveryService:
"""Implements discovery controller.
Response discover request from initiator.
Response discover request from initiator, this must be called from within a "with" block.
Instance attributes:
version: Discovery controller version
Expand Down Expand Up @@ -329,10 +329,49 @@ def __init__(self, config):
assert 0
self.logger.info(f"discovery addr: {self.discovery_addr} port: {self.discovery_port}")

self.sock = None
self.conn_vals = {}
self.connection_counter = 1
self.selector = selectors.DefaultSelector()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
if self.omap_state:
self.omap_state.cleanup_omap()
self.omap_state = None

if self.selector:
with self.lock:
for key in self.conn_vals:
try:
self.selector.unregister(self.conn_vals[key].connection)
except Except as ex:
pass
try:
self.conn_vals[key].connection.close()
except Except as ex:
pass
self.conn_vals = {}

if self.sock:
try:
self.selector.unregister(self.sock)
except Exception as ex:
pass
try:
self.sock.close()
except Exception as ex:
pass
self.sock = None

try:
self.selector.close()
except Exception as ex:
pass
self.selector = None

def _read_all(self) -> Dict[str, str]:
"""Reads OMAP and returns dict of all keys and values."""

Expand Down Expand Up @@ -1068,11 +1107,11 @@ def update_log_level(self):
def start_service(self):
"""Enable listening on the server side."""

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((self.discovery_addr, int(self.discovery_port)))
sock.listen(MAX_CONNECTION)
sock.setblocking(False)
self.selector.register(sock, selectors.EVENT_READ, self.nvmeof_accept)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind((self.discovery_addr, int(self.discovery_port)))
self.sock.listen(MAX_CONNECTION)
self.sock.setblocking(False)
self.selector.register(self.sock, selectors.EVENT_READ, self.nvmeof_accept)
self.logger.debug("waiting for connection...")
t = threading.Thread(target=self.handle_timeout)
t.start()
Expand All @@ -1090,10 +1129,7 @@ def start_service(self):
callback = key.data
callback(key.fileobj, mask)
except KeyboardInterrupt:
for key in self.conn_vals:
self.conn_vals[key].connection.close()
self.selector.close()
self.logger.debug("received a ctrl+C interrupt. exiting...")
self.logger.debug("received a ctrl+C interrupt. exiting...")

def main(args=None):
parser = argparse.ArgumentParser(prog="python3 -m control",
Expand Down
2 changes: 1 addition & 1 deletion control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ def _stop_discovery(self):
try:
os.kill(self.discovery_pid, signal.SIGINT)
os.waitpid(self.discovery_pid, 0)
except ChildProcessError:
except (ChildProcessError, ProcessLookupError):
pass # ignore
self.logger.info("Discovery service terminated")

Expand Down
16 changes: 16 additions & 0 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,22 @@ def _watcher_callback(notify_id, notifier_id, watch_id, data):
else:
self.logger.info(f"Watch already exists.")

def cleanup_omap(self):
self.logger.info(f"Cleanup OMAP on exit ({self.id_text})")
if self.watch:
try:
self.watch.close()
self.logger.debug(f"Unregistered watch ({self.id_text})")
self.watch = None
except Exception:
pass
if self.ioctx:
try:
self.ioctx.close()
self.logger.debug(f"Closed Rados connection ({self.id_text})")
self.ioctx = None
except Exception:
pass

class GatewayStateHandler:
"""Maintains consistency in NVMeoF target state store instances.
Expand Down
15 changes: 14 additions & 1 deletion tests/ha/4gws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,20 @@ expect_optimized() {
EXPECTED_OPTIMIZED=$2
NQN=$3

socket=$(docker exec "$GW_NAME" find /var/run/ceph -name spdk.sock)
socket_retries=0
socket=""
while [ $socket_retries -lt 10 ] ; do
socket=$(docker exec "$GW_NAME" find /var/run/ceph -name spdk.sock)
if [ -n "$socket" ]; then
break
fi
socket_retries=$(expr $socket_retries + 1)
sleep 1
done
if [ -z "$socket" ]; then
exit 1 # failed
fi

# Verify expected number of "optimized"
for i in $(seq 50); do
response=$(docker exec "$GW_NAME" "$rpc" "-s" "$socket" "$cmd" "$NQN")
Expand Down

0 comments on commit f892617

Please sign in to comment.