Skip to content

Commit

Permalink
Add grpc service to set monitor provided group is, aka optimized ana …
Browse files Browse the repository at this point in the history
…group

Signed-off-by: Alexander Indenbaum <[email protected]>
  • Loading branch information
Alexander Indenbaum committed Dec 4, 2023
1 parent 5eee6f4 commit 67397b7
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 18 deletions.
2 changes: 0 additions & 2 deletions ceph-nvmeof.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ port = 5500
enable_auth = False
state_update_notify = True
state_update_interval_sec = 5
#min_controller_id = 1
#max_controller_id = 65519
enable_spdk_discovery_controller = False
#omap_file_lock_duration = 60
#omap_file_lock_retries = 15
Expand Down
23 changes: 19 additions & 4 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,31 @@
import os
import threading
import errno
from typing import Callable

import spdk.rpc.bdev as rpc_bdev
import spdk.rpc.nvmf as rpc_nvmf
import spdk.rpc.log as rpc_log

from google.protobuf import json_format
from google.protobuf.empty_pb2 import Empty
from .proto import gateway_pb2 as pb2
from .proto import gateway_pb2_grpc as pb2_grpc
from .proto import monitor_pb2
from .proto import monitor_pb2_grpc
from .config import GatewayConfig
from .discovery import DiscoveryService
from .state import GatewayState, GatewayStateHandler, OmapLock

MAX_ANA_GROUPS = 4
MAX_ANA_GROUPS = 32 # should match nvmeof gateway monitor ceph c++ code

class MonitorService(monitor_pb2_grpc.MonitorServicer):
def __init__(self, set_group_id: Callable[[int], None]) -> None:
self.set_group_id = set_group_id

def group_id(self, request: monitor_pb2.group_id_req, context = None) -> Empty:
self.set_group_id(request.id)
return Empty()

class GatewayService(pb2_grpc.GatewayServicer):
"""Implements gateway service interface.
Expand All @@ -43,7 +55,7 @@ class GatewayService(pb2_grpc.GatewayServicer):
spdk_rpc_client: Client of SPDK RPC server
"""

def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, omap_lock: OmapLock, spdk_rpc_client) -> None:
def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, omap_lock: OmapLock, group_id: int, spdk_rpc_client) -> None:
"""Constructor"""
self.logger = logging.getLogger(__name__)
ver = os.getenv("NVMEOF_VERSION")
Expand Down Expand Up @@ -75,6 +87,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, om
self.rpc_lock = threading.Lock()
self.gateway_state = gateway_state
self.omap_lock = omap_lock
self.group_id = group_id
self.spdk_rpc_client = spdk_rpc_client
self.gateway_name = self.config.get("gateway", "name")
if not self.gateway_name:
Expand Down Expand Up @@ -309,8 +322,10 @@ def create_subsystem_safe(self, request, context=None):
if self.is_discovery_nqn(request.subsystem_nqn):
raise Exception(f"Can't create a discovery subsystem")

min_cntlid = self.config.getint_with_default("gateway", "min_controller_id", 1)
max_cntlid = self.config.getint_with_default("gateway", "max_controller_id", 65519)
# Assuming max of 32 gateways and protocol min 1 max 65519
offset = (self.group_id - 1) * 2040
min_cntlid = offset + 1
max_cntlid = offset + 2040
if not request.serial_number:
random.seed()
randser = random.randint(2, 99999999999999)
Expand Down
40 changes: 38 additions & 2 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import logging
import signal
import threading
from concurrent import futures
from google.protobuf import json_format

Expand All @@ -25,10 +26,14 @@

from .proto import gateway_pb2 as pb2
from .proto import gateway_pb2_grpc as pb2_grpc
from .proto import monitor_pb2
from .proto import monitor_pb2_grpc
from .state import GatewayState, LocalGatewayState, OmapLock, OmapGatewayState, GatewayStateHandler
from .grpc import GatewayService
from .grpc import GatewayService, MonitorService
from .discovery import DiscoveryService
from .config import GatewayConfig
from .grpc import MAX_ANA_GROUPS
from typing import Callable

def sigchld_handler(signum, frame):
"""Handle SIGCHLD, runs when a spdk process terminates."""
Expand Down Expand Up @@ -68,6 +73,7 @@ def __init__(self, config: GatewayConfig):
self.server = None
self.discovery_pid = None
self.spdk_rpc_socket_path = None
self.monitor_event = threading.Event()

self.name = self.config.get("gateway", "name")
if not self.name:
Expand Down Expand Up @@ -99,6 +105,25 @@ def __exit__(self, exc_type, exc_value, traceback):

self.logger.info("Exiting the gateway process.")

def set_group_id(self, id: int):
self.logger.info(f"Gateway {self.name} group {id=}")
assert id >= 1 and id <= MAX_ANA_GROUPS
self.group_id = id
self.monitor_event.set()

def _wait_for_group_id(self):
"""Waits for the monitor notification of this gatway's group id"""
self.monitor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
monitor_pb2_grpc.add_MonitorServicer_to_server(MonitorService(self.set_group_id), self.monitor_server)
self.monitor_server.add_insecure_port(self._monitor_address())
self.monitor_server.start()
self.logger.info(f"Monitor server is listening on {self._monitor_address()} for group id")
self.monitor_event.wait()
self.monitor_event = None
self.logger.info("Stopping the monitor server...")
self.monitor_server.stop(None)
self.monitor_server = None

def serve(self):
"""Starts gateway server."""
self.logger.debug("Starting serve")
Expand All @@ -109,6 +134,9 @@ def serve(self):
# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# wait for monitor notification of the group id
self._wait_for_group_id()

# Start SPDK
self._start_spdk(omap_state)

Expand All @@ -118,7 +146,7 @@ def serve(self):
# Register service implementation with server
gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller)
omap_lock = OmapLock(omap_state, gateway_state)
self.gateway_rpc = GatewayService(self.config, gateway_state, omap_lock, self.spdk_rpc_client)
self.gateway_rpc = GatewayService(self.config, gateway_state, omap_lock, self.group_id, self.spdk_rpc_client)
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)

Expand Down Expand Up @@ -190,6 +218,14 @@ def _gateway_address(self):
gateway_addr = GatewayConfig.escape_address_if_ipv6(gateway_addr)
return "{}:{}".format(gateway_addr, gateway_port)

def _monitor_address(self):
"""Calculate gateway gRPC address string."""
monitor_addr = self.config.get("gateway", "addr")
monitor_port = self.config.get("gateway", "port") - 1
# We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it
monitor_addr = GatewayConfig.escape_address_if_ipv6(monitor_addr)
return "{}:{}".format(monitor_addr, monitor_port)

def _add_server_listener(self):
"""Adds listener port to server."""

Expand Down
12 changes: 12 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from control.proto import monitor_pb2
from control.proto.monitor_pb2_grpc import MonitorStub
from control.server import GatewayServer
import grpc

def set_group_id(group_id: int, server: GatewayServer):
"""Set group ID od the gateway, mocking nvmeof gateway monitor client"""
channel = grpc.insecure_channel(server._monitor_address())
stub = MonitorStub(channel)
request = monitor_pb2.group_id_req(id=group_id)
stub.group_id(request)
print(f"Set group id {group_id=} using address {server._monitor_address()} successfully.")
9 changes: 4 additions & 5 deletions tests/test_multi_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from control.server import GatewayServer
from control.proto import gateway_pb2 as pb2
from control.proto import gateway_pb2_grpc as pb2_grpc
from . import set_group_id

update_notify = True
update_interval_sec = 5
Expand All @@ -19,18 +20,14 @@ def conn(config):
configA.config["gateway"]["name"] = "GatewayA"
configA.config["gateway"]["group"] = "Group1"
configA.config["gateway"]["state_update_notify"] = str(update_notify)
configA.config["gateway"]["min_controller_id"] = "1"
configA.config["gateway"]["max_controller_id"] = "20000"
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True"
configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock"
configB = copy.deepcopy(configA)
addr = configA.get("gateway", "addr")
portA = configA.getint("gateway", "port")
portB = portA + 1
portB = portA + 2
configB.config["gateway"]["name"] = "GatewayB"
configB.config["gateway"]["port"] = str(portB)
configA.config["gateway"]["min_controller_id"] = "20001"
configA.config["gateway"]["max_controller_id"] = "40000"
configB.config["gateway"]["state_update_interval_sec"] = str(
update_interval_sec)
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
Expand All @@ -42,10 +39,12 @@ def conn(config):
GatewayServer(configB) as gatewayB,
):
gatewayA.serve()
set_group_id(1, gatewayA)
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.serve()
set_group_id(2, gatewayB)

# Bind the client and Gateways A & B
channelA = grpc.insecure_channel(f"{addr}:{portA}")
Expand Down
9 changes: 4 additions & 5 deletions tests/test_omap_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from control.server import GatewayServer
from control.proto import gateway_pb2 as pb2
from control.proto import gateway_pb2_grpc as pb2_grpc
from . import set_group_id
import spdk.rpc.bdev as rpc_bdev

image = "mytestdevimage"
Expand Down Expand Up @@ -37,18 +38,14 @@ def conn(config, request):
configA.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec)
configA.config["gateway"]["omap_file_disable_unlock"] = str(disable_unlock)
configA.config["gateway"]["omap_file_lock_duration"] = str(lock_duration)
configA.config["gateway"]["min_controller_id"] = "1"
configA.config["gateway"]["max_controller_id"] = "20000"
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True"
configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock"
configB = copy.deepcopy(configA)
addr = configA.get("gateway", "addr")
portA = configA.getint("gateway", "port")
portB = portA + 1
portB = portA + 2
configB.config["gateway"]["name"] = "GatewayB"
configB.config["gateway"]["port"] = str(portB)
configB.config["gateway"]["min_controller_id"] = "20001"
configB.config["gateway"]["max_controller_id"] = "40000"
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02"

Expand All @@ -58,10 +55,12 @@ def conn(config, request):
GatewayServer(configB) as gatewayB,
):
gatewayA.serve()
set_group_id(1, gatewayA)
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.serve()
set_group_id(2, gatewayB)

# Bind the client and Gateways A & B
channelA = grpc.insecure_channel(f"{addr}:{portA}")
Expand Down

0 comments on commit 67397b7

Please sign in to comment.