Skip to content

Commit

Permalink
Clean protobuf from unused features, hard code TCP for transport type…
Browse files Browse the repository at this point in the history
… and only allow IP addr family

Fixes ceph#397

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Jan 30, 2024
1 parent 59e82d9 commit 44ab81b
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 140 deletions.
26 changes: 6 additions & 20 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ def argument(*name_or_flags, **kwargs):
"""Helper function to format arguments for argparse command decorator."""
return (list(name_or_flags), kwargs)

def get_enum_keys_list(e_type, include_first = False):
def get_enum_keys_list(e_type):
k_list = []
for k in e_type.keys():
k_list.append(k.lower())
k_list.append(k.upper())
if not include_first:
k_list = k_list[2:]

return k_list

Expand Down Expand Up @@ -748,23 +746,19 @@ def listener_add(self, args):
args.trsvcid = 4420
elif args.trsvcid <= 0:
self.cli.parser.error("trsvcid value must be positive")
if not args.trtype:
args.trtype = "TCP"
elif args.trsvcid > 0xffff:
self.cli.parser.error("trsvcid value must be smaller than 65536")
if not args.adrfam:
args.adrfam = "IPV4"

traddr = GatewayUtils.escape_address_if_ipv6(args.traddr)
trtype = None
adrfam = None
if args.trtype:
trtype = args.trtype.upper()
if args.adrfam:
adrfam = args.adrfam.lower()

req = pb2.create_listener_req(
nqn=args.subsystem,
gateway_name=args.gateway_name,
trtype=trtype,
adrfam=adrfam,
traddr=traddr,
trsvcid=args.trsvcid,
Expand Down Expand Up @@ -812,23 +806,19 @@ def listener_del(self, args):
self.cli.parser.error("--trsvcid argument is mandatory for del command")
if args.trsvcid <= 0:
self.cli.parser.error("trsvcid value must be positive")
if not args.trtype:
args.trtype = "TCP"
elif args.trsvcid > 0xffff:
self.cli.parser.error("trsvcid value must be smaller than 65536")
if not args.adrfam:
args.adrfam = "IPV4"

traddr = GatewayUtils.escape_address_if_ipv6(args.traddr)
trtype = None
adrfam = None
if args.trtype:
trtype = args.trtype.upper()
if args.adrfam:
adrfam = args.adrfam.lower()

req = pb2.delete_listener_req(
nqn=args.subsystem,
gateway_name=args.gateway_name,
trtype=trtype,
adrfam=adrfam,
traddr=traddr,
trsvcid=args.trsvcid,
Expand Down Expand Up @@ -871,8 +861,6 @@ def listener_list(self, args):
self.cli.parser.error("--gateway-name argument is not allowed for list command")
if args.traddr != None:
self.cli.parser.error("--traddr argument is not allowed for list command")
if args.trtype:
self.cli.parser.error("--trtype argument is not allowed for list command")
if args.adrfam:
self.cli.parser.error("--adrfam argument is not allowed for list command")
if args.trsvcid != None:
Expand All @@ -890,8 +878,7 @@ def listener_list(self, args):
for l in listeners_info.listeners:
adrfam = GatewayEnumUtils.get_key_from_value(pb2.AddressFamily, l.adrfam)
adrfam = self.format_adrfam(adrfam)
trtype = GatewayEnumUtils.get_key_from_value(pb2.TransportType, l.trtype)
listeners_list.append([l.gateway_name, trtype, adrfam, f"{l.traddr}:{l.trsvcid}"])
listeners_list.append([l.gateway_name, l.trtype, adrfam, f"{l.traddr}:{l.trsvcid}"])
if len(listeners_list) > 0:
if args.format == "text":
table_format = "fancy_grid"
Expand Down Expand Up @@ -927,7 +914,6 @@ def listener_list(self, args):
argument("listener_command", help="listener sub-command", choices=["add", "del", "list"]),
argument("--subsystem", "-n", help="Subsystem NQN", required=True),
argument("--gateway-name", "-g", help="Gateway name", required=False),
argument("--trtype", "-t", help="Transport type", default="", choices=get_enum_keys_list(pb2.TransportType)),
argument("--adrfam", "-f", help="Address family", default="", choices=get_enum_keys_list(pb2.AddressFamily)),
argument("--traddr", "-a", help="NVMe host IP", required=False),
argument("--trsvcid", "-s", help="Port number", type=int, required=False),
Expand Down
27 changes: 17 additions & 10 deletions control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import logging
from .config import GatewayConfig
from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler
from .utils import GatewayEnumUtils
from .utils import GatewayLogger
from .proto import gateway_pb2 as pb2

Expand Down Expand Up @@ -85,6 +84,21 @@ class NVMF_SUBTYPE(enum.IntFlag):
# NVMe type for NVM subsystem
NVME = 0x2

# NVMe over Fabrics transport types
class TRANSPORT_TYPES(enum.IntFlag):
RDMA = 0x1
FC = 0x2
TCP = 0x3
INTRA_HOST = 0xfe

# Address family types
class ADRFAM_TYPES(enum.IntFlag):
ipv4 = 0x1
ipv6 = 0x2
ib = 0x3
fc = 0x4
intra_host = 0xfe

# Transport requirement, secure channel requirements
# Connections shall be made over a fabric secure channel
class NVMF_TREQ_SECURE_CHANNEL(enum.IntFlag):
Expand Down Expand Up @@ -717,16 +731,9 @@ def reply_get_log_page(self, conn, data, cmd_id):
log_entry_counter = 0
while log_entry_counter < len(allow_listeners):
log_entry = DiscoveryLogEntry()
log_trtype = allow_listeners[log_entry_counter]["trtype"]
log_entry.trtype = TRANSPORT_TYPES.TCP
log_adrfam = allow_listeners[log_entry_counter]["adrfam"]
trtype = GatewayEnumUtils.get_value_from_key(pb2.TransportType, log_trtype, True)
adrfam = GatewayEnumUtils.get_value_from_key(pb2.AddressFamily, log_adrfam, True)

if trtype is None:
self.logger.error(f"unsupported transport type {log_trtype}")
else:
log_entry.trtype = trtype

adrfam = ADRFAM_TYPES[log_adrfam.lower()]
if adrfam is None:
self.logger.error(f"unsupported address family {log_adrfam}")
else:
Expand Down
70 changes: 32 additions & 38 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@

MAX_ANA_GROUPS = 4

class BdevStatus:
def __init__(self, status, error_message, bdev_name = ""):
self.status = status
self.error_message = error_message
self.bdev_name = bdev_name

class GatewayService(pb2_grpc.GatewayServicer):
"""Implements gateway service interface.
Expand Down Expand Up @@ -230,7 +236,7 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, cre
if create_image:
rc = self.ceph_utils.pool_exists(rbd_pool_name)
if not rc:
return pb2.bdev_status(status=errno.ENODEV,
return BdevStatus(status=errno.ENODEV,
error_message=f"Failure creating bdev {name}: RBD pool {rbd_pool_name} doesn't exist")

try:
Expand All @@ -241,7 +247,7 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, cre
self.logger.info(f"Image {rbd_image_name} already exists")
except Exception:
self.logger.exception(f"Can't create RBD image {rbd_image_name}")
return pb2.bdev_status(status=errno.ENODEV, error_message=f"Failure creating bdev {name}: Can't create RBD image {rbd_image_name}")
return BdevStatus(status=errno.ENODEV, error_message=f"Failure creating bdev {name}: Can't create RBD image {rbd_image_name}")

try:
bdev_name = rpc_bdev.bdev_rbd_create(
Expand All @@ -262,18 +268,18 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, cre
if resp:
status = resp["code"]
errmsg = f"Failure creating bdev {name}: {resp['message']}"
return pb2.bdev_status(status=status, error_message=errmsg)
return BdevStatus(status=status, error_message=errmsg)

# Just in case SPDK failed with no exception
if not bdev_name:
errmsg = f"Can't create bdev {name}"
self.logger.error(errmsg)
return pb2.bdev_status(status=errno.ENODEV, error_message=errmsg)
return BdevStatus(status=errno.ENODEV, error_message=errmsg)

if name != bdev_name:
self.logger.warning(f"Created bdev name {bdev_name} differs from requested name {name}")

return pb2.bdev_status(bdev_name=name, status=0, error_message=os.strerror(0))
return BdevStatus(status=0, error_message=os.strerror(0), bdev_name=name)

def resize_bdev(self, bdev_name, new_size):
"""Resizes a bdev."""
Expand Down Expand Up @@ -1598,7 +1604,6 @@ def list_connections_safe(self, request, context):
traddr = ""
trsvcid = 0
adrfam = ""
trtype = ""
hostnqn = conn["hostnqn"]
connected = False

Expand Down Expand Up @@ -1664,10 +1669,10 @@ def get_subsystem_ha_status(self, nqn) -> bool:
self.logger.warning(f"Subsystem {nqn} not found")
return enable_ha

def matching_listener_exists(self, context, nqn, gw_name, trtype, traddr, trsvcid) -> bool:
def matching_listener_exists(self, context, nqn, gw_name, traddr, trsvcid) -> bool:
if not context:
return False
listener_key = GatewayState.build_listener_key(nqn, gw_name, trtype, traddr, trsvcid)
listener_key = GatewayState.build_listener_key(nqn, gw_name, "TCP", traddr, trsvcid)
state = self.gateway_state.local.get_state()
if state.get(listener_key):
return True
Expand All @@ -1681,12 +1686,6 @@ def create_listener_safe(self, request, context):
traddr = GatewayUtils.escape_address_if_ipv6(request.traddr)
create_listener_error_prefix = f"Failure adding {request.nqn} listener at {traddr}:{request.trsvcid}"

trtype = GatewayEnumUtils.get_key_from_value(pb2.TransportType, request.trtype)
if trtype == None:
errmsg=f"{create_listener_error_prefix}: Unknown transport type {request.trtype}"
self.logger.error(f"{errmsg}")
return pb2.req_status(status=errno.ENOKEY, error_message=errmsg)

adrfam = GatewayEnumUtils.get_key_from_value(pb2.AddressFamily, request.adrfam)
if adrfam == None:
errmsg=f"{create_listener_error_prefix}: Unknown address family {request.adrfam}"
Expand All @@ -1700,7 +1699,7 @@ def create_listener_safe(self, request, context):
return pb2.req_status(status=errno.ENOKEY, error_message=errmsg)

self.logger.info(f"Received request to create {request.gateway_name}"
f" {trtype} {adrfam} listener for {request.nqn} at"
f" TCP {adrfam} listener for {request.nqn} at"
f" {traddr}:{request.trsvcid}, auto HA state: {auto_ha_state}, context: {context}")

if GatewayUtils.is_discovery_nqn(request.nqn):
Expand All @@ -1712,15 +1711,15 @@ def create_listener_safe(self, request, context):
try:
if request.gateway_name == self.gateway_name:
listener_already_exist = self.matching_listener_exists(
context, request.nqn, request.gateway_name, trtype, request.traddr, request.trsvcid)
context, request.nqn, request.gateway_name, request.traddr, request.trsvcid)
if listener_already_exist:
self.logger.error(f"{request.nqn} already listens on address {traddr}:{request.trsvcid}")
return pb2.req_status(status=errno.EEXIST,
error_message=f"{create_listener_error_prefix}: Subsystem already listens on this address")
ret = rpc_nvmf.nvmf_subsystem_add_listener(
self.spdk_rpc_client,
nqn=request.nqn,
trtype=trtype,
trtype="TCP",
traddr=request.traddr,
trsvcid=str(request.trsvcid),
adrfam=adrfam,
Expand Down Expand Up @@ -1787,7 +1786,7 @@ def create_listener_safe(self, request, context):
self.spdk_rpc_client,
nqn=request.nqn,
ana_state="inaccessible",
trtype=trtype,
trtype="TCP",
traddr=request.traddr,
trsvcid=str(request.trsvcid),
adrfam=adrfam,
Expand All @@ -1809,7 +1808,7 @@ def create_listener_safe(self, request, context):
request, preserving_proto_field_name=True, including_default_value_fields=True)
self.gateway_state.add_listener(request.nqn,
request.gateway_name,
trtype, request.traddr,
"TCP", request.traddr,
request.trsvcid, json_req)
except Exception as ex:
errmsg = f"Error persisting listener {traddr}:{request.trsvcid}:\n{ex}"
Expand All @@ -1821,15 +1820,15 @@ def create_listener_safe(self, request, context):
def create_listener(self, request, context=None):
return self.execute_grpc_function(self.create_listener_safe, request, context)

def remove_listener_from_state(self, nqn, gw_name, trtype, traddr, port, context):
def remove_listener_from_state(self, nqn, gw_name, traddr, port, context):
if not context:
return pb2.req_status(status=0, error_message=os.strerror(0))

if context:
assert self.omap_lock.locked()
# Update gateway state
try:
self.gateway_state.remove_listener(nqn, gw_name, trtype, traddr, port)
self.gateway_state.remove_listener(nqn, gw_name, "TCP", traddr, port)
except Exception as ex:
errmsg = f"Error persisting deletion of listener {traddr}:{port} from {nqn}:\n{ex}"
self.logger.error(errmsg)
Expand All @@ -1843,20 +1842,14 @@ def delete_listener_safe(self, request, context):
traddr = GatewayUtils.escape_address_if_ipv6(request.traddr)
delete_listener_error_prefix = f"Failure deleting listener {traddr}:{request.trsvcid} from {request.nqn}"

trtype = GatewayEnumUtils.get_key_from_value(pb2.TransportType, request.trtype)
if trtype == None:
errmsg=f"{delete_listener_error_prefix}: Unknown transport type {request.trtype}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOKEY, error_message=errmsg)

adrfam = GatewayEnumUtils.get_key_from_value(pb2.AddressFamily, request.adrfam)
if adrfam == None:
errmsg=f"{delete_listener_error_prefix}: Unknown address family {request.adrfam}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOKEY, error_message=errmsg)

self.logger.info(f"Received request to delete {request.gateway_name}"
f" {trtype} listener for {request.nqn} at"
f" TCP listener for {request.nqn} at"
f" {traddr}:{request.trsvcid}, context: {context}")

if GatewayUtils.is_discovery_nqn(request.nqn):
Expand All @@ -1870,7 +1863,7 @@ def delete_listener_safe(self, request, context):
ret = rpc_nvmf.nvmf_subsystem_remove_listener(
self.spdk_rpc_client,
nqn=request.nqn,
trtype=trtype,
trtype="TCP",
traddr=request.traddr,
trsvcid=str(request.trsvcid),
adrfam=adrfam,
Expand All @@ -1883,7 +1876,7 @@ def delete_listener_safe(self, request, context):
except Exception as ex:
errmsg = f"{delete_listener_error_prefix}:\n{ex}"
self.logger.error(errmsg)
self.remove_listener_from_state(request.nqn, request.gateway_name, trtype,
self.remove_listener_from_state(request.nqn, request.gateway_name,
request.traddr, request.trsvcid, context)
resp = self.parse_json_exeption(ex)
status = errno.EINVAL
Expand All @@ -1895,11 +1888,11 @@ def delete_listener_safe(self, request, context):
# Just in case SPDK failed with no exception
if not ret:
self.logger.error(delete_listener_error_prefix)
self.remove_listener_from_state(request.nqn, request.gateway_name, trtype,
self.remove_listener_from_state(request.nqn, request.gateway_name,
request.traddr, request.trsvcid, context)
return pb2.req_status(status=errno.EINVAL, error_message=delete_listener_error_prefix)

return self.remove_listener_from_state(request.nqn, request.gateway_name, trtype,
return self.remove_listener_from_state(request.nqn, request.gateway_name,
request.traddr, request.trsvcid, context)

def delete_listener(self, request, context=None):
Expand Down Expand Up @@ -2044,14 +2037,15 @@ def set_spdk_nvmf_logs_safe(self, request, context):
print_level = None
ret_log = False
ret_print = False
if request.log_level:

if request.HasField("log_level"):
log_level = GatewayEnumUtils.get_key_from_value(pb2.LogLevel, request.log_level)
if log_level == None:
errmsg=f"Unknown log level {request.log_level}"
self.logger.error(f"{errmsg}")
return pb2.req_status(status=errno.ENOKEY, error_message=errmsg)

if request.print_level:
if request.HasField("print_level"):
print_level = GatewayEnumUtils.get_key_from_value(pb2.LogLevel, request.print_level)
if print_level == None:
errmsg=f"Unknown print level {request.print_level}"
Expand All @@ -2067,10 +2061,10 @@ def set_spdk_nvmf_logs_safe(self, request, context):
ret = [rpc_log.log_set_flag(
self.spdk_rpc_client, flag=flag) for flag in nvmf_log_flags]
self.logger.info(f"Set SPDK nvmf log flags {nvmf_log_flags} to TRUE: {ret}")
if log_level:
if log_level != None:
ret_log = rpc_log.log_set_level(self.spdk_rpc_client, level=log_level)
self.logger.info(f"Set log level to {log_level}: {ret_log}")
if print_level:
if print_level != None:
ret_print = rpc_log.log_set_print_level(
self.spdk_rpc_client, level=print_level)
self.logger.info(f"Set log print level to {print_level}: {ret_print}")
Expand All @@ -2088,10 +2082,10 @@ def set_spdk_nvmf_logs_safe(self, request, context):

status = 0
errmsg = os.strerror(0)
if log_level and not ret_log:
if log_level != None and not ret_log:
status = errno.EINVAL
errmsg = "Failure setting SPDK log level"
elif print_level and not ret_print:
elif print_level != None and not ret_print:
status = errno.EINVAL
errmsg = "Failure setting SPDK print log level"
elif not all(ret):
Expand Down
Loading

0 comments on commit 44ab81b

Please sign in to comment.