diff --git a/.env b/.env index bd43ab4b..952fe861 100644 --- a/.env +++ b/.env @@ -1,6 +1,6 @@ # Globals -VERSION="0.0.3" -CEPH_VERSION="17.2.6" +VERSION="0.0.5" +CEPH_VERSION="18.2.0" SPDK_VERSION="23.01.1" CONTAINER_REGISTRY="quay.io/ceph" QUAY_SPDK="${CONTAINER_REGISTRY}/spdk" @@ -8,6 +8,8 @@ QUAY_CEPH="${CONTAINER_REGISTRY}/vstart-cluster" QUAY_NVMEOF="${CONTAINER_REGISTRY}/nvmeof" QUAY_NVMEOFCLI="${CONTAINER_REGISTRY}/nvmeof-cli" MAINTAINER="Ceph Developers " +COMPOSE_PROJECT_NAME="ceph-nvmeof" +NVMEOF_CONTAINER_NAME="${COMPOSE_PROJECT_NAME}-nvmeof-1" # Performance NVMEOF_NOFILE=20480 # Max number of open files (depends on number of hosts connected) @@ -18,18 +20,19 @@ HUGEPAGES_DIR="/sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages" NVMEOF_VERSION="${VERSION}" NVMEOF_CONFIG="./ceph-nvmeof.conf" NVMEOF_SPDK_VERSION="${SPDK_VERSION}" +NVMEOF_CEPH_VERSION="${CEPH_VERSION}" NVMEOF_NAME="ceph-nvmeof" NVMEOF_SUMMARY="Ceph NVMe over Fabrics Gateway" NVMEOF_DESCRIPTION="Service to provide block storage on top of Ceph for platforms (e.g.: VMWare) without native Ceph support (RBD), replacing existing approaches (iSCSI) with a newer and more versatile standard (NVMe-oF)." 
NVMEOF_URL="https://github.com/ceph/ceph-nvmeof" NVMEOF_TAGS="ceph,nvme-of,nvme-of gateway,rbd,block storage" NVMEOF_WANTS="ceph,rbd" -NVMEOF_IP_ADDRESS="192.168.13.3" +NVMEOF_IP_ADDRESS=192.168.13.3 +NVMEOF_IPV6_ADDRESS=2001:db8::3 NVMEOF_IO_PORT=4420 NVMEOF_GW_PORT=5500 NVMEOF_DISC_PORT=8009 NVMEOF_EXPOSE_SERVICES="${NVMEOF_IO_PORT}/tcp:nvme,${NVMEOF_GW_PORT}/tcp:grpc,${NVMEOF_DISC_PORT}/tcp:nvme-disc" -NVMEOF_GIT_REPO="https://github.com/ceph/ceph-nvmeof.git" # NVMe-oF CLI MVMEOF_CLI_VERSION="${VERSION}" @@ -47,6 +50,7 @@ SPDK_URL="https://spdk.io" SPDK_PKGDEP_ARGS="--rbd" SPDK_CONFIGURE_ARGS="--with-rbd --disable-tests --disable-unit-tests --disable-examples --enable-debug" SPDK_MAKEFLAGS= +SPDK_DISABLE_VPCLMULQDQ= SPDK_CENTOS_BASE="https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/" SPDK_CENTOS_REPO_VER="9.0-21.el9" @@ -55,10 +59,10 @@ CEPH_CLUSTER_VERSION="${CEPH_VERSION}" CEPH_VSTART_ARGS="--without-dashboard --memstore" # Demo settings -RBD_POOL="rbd" -RBD_IMAGE_NAME="demo_image" -RBD_IMAGE_SIZE="10M" -BDEV_NAME="demo_bdev" +RBD_POOL=rbd +RBD_IMAGE_NAME=demo_image +RBD_IMAGE_SIZE=10M +BDEV_NAME=demo_bdev NQN="nqn.2016-06.io.spdk:cnode1" SERIAL="SPDK00000000000001" diff --git a/.github/workflows/build-container.yml b/.github/workflows/build-container.yml index ebd58bed..19f6a1eb 100644 --- a/.github/workflows/build-container.yml +++ b/.github/workflows/build-container.yml @@ -23,7 +23,7 @@ jobs: submodules: recursive - name: Build container images - run: make build + run: make build SPDK_DISABLE_VPCLMULQDQ="yes" - name: Save container images run: | @@ -65,7 +65,7 @@ jobs: strategy: fail-fast: false matrix: - test: ["cli", "state", "multi_gateway", "server"] + test: ["cli", "state", "multi_gateway", "server", "grpc", "omap_lock"] runs-on: ubuntu-latest env: HUGEPAGES: 512 # for multi gateway test, approx 256 per gateway instance @@ -87,6 +87,27 @@ jobs: docker load < nvmeof-devel.tar docker load < vstart-cluster.tar + - name: Clear space on 
disk + run: | + if [[ -d /usr/share/dotnet ]]; then + /usr/bin/du -sh /usr/share/dotnet + rm -rf /usr/share/dotnet + fi + if [[ -d /opt/ghc ]]; then + /usr/bin/du -sh /opt/ghc + rm -rf /opt/ghc + fi + if [[ -d /usr/local/share/boost ]]; then + /usr/bin/du -sh /usr/local/share/boost + rm -rf /usr/local/share/boost + fi + if [[ -n "$AGENT_TOOLSDIRECTORY" ]]; then + if [[ -d "$AGENT_TOOLSDIRECTORY" ]]; then + /usr/bin/du -sh "$AGENT_TOOLSDIRECTORY" + rm -rf "$AGENT_TOOLSDIRECTORY" + fi + fi + - name: Start ceph cluster run: | make up SVC=ceph OPTS="--detach" @@ -115,11 +136,14 @@ jobs: echo "💁 ls rbd:" make exec SVC=ceph OPTS="-T" CMD="rbd ls rbd" + - name: Run protoc + run: | + make protoc + - name: Run ${{ matrix.test }} test run: | # Run tests code in current dir # Managing pytest’s output: https://docs.pytest.org/en/7.1.x/how-to/output.html - make protoc make run SVC="nvmeof-devel" OPTS="--volume=$(pwd)/tests:/src/tests --entrypoint=python3" CMD="-m pytest --show-capture=all -s --full-trace -vv -rA tests/test_${{ matrix.test }}.py" - name: Check coredump existence @@ -197,7 +221,7 @@ jobs: - name: Test run: | - make demo OPTS=-T + make demo OPTS=-T NVMEOF_CONTAINER_NAME="ceph-nvmeof_nvmeof_1" - name: Get subsystems run: | @@ -205,6 +229,7 @@ jobs: shopt -s expand_aliases eval $(make alias) nvmeof-cli get_subsystems + nvmeof-cli-ipv6 get_subsystems - name: Run bdevperf run: | @@ -261,6 +286,10 @@ jobs: discovery: needs: build + strategy: + fail-fast: false + matrix: + integration: ["container", "embedded"] runs-on: ubuntu-latest env: HUGEPAGES: 768 # 3 spdk instances @@ -285,10 +314,12 @@ jobs: docker load < bdevperf.tar - name: Start discovery controller + if: matrix.integration == 'container' run: | docker-compose up --detach discovery - name: Wait for discovery controller to be listening + if: matrix.integration == 'container' timeout-minutes: 3 run: | . 
.env @@ -392,7 +423,14 @@ jobs: echo "ℹ️ bdevperf start up logs" make logs SVC=bdevperf eval $(make run SVC=bdevperf OPTS="--entrypoint=env" | grep BDEVPERF_SOCKET | tr -d '\n\r' ) - ip=$(container_ip $DISC1) + + if [ "${{ matrix.integration }}" == "embedded" ]; then + ip=$(container_ip $GW1) + echo "ℹ️ Using discovery service in gateway $GW1 ip $ip" + else + ip=$(container_ip $DISC1) + echo "ℹ️ Using standalone discovery container $DISC1 ip $ip" + fi rpc="/usr/libexec/spdk/scripts/rpc.py" echo "ℹ️ bdevperf bdev_nvme_set_options" make exec SVC=bdevperf OPTS=-T CMD="$rpc -v -s $BDEVPERF_SOCKET bdev_nvme_set_options -r -1" @@ -412,14 +450,14 @@ jobs: id: check_coredumps uses: andstor/file-existence-action@20b4d2e596410855db8f9ca21e96fbe18e12930b # v2, pinned to SHA for security reasons with: - files: "/var/lib/systemd/coredump/*" + files: "/tmp/coredump/core.*" - name: Upload demo core dumps if: steps.check_coredumps.outputs.files_exists == 'true' uses: actions/upload-artifact@v1 with: name: ceph_nvmeof_demo_cores-${{ github.run_number }} - path: /var/lib/systemd/coredump/* + path: /tmp/coredump/core.* - name: Display logs if: success() || failure() @@ -437,6 +475,9 @@ jobs: runs-on: ubuntu-latest steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Download container images uses: actions/download-artifact@v3 with: diff --git a/Dockerfile b/Dockerfile index 38f23621..c001fc4e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -52,7 +52,19 @@ ARG NVMEOF_NAME \ BUILD_DATE \ NVMEOF_GIT_REPO \ NVMEOF_GIT_BRANCH \ - NVMEOF_GIT_COMMIT + NVMEOF_GIT_COMMIT \ + NVMEOF_SPDK_VERSION \ + NVMEOF_CEPH_VERSION \ + NVMEOF_GIT_MODIFIED_FILES + +ENV NVMEOF_VERSION="${NVMEOF_VERSION}" \ + NVMEOF_GIT_REPO="${NVMEOF_GIT_REPO}" \ + NVMEOF_GIT_BRANCH="${NVMEOF_GIT_BRANCH}" \ + NVMEOF_GIT_COMMIT="${NVMEOF_GIT_COMMIT}" \ + BUILD_DATE="${BUILD_DATE}" \ + NVMEOF_SPDK_VERSION="${NVMEOF_SPDK_VERSION}" \ + NVMEOF_CEPH_VERSION="${NVMEOF_CEPH_VERSION}" \ + 
NVMEOF_GIT_MODIFIED_FILES="${NVMEOF_GIT_MODIFIED_FILES}" # Generic labels LABEL name="$NVMEOF_NAME" \ diff --git a/Dockerfile.ceph b/Dockerfile.ceph index 3a166983..d452e494 100644 --- a/Dockerfile.ceph +++ b/Dockerfile.ceph @@ -38,9 +38,6 @@ RUN rpm -vih https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch. RUN rpm --import 'https://download.ceph.com/keys/release.asc' RUN cd /etc/yum.repos.d/ && curl -O https://copr.fedorainfracloud.org/coprs/ceph/el9/repo/epel-9/ceph-el9-epel-9.repo -## WORKAROUND: remove when https://tracker.ceph.com/issues/61882 is fixed -RUN rpm -vih "https://buildlogs.centos.org/centos/9-stream/storage/x86_64/ceph-quincy/Packages/t/thrift-0.14.0-7.el9s.x86_64.rpm" - RUN \ --mount=type=cache,target=/var/cache/microdnf \ microdnf install -y \ diff --git a/Dockerfile.spdk b/Dockerfile.spdk index 76fb7b9e..ab46675b 100644 --- a/Dockerfile.spdk +++ b/Dockerfile.spdk @@ -24,7 +24,8 @@ EOF ARG SPDK_PKGDEP_ARGS \ SPDK_CONFIGURE_ARGS \ - SPDK_MAKEFLAGS + SPDK_MAKEFLAGS \ + SPDK_DISABLE_VPCLMULQDQ WORKDIR /src COPY . . 
@@ -38,9 +39,19 @@ RUN \ && dnf install -y \ rpm-build \ git-core \ + && rpm -vih https://buildlogs.centos.org/centos/9-stream/storage/x86_64/ceph-reef/Packages/t/thrift-0.15.0-3.el9s.x86_64.rpm \ && scripts/pkgdep.sh $SPDK_PKGDEP_ARGS \ && dnf update -y +# Disable RDSEED, see https://github.com/ceph/ceph-nvmeof/issues/259 +RUN \ + sed -i "s/^\( \+'RDSEED'\)/#\1/" dpdk/config/x86/meson.build +# If we run from github disable also VPCLMULQDQ as some of github's runners don't have it +RUN \ + if [[ -n "$SPDK_DISABLE_VPCLMULQDQ" ]]; then sed -i "s/^\( \+'VPCLMULQDQ'\)/#\1/" dpdk/config/x86/meson.build ; fi +RUN \ + cat dpdk/config/x86/meson.build + RUN \ --mount=type=cache,target=/var/cache/dnf \ --mount=type=cache,target=/var/lib/dnf \ @@ -114,6 +125,7 @@ RUN \ --mount=type=cache,target=/var/lib/dnf \ rpm -vih $SPDK_CENTOS_BASE/centos-stream-repos-$SPDK_CENTOS_REPO_VER.noarch.rpm \ $SPDK_CENTOS_BASE/centos-gpg-keys-$SPDK_CENTOS_REPO_VER.noarch.rpm \ + https://buildlogs.centos.org/centos/9-stream/storage/x86_64/ceph-reef/Packages/t/thrift-0.15.0-3.el9s.x86_64.rpm \ && dnf install -y /rpm/$(uname -m)/*.rpm \ && dnf update -y diff --git a/Makefile b/Makefile index db01c41a..dcba0c29 100644 --- a/Makefile +++ b/Makefile @@ -24,12 +24,14 @@ setup: ## Configure huge-pages (requires sudo/root password) build pull logs: SVC ?= spdk bdevperf nvmeof nvmeof-devel nvmeof-cli discovery ceph +build: export NVMEOF_GIT_REPO != git remote get-url origin build: export NVMEOF_GIT_BRANCH != git name-rev --name-only HEAD build: export NVMEOF_GIT_COMMIT != git rev-parse HEAD build: export SPDK_GIT_REPO != git -C spdk remote get-url origin build: export SPDK_GIT_BRANCH != git -C spdk name-rev --name-only HEAD build: export SPDK_GIT_COMMIT != git rev-parse HEAD:spdk -build: export BUILD_DATE != date -u +"%Y-%m-%dT%H:%M:%SZ" +build: export BUILD_DATE != date -u +"%Y-%m-%d %H:%M:%S %Z" +build: export NVMEOF_GIT_MODIFIED_FILES != git status -s | grep -e "^ *M" | sed 's/^ *M //' | xargs up: ## 
Launch services up: SVC ?= ceph nvmeof ## Services diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index f05cd065..07666a58 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -17,7 +17,11 @@ state_update_notify = True state_update_interval_sec = 5 #min_controller_id = 1 #max_controller_id = 65519 -enable_discovery_controller = false +enable_spdk_discovery_controller = False +#omap_file_lock_duration = 60 +#omap_file_lock_retries = 15 +#omap_file_lock_retry_sleep_interval = 5 +#omap_file_update_reloads = 10 [discovery] addr = 0.0.0.0 @@ -37,7 +41,8 @@ client_cert = ./client.crt [spdk] tgt_path = /usr/local/bin/nvmf_tgt -rpc_socket = /var/tmp/spdk.sock +#rpc_socket_dir = /var/tmp/ +#rpc_socket_name = spdk.sock #tgt_cmd_extra_args = --env-context="--no-huge -m1024" --iova-mode=va timeout = 60.0 log_level = WARN diff --git a/control/__main__.py b/control/__main__.py index 78b49536..2544fd71 100644 --- a/control/__main__.py +++ b/control/__main__.py @@ -31,6 +31,7 @@ args = parser.parse_args() config = GatewayConfig(args.config) + config.dump_config_file(logger) with GatewayServer(config) as gateway: gateway.serve() gateway.keep_alive() diff --git a/control/cli.py b/control/cli.py index e7c022a1..97fbc317 100644 --- a/control/cli.py +++ b/control/cli.py @@ -17,7 +17,7 @@ from .proto import gateway_pb2_grpc as pb2_grpc from .proto import gateway_pb2 as pb2 - +from .config import GatewayConfig def argument(*name_or_flags, **kwargs): """Helper function to format arguments for argparse command decorator.""" @@ -124,7 +124,9 @@ def stub(self): def connect(self, host, port, client_key, client_cert, server_cert): """Connects to server and sets stub.""" - server = "{}:{}".format(host, port) + # We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it + host = GatewayConfig.escape_address_if_ipv6(host) + server = f"{host}:{port}" if client_key and client_cert: # Create credentials for mutual TLS and a secure channel @@ -187,12 
+189,16 @@ def delete_bdev(self, args): argument("-n", "--subnqn", help="Subsystem NQN", required=True), argument("-s", "--serial", help="Serial number", required=False), argument("-m", "--max-namespaces", help="Maximum number of namespaces", type=int, default=0, required=False), + argument("-a", "--ana-reporting", help="Enable ANA reporting", action='store_true', required=False), + argument("-t", "--enable-ha", help="Enable automatic HA", action='store_true', required=False), ]) def create_subsystem(self, args): """Creates a subsystem.""" req = pb2.create_subsystem_req(subsystem_nqn=args.subnqn, serial_number=args.serial, - max_namespaces=args.max_namespaces) + max_namespaces=args.max_namespaces, + ana_reporting=args.ana_reporting, + enable_ha=args.enable_ha) ret = self.stub.create_subsystem(req) self.logger.info(f"Created subsystem {args.subnqn}: {ret.status}") @@ -209,15 +215,20 @@ def delete_subsystem(self, args): argument("-n", "--subnqn", help="Subsystem NQN", required=True), argument("-b", "--bdev", help="Bdev name", required=True), argument("-i", "--nsid", help="Namespace ID", type=int), + argument("-a", "--anagrpid", help="ANA group ID", type=int), ]) def add_namespace(self, args): """Adds a namespace to a subsystem.""" + if args.anagrpid == 0: + args.anagrpid = 1 + req = pb2.add_namespace_req(subsystem_nqn=args.subnqn, bdev_name=args.bdev, - nsid=args.nsid) + nsid=args.nsid, + anagrpid=args.anagrpid) ret = self.stub.add_namespace(req) self.logger.info( - f"Added namespace {ret.nsid} to {args.subnqn}: {ret.status}") + f"Added namespace {ret.nsid} to {args.subnqn}, ANA group id {args.anagrpid} : {ret.status}") @cli.cmd([ argument("-n", "--subnqn", help="Subsystem NQN", required=True), diff --git a/control/config.py b/control/config.py index cd6277a8..ccec4fc6 100644 --- a/control/config.py +++ b/control/config.py @@ -17,6 +17,8 @@ class GatewayConfig: config: Config parser object """ def __init__(self, conffile): + self.filepath = conffile + 
self.conffile_logged = False with open(conffile) as f: self.config = configparser.ConfigParser() self.config.read_file(f) @@ -44,3 +46,28 @@ def getint_with_default(self, section, param, value): def getfloat_with_default(self, section, param, value): return self.config.getfloat(section, param, fallback=value) + + def dump_config_file(self, logger): + if self.conffile_logged: + return + + try: + logger.info(f"Using configuration file {self.filepath}") + with open(self.filepath) as f: + logger.info( + f"====================================== Configuration file content ======================================") + for line in f: + line = line.rstrip() + logger.info(f"{line}") + logger.info( + f"========================================================================================================") + self.conffile_logged = True + except Exception: + pass + + # We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it + def escape_address_if_ipv6(addr) -> str: + ret_addr = addr + if ":" in addr and not addr.strip().startswith("["): + ret_addr = f"[{addr}]" + return ret_addr diff --git a/control/discovery.py b/control/discovery.py index d91ee23d..652cbef8 100644 --- a/control/discovery.py +++ b/control/discovery.py @@ -304,35 +304,27 @@ class DiscoveryService: discovery_port: Discovery controller's listening port """ - BDEV_PREFIX = "bdev_" - NAMESPACE_PREFIX = "namespace_" - SUBSYSTEM_PREFIX = "subsystem_" - HOST_PREFIX = "host_" - LISTENER_PREFIX = "listener_" + DISCOVERY_NQN = "nqn.2014-08.org.nvmexpress.discovery" def __init__(self, config): self.version = 1 self.config = config self.lock = threading.Lock() + self.omap_state = OmapGatewayState(self.config) self.logger = logging.getLogger(__name__) - log_level = self.config.get("discovery", "debug") - self.logger.setLevel(level=int(log_level)) + log_level = self.config.getint_with_default("discovery", "debug", 20) + self.logger.setLevel(level=log_level) - gateway_group = 
self.config.get("gateway", "group") + gateway_group = self.config.get_with_default("gateway", "group", "") self.omap_name = f"nvmeof.{gateway_group}.state" \ if gateway_group else "nvmeof.state" self.logger.info(f"log pages info from omap: {self.omap_name}") - ceph_pool = self.config.get("ceph", "pool") - ceph_conf = self.config.get("ceph", "config_file") - conn = rados.Rados(conffile=ceph_conf) - conn.connect() - self.ioctx = conn.open_ioctx(ceph_pool) - - self.discovery_addr = self.config.get("discovery", "addr") - self.discovery_port = self.config.get("discovery", "port") - if self.discovery_addr == '' or self.discovery_port == '': + self.ioctx = self.omap_state.open_rados_connection(config) + self.discovery_addr = self.config.get_with_default("discovery", "addr", "0.0.0.0") + self.discovery_port = self.config.get_with_default("discovery", "port", "8009") + if not self.discovery_addr or not self.discovery_port: self.logger.error("discovery addr/port are empty.") assert 0 self.logger.info(f"discovery addr: {self.discovery_addr} port: {self.discovery_port}") @@ -344,10 +336,7 @@ def __init__(self, config): def _read_all(self) -> Dict[str, str]: """Reads OMAP and returns dict of all keys and values.""" - with rados.ReadOpCtx() as read_op: - iter, _ = self.ioctx.get_omap_vals(read_op, "", "", -1) - self.ioctx.operate_read_op(read_op, self.omap_name) - omap_dict = dict(iter) + omap_dict = self.omap_state.get_state() return omap_dict def _get_vals(self, omap_dict, prefix): @@ -675,8 +664,8 @@ def reply_get_log_page(self, conn, data, cmd_id): self.logger.debug("handle get log page request.") self_conn = self.conn_vals[conn.fileno()] my_omap_dict = self._read_all() - listeners = self._get_vals(my_omap_dict, self.LISTENER_PREFIX) - hosts = self._get_vals(my_omap_dict, self.HOST_PREFIX) + listeners = self._get_vals(my_omap_dict, GatewayState.LISTENER_PREFIX) + hosts = self._get_vals(my_omap_dict, GatewayState.HOST_PREFIX) if len(self_conn.nvmeof_connect_data_hostnqn) != 
256: self.logger.error("error hostnqn.") return -1 @@ -744,12 +733,12 @@ def reply_get_log_page(self, conn, data, cmd_id): log_entry_counter = 0 while log_entry_counter < len(allow_listeners): log_entry = DiscoveryLogEntry() - trtype = TRANSPORT_TYPES[allow_listeners[log_entry_counter]["trtype"]] + trtype = TRANSPORT_TYPES[allow_listeners[log_entry_counter]["trtype"].upper()] if trtype is None: self.logger.error("unsupported transport type") else: log_entry.trtype = trtype - adrfam = ADRFAM_TYPES[allow_listeners[log_entry_counter]["adrfam"]] + adrfam = ADRFAM_TYPES[allow_listeners[log_entry_counter]["adrfam"].lower()] if adrfam is None: self.logger.error("unsupported adress family") else: @@ -1030,10 +1019,9 @@ def start_service(self): t = threading.Thread(target=self.handle_timeout) t.start() - omap_state = OmapGatewayState(self.config) local_state = LocalGatewayState() gateway_state = GatewayStateHandler(self.config, local_state, - omap_state, self._state_notify_update) + self.omap_state, self._state_notify_update) gateway_state.start_update() try: diff --git a/control/grpc.py b/control/grpc.py index dd12fda8..e58bd960 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -13,6 +13,9 @@ import uuid import random import logging +import os +import threading +import errno import spdk.rpc.bdev as rpc_bdev import spdk.rpc.nvmf as rpc_nvmf @@ -21,7 +24,11 @@ from google.protobuf import json_format from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc +from .config import GatewayConfig +from .discovery import DiscoveryService +from .state import GatewayState +MAX_ANA_GROUPS = 4 class GatewayService(pb2_grpc.GatewayServicer): """Implements gateway service interface. 
@@ -36,11 +43,38 @@ class GatewayService(pb2_grpc.GatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ - def __init__(self, config, gateway_state, spdk_rpc_client) -> None: + def __init__(self, config, gateway_state, omap_lock, spdk_rpc_client) -> None: """Constructor""" self.logger = logging.getLogger(__name__) + ver = os.getenv("NVMEOF_VERSION") + if ver: + self.logger.info(f"Using NVMeoF gateway version {ver}") + spdk_ver = os.getenv("NVMEOF_SPDK_VERSION") + if spdk_ver: + self.logger.info(f"Using SPDK version {spdk_ver}") + ceph_ver = os.getenv("NVMEOF_CEPH_VERSION") + if ceph_ver: + self.logger.info(f"Using vstart cluster version based on {ceph_ver}") + build_date = os.getenv("BUILD_DATE") + if build_date: + self.logger.info(f"NVMeoF gateway built on: {build_date}") + git_rep = os.getenv("NVMEOF_GIT_REPO") + if git_rep: + self.logger.info(f"NVMeoF gateway Git repository: {git_rep}") + git_branch = os.getenv("NVMEOF_GIT_BRANCH") + if git_branch: + self.logger.info(f"NVMeoF gateway Git branch: {git_branch}") + git_commit = os.getenv("NVMEOF_GIT_COMMIT") + if git_commit: + self.logger.info(f"NVMeoF gateway Git commit: {git_commit}") + git_modified = os.getenv("NVMEOF_GIT_MODIFIED_FILES") + if git_modified: + self.logger.info(f"NVMeoF gateway uncommitted modified files: {git_modified}") self.config = config + config.dump_config_file(self.logger) + self.rpc_lock = threading.Lock() self.gateway_state = gateway_state + self.omap_lock = omap_lock self.spdk_rpc_client = spdk_rpc_client self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: @@ -87,7 +121,14 @@ def _alloc_cluster(self) -> str: ) return name - def create_bdev(self, request, context=None): + def _grpc_function_with_lock(self, func, request, context): + with self.rpc_lock: + return func(request, context) + + def execute_grpc_function(self, func, request, context): + return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, 
context) + + def create_bdev_safe(self, request, context=None): """Creates a bdev from an RBD image.""" if not request.uuid: @@ -96,412 +137,635 @@ def create_bdev(self, request, context=None): name = request.uuid if not request.bdev_name else request.bdev_name self.logger.info(f"Received request to create bdev {name} from" f" {request.rbd_pool_name}/{request.rbd_image_name}" - f" with block size {request.block_size}") - try: - bdev_name = rpc_bdev.bdev_rbd_create( - self.spdk_rpc_client, - name=name, - cluster_name=self._get_cluster(), - pool_name=request.rbd_pool_name, - rbd_name=request.rbd_image_name, - block_size=request.block_size, - uuid=request.uuid, - ) - self.logger.info(f"create_bdev: {bdev_name}") - except Exception as ex: - self.logger.error(f"create_bdev failed with: \n {ex}") + f" with block size {request.block_size}, context: {context}") + with self.omap_lock(context=context): + try: + bdev_name = rpc_bdev.bdev_rbd_create( + self.spdk_rpc_client, + name=name, + cluster_name=self._get_cluster(), + pool_name=request.rbd_pool_name, + rbd_name=request.rbd_image_name, + block_size=request.block_size, + uuid=request.uuid, + ) + self.logger.info(f"create_bdev: {bdev_name}") + except Exception as ex: + self.logger.error(f"create_bdev failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.bdev() + if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.bdev() + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_bdev(bdev_name, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting create_bdev {bdev_name}: {ex}") + raise - if context: - # Update gateway state + return pb2.bdev(bdev_name=bdev_name, status=True) + + def create_bdev(self, request, context=None): + return self.execute_grpc_function(self.create_bdev_safe, request, context) 
+ + def get_bdev_namespaces(self, bdev_name) -> list: + ns_list = [] + local_state_dict = self.gateway_state.local.get_state() + for key, val in local_state_dict.items(): + if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX): + continue try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_bdev(bdev_name, json_req) + ns = json.loads(val) + if ns["bdev_name"] == bdev_name: + nsid = ns["nsid"] + nqn = ns["subsystem_nqn"] + ns_list.insert(0, {"nqn" : nqn, "nsid" : nsid}) except Exception as ex: - self.logger.error( - f"Error persisting create_bdev {bdev_name}: {ex}") - raise + self.logger.error(f"Got exception trying to get bdev {bdev_name} namespaces: {ex}") + pass - return pb2.bdev(bdev_name=bdev_name, status=True) + return ns_list - def delete_bdev(self, request, context=None): - """Deletes a bdev.""" + def delete_bdev_handle_exception(self, context, ex): + self.logger.error(f"delete_bdev failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() - self.logger.info(f"Received request to delete bdev {request.bdev_name}") - use_excep = None - req_get_subsystems = pb2.get_subsystems_req() - ret = self.get_subsystems(req_get_subsystems, context) - subsystems = json.loads(ret.subsystems) - for subsystem in subsystems: - for namespace in subsystem['namespaces']: - if namespace['bdev_name'] == request.bdev_name: - # We found a namespace still using this bdev. If --force was used we will try to remove this namespace. 
- # Otherwise fail with EBUSY - if request.force: - self.logger.info(f"Will remove namespace {namespace['nsid']} from {subsystem['nqn']} as it is using bdev {request.bdev_name}") - try: - req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=subsystem['nqn'], nsid=namespace['nsid']) - ret = self.remove_namespace(req_rm_ns, context) - self.logger.info( - f"Removed namespace {namespace['nsid']} from {subsystem['nqn']}: {ret.status}") - except Exception as ex: - self.logger.error(f"Error removing namespace {namespace['nsid']} from {subsystem['nqn']}, will delete bdev {request.bdev_name} anyway: {ex}") - pass - else: - self.logger.error(f"Namespace {namespace['nsid']} from {subsystem['nqn']} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option") - req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} - ret = {"code": -16, "message": "Device or resource busy"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - use_excep = Exception(msg) + def delete_bdev_safe(self, request, context=None): + """Deletes a bdev.""" - try: - if use_excep: - raise use_excep - ret = rpc_bdev.bdev_rbd_delete( - self.spdk_rpc_client, - request.bdev_name, - ) - self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") - except Exception as ex: - self.logger.error(f"delete_bdev failed with: \n {ex}") + self.logger.info(f"Received request to delete bdev {request.bdev_name}, context: {context}") + ns_list = [] + with self.omap_lock(context=context): if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + ns_list = self.get_bdev_namespaces(request.bdev_name) + for namespace in ns_list: + # We found a namespace still using this bdev. If --force was used we will try to remove the namespace from OMAP. 
+ # Otherwise fail with EBUSY + try: + ns_nsid = namespace["nsid"] + ns_nqn = namespace["nqn"] + except Exception as ex: + self.logger.error(f"Got exception while trying to remove namespace: {namespace} which stil uses bdev {request.bdev_name}: {ex}") + continue + + if request.force: + self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") + try: + self.gateway_state.remove_namespace(ns_nqn, str(ns_nsid)) + self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}") + except Exception as ex: + self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") + pass + else: + self.logger.error(f"Namespace {ns_nsid} from {ns_nqn} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option") + req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} + ret = {"code": -errno.EBUSY, "message": os.strerror(errno.EBUSY)} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent = 2), + "Got JSON-RPC error response", "response:", json.dumps(ret, indent = 2)]) + return self.delete_bdev_handle_exception(context, Exception(msg)) - if context: - # Update gateway state try: - self.gateway_state.remove_bdev(request.bdev_name) + ret = rpc_bdev.bdev_rbd_delete( + self.spdk_rpc_client, + request.bdev_name, + ) + self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting delete_bdev {request.bdev_name}: {ex}") - raise + return self.delete_bdev_handle_exception(context, ex) + + if context: + # Update gateway state + try: + self.gateway_state.remove_bdev(request.bdev_name) + except Exception as ex: + self.logger.error( + f"Error persisting delete_bdev {request.bdev_name}: {ex}") + raise return pb2.req_status(status=ret) - def create_subsystem(self, request, context=None): + def delete_bdev(self, request, context=None): + return 
self.execute_grpc_function(self.delete_bdev_safe, request, context) + + def is_discovery_nqn(self, nqn) -> bool: + return nqn == DiscoveryService.DISCOVERY_NQN + + def serial_number_already_used(self, context, serial) -> str: + if not context: + return None + state = self.gateway_state.local.get_state() + for key, val in state.items(): + if not key.startswith(self.gateway_state.local.SUBSYSTEM_PREFIX): + continue + try: + subsys = json.loads(val) + sn = subsys["serial_number"] + if serial == sn: + return subsys["subsystem_nqn"] + except Exception: + self.logger.warning("Got exception while parsing {val}: {ex}") + continue + return None + + def create_subsystem_safe(self, request, context=None): """Creates a subsystem.""" self.logger.info( - f"Received request to create subsystem {request.subsystem_nqn}") + f"Received request to create subsystem {request.subsystem_nqn}, ana reporting: {request.ana_reporting}, context: {context}") + + if self.is_discovery_nqn(request.subsystem_nqn): + raise Exception(f"Can't create a discovery subsystem") + min_cntlid = self.config.getint_with_default("gateway", "min_controller_id", 1) max_cntlid = self.config.getint_with_default("gateway", "max_controller_id", 65519) if not request.serial_number: random.seed() randser = random.randint(2, 99999999999999) request.serial_number = f"SPDK{randser}" - try: - ret = rpc_nvmf.nvmf_create_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - serial_number=request.serial_number, - max_namespaces=request.max_namespaces, - min_cntlid=min_cntlid, - max_cntlid=max_cntlid, - ) - self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"create_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.info(f"No serial number specified, will use {request.serial_number}") - if context: - # Update gateway state + with 
self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_subsystem(request.subsystem_nqn, - json_req) + subsys_using_serial = self.serial_number_already_used(context, request.serial_number) + if subsys_using_serial: + self.logger.error(f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}") + req = {"subsystem_nqn": request.subsystem_nqn, + "serial_number": request.serial_number, + "max_namespaces": request.max_namespaces, + "ana_reporting": request.ana_reporting, + "enable_ha": request.enable_ha, + "method": "nvmf_create_subsystem", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + ret = rpc_nvmf.nvmf_create_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + serial_number=request.serial_number, + max_namespaces=request.max_namespaces, + min_cntlid=min_cntlid, + max_cntlid=max_cntlid, + ana_reporting = request.ana_reporting, + ) + self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting create_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"create_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_subsystem(request.subsystem_nqn, + json_req) + except Exception as ex: + self.logger.error(f"Error persisting create_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) - def 
delete_subsystem(self, request, context=None): + def create_subsystem(self, request, context=None): + return self.execute_grpc_function(self.create_subsystem_safe, request, context) + + def delete_subsystem_safe(self, request, context=None): """Deletes a subsystem.""" self.logger.info( - f"Received request to delete subsystem {request.subsystem_nqn}") - try: - ret = rpc_nvmf.nvmf_delete_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - ) - self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"delete_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + f"Received request to delete subsystem {request.subsystem_nqn}, context: {context}") - if context: - # Update gateway state + if self.is_discovery_nqn(request.subsystem_nqn): + raise Exception(f"Can't delete a discovery subsystem") + + with self.omap_lock(context=context): try: - self.gateway_state.remove_subsystem(request.subsystem_nqn) + ret = rpc_nvmf.nvmf_delete_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + ) + self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting delete_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"delete_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_subsystem(request.subsystem_nqn) + except Exception as ex: + self.logger.error(f"Error persisting delete_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) - def add_namespace(self, request, context=None): - """Adds a namespace to a subsystem.""" + def delete_subsystem(self, request, context=None): + return 
self.execute_grpc_function(self.delete_subsystem_safe, request, context) + def add_namespace_safe(self, request, context=None): + """Adds a namespace to a subsystem.""" + self.logger.info(f"Received request to add {request.bdev_name} to" - f" {request.subsystem_nqn}") - try: - nsid = rpc_nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - bdev_name=request.bdev_name, - nsid=request.nsid, - ) - self.logger.info(f"add_namespace: {nsid}") - except Exception as ex: - self.logger.error(f"add_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.nsid() + f" {request.subsystem_nqn}, context: {context}") - if context: - # Update gateway state + if request.anagrpid > MAX_ANA_GROUPS: + raise Exception(f"Error group ID {request.anagrpid} is more than configured maximum {MAX_ANA_GROUPS}") + + if self.is_discovery_nqn(request.subsystem_nqn): + raise Exception(f"Can't add a namespace to a discovery subsystem") + + with self.omap_lock(context=context): try: - if not request.nsid: - request.nsid = nsid - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_namespace(request.subsystem_nqn, - str(nsid), json_req) + nsid = rpc_nvmf.nvmf_subsystem_add_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + bdev_name=request.bdev_name, + nsid=request.nsid, + anagrpid=request.anagrpid, + ) + self.logger.info(f"add_namespace: {nsid}") except Exception as ex: - self.logger.error( - f"Error persisting add_namespace {nsid}: {ex}") - raise + self.logger.error(f"add_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.nsid_status() - return pb2.nsid(nsid=nsid, status=True) + if context: + # Update gateway state + try: + if not request.nsid: + request.nsid = nsid + json_req = json_format.MessageToJson( + request, 
preserving_proto_field_name=True) + self.gateway_state.add_namespace(request.subsystem_nqn, + str(nsid), json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_namespace {nsid}: {ex}") + raise + + return pb2.nsid_status(nsid=nsid, status=True) - def remove_namespace(self, request, context=None): + def add_namespace(self, request, context=None): + return self.execute_grpc_function(self.add_namespace_safe, request, context) + + def remove_namespace_safe(self, request, context=None): """Removes a namespace from a subsystem.""" - self.logger.info(f"Received request to remove {request.nsid} from" - f" {request.subsystem_nqn}") - try: - ret = rpc_nvmf.nvmf_subsystem_remove_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - nsid=request.nsid, - ) - self.logger.info(f"remove_namespace {request.nsid}: {ret}") - except Exception as ex: - self.logger.error(f"remove_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.info(f"Received request to remove nsid {request.nsid} from" + f" {request.subsystem_nqn}, context: {context}") - if context: - # Update gateway state + if self.is_discovery_nqn(request.subsystem_nqn): + raise Exception(f"Can't remove a namespace from a discovery subsystem") + + with self.omap_lock(context=context): try: - self.gateway_state.remove_namespace(request.subsystem_nqn, - str(request.nsid)) + ret = rpc_nvmf.nvmf_subsystem_remove_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + nsid=request.nsid, + ) + self.logger.info(f"remove_namespace {request.nsid}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting remove_namespace {request.nsid}: {ex}") - raise + self.logger.error(f"remove_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + 
self.gateway_state.remove_namespace(request.subsystem_nqn, + str(request.nsid)) + except Exception as ex: + self.logger.error( + f"Error persisting remove_namespace {request.nsid}: {ex}") + raise return pb2.req_status(status=ret) - def add_host(self, request, context=None): + def remove_namespace(self, request, context=None): + return self.execute_grpc_function(self.remove_namespace_safe, request, context) + + def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool: + if not context: + return False + host_key = GatewayState.build_host_key(subsys_nqn, host_nqn) + state = self.gateway_state.local.get_state() + if state.get(host_key): + return True + else: + return False + + def add_host_safe(self, request, context=None): """Adds a host to a subsystem.""" - try: - if request.host_nqn == "*": # Allow any host access to subsystem - self.logger.info(f"Received request to allow any host to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=False, - ) - self.logger.info(f"add_host *: {ret}") - else: # Allow single host access to subsystem - self.logger.info( - f"Received request to add host {request.host_nqn} to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_add_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"add_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"add_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + if self.is_discovery_nqn(request.subsystem_nqn): + raise Exception(f"Can't allow a host to a discovery subsystem") - if context: - # Update gateway state + if self.is_discovery_nqn(request.host_nqn): + raise Exception(f"Can't use a discovery NQN as host NQN") + + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, 
preserving_proto_field_name=True) - self.gateway_state.add_host(request.subsystem_nqn, - request.host_nqn, json_req) + host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn) + if host_already_exist: + if request.host_nqn == "*": + self.logger.error(f"All hosts already allowed to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_allow_any_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"All hosts already allowed to {request.subsystem_nqn}"} + else: + self.logger.error(f"Host {request.host_nqn} already added to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_add_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"Host {request.host_nqn} already added to {request.subsystem_nqn}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + if request.host_nqn == "*": # Allow any host access to subsystem + self.logger.info(f"Received request to allow any host to" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=False, + ) + self.logger.info(f"add_host *: {ret}") + else: # Allow single host access to subsystem + self.logger.info( + f"Received request to add host {request.host_nqn} to" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_add_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"add_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting add_host {request.host_nqn}: {ex}") - raise + self.logger.error(f"add_host failed with: \n {ex}") + if context: + 
context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_host(request.subsystem_nqn, + request.host_nqn, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_host {request.host_nqn}: {ex}") + raise return pb2.req_status(status=ret) - def remove_host(self, request, context=None): + def add_host(self, request, context=None): + return self.execute_grpc_function(self.add_host_safe, request, context) + + def remove_host_safe(self, request, context=None): """Removes a host from a subsystem.""" - try: - if request.host_nqn == "*": # Disable allow any host access - self.logger.info( - f"Received request to disable any host access to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=True, - ) - self.logger.info(f"remove_host *: {ret}") - else: # Remove single host access to subsystem - self.logger.info( - f"Received request to remove host_{request.host_nqn} from" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_remove_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"remove_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"remove_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + if self.is_discovery_nqn(request.subsystem_nqn): + raise Exception(f"Can't remove a host from a discovery subsystem") - if context: - # Update gateway state + if self.is_discovery_nqn(request.host_nqn): + raise Exception(f"Can't use a discovery NQN as host NQN") + + with self.omap_lock(context=context): try: - self.gateway_state.remove_host(request.subsystem_nqn, - request.host_nqn) + 
if request.host_nqn == "*": # Disable allow any host access + self.logger.info( + f"Received request to disable any host access to" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=True, + ) + self.logger.info(f"remove_host *: {ret}") + else: # Remove single host access to subsystem + self.logger.info( + f"Received request to remove host_{request.host_nqn} from" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_remove_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"remove_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting remove_host: {ex}") - raise + self.logger.error(f"remove_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_host(request.subsystem_nqn, + request.host_nqn) + except Exception as ex: + self.logger.error(f"Error persisting remove_host: {ex}") + raise return pb2.req_status(status=ret) - def create_listener(self, request, context=None): - """Creates a listener for a subsystem at a given IP/Port.""" + def remove_host(self, request, context=None): + return self.execute_grpc_function(self.remove_host_safe, request, context) + + def matching_listener_exists(self, context, nqn, gw_name, trtype, traddr, trsvcid) -> bool: + if not context: + return False + listener_key = GatewayState.build_listener_key(nqn, gw_name, trtype, traddr, trsvcid) + state = self.gateway_state.local.get_state() + if state.get(listener_key): + return True + else: + return False + def create_listener_safe(self, request, context=None): + """Creates a listener for a subsystem at a given IP/Port.""" ret = True + traddr = GatewayConfig.escape_address_if_ipv6(request.traddr) 
self.logger.info(f"Received request to create {request.gateway_name}" f" {request.trtype} listener for {request.nqn} at" - f" {request.traddr}:{request.trsvcid}.") - try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_add_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"create_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"create_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + f" {traddr}:{request.trsvcid}., context: {context}") - if context: - # Update gateway state + if self.is_discovery_nqn(request.nqn): + raise Exception(f"Can't create a listener for a discovery subsystem") + + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_listener(request.nqn, - request.gateway_name, - request.trtype, request.traddr, - request.trsvcid, json_req) + if request.gateway_name == self.gateway_name: + listener_already_exist = self.matching_listener_exists( + context, request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid) + if listener_already_exist: + self.logger.error(f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}") + req = {"nqn": request.nqn, "trtype": request.trtype, "traddr": request.traddr, + "gateway_name": request.gateway_name, + "trsvcid": request.trsvcid, "adrfam": request.adrfam, + "method": "nvmf_subsystem_add_listener", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + 
"Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + ret = rpc_nvmf.nvmf_subsystem_add_listener( + self.spdk_rpc_client, + nqn=request.nqn, + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"create_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - f"Error persisting add_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"create_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + state = self.gateway_state.local.get_state() + enable_ha = False + subsys_str = state.get(GatewayState.build_subsystem_key(request.nqn)) + if subsys_str: + self.logger.debug(f"value of sub-system: {subsys_str}") + try: + subsys_dict = json.loads(subsys_str) + try: + enable_ha = subsys_dict["enable_ha"] + except KeyError: + enable_ha = False + self.logger.info(f"enable_ha: {enable_ha}") + except Exception as ex: + self.logger.error(f"Got exception trying to parse subsystem {request.nqn}: {ex}") + pass + else: + self.logger.info(f"No subsystem for {request.nqn}") + + if enable_ha: + for x in range (MAX_ANA_GROUPS): + try: + ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( + self.spdk_rpc_client, + nqn=request.nqn, + ana_state="inaccessible", + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + anagrpid=(x+1) ) + except Exception as ex: + self.logger.error(f" set_listener_ana_state failed with: \n {ex}") + raise + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_listener(request.nqn, + request.gateway_name, + request.trtype, request.traddr, + request.trsvcid, json_req) + except Exception as 
ex: + self.logger.error( + f"Error persisting add_listener {request.trsvcid}: {ex}") + raise return pb2.req_status(status=ret) - def delete_listener(self, request, context=None): + def create_listener(self, request, context=None): + return self.execute_grpc_function(self.create_listener_safe, request, context) + + def delete_listener_safe(self, request, context=None): """Deletes a listener from a subsystem at a given IP/Port.""" ret = True + traddr = GatewayConfig.escape_address_if_ipv6(request.traddr) self.logger.info(f"Received request to delete {request.gateway_name}" f" {request.trtype} listener for {request.nqn} at" - f" {request.traddr}:{request.trsvcid}.") - try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_remove_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"delete_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"delete_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + f" {traddr}:{request.trsvcid}., context: {context}") - if context: - # Update gateway state + if self.is_discovery_nqn(request.nqn): + raise Exception(f"Can't delete a listener from a discovery subsystem") + + with self.omap_lock(context=context): try: - self.gateway_state.remove_listener(request.nqn, - request.gateway_name, - request.trtype, - request.traddr, - request.trsvcid) + if request.gateway_name == self.gateway_name: + ret = rpc_nvmf.nvmf_subsystem_remove_listener( + self.spdk_rpc_client, + nqn=request.nqn, + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"delete_listener: {ret}") + else: + raise Exception(f"Gateway name must match 
current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - f"Error persisting delete_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"delete_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_listener(request.nqn, + request.gateway_name, + request.trtype, + request.traddr, + request.trsvcid) + except Exception as ex: + self.logger.error( + f"Error persisting delete_listener {request.trsvcid}: {ex}") + raise return pb2.req_status(status=ret) - def get_subsystems(self, request, context): + def delete_listener(self, request, context=None): + return self.execute_grpc_function(self.delete_listener_safe, request, context) + + def get_subsystems_safe(self, request, context): """Gets subsystems.""" - self.logger.info(f"Received request to get subsystems") + self.logger.info(f"Received request to get subsystems, context: {context}") try: ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) self.logger.info(f"get_subsystems: {ret}") @@ -513,6 +777,10 @@ def get_subsystems(self, request, context): return pb2.subsystems_info(subsystems=json.dumps(ret)) + def get_subsystems(self, request, context): + with self.rpc_lock: + return self.get_subsystems_safe(request, context) + def get_spdk_nvmf_log_flags_and_level(self, request, context): """Gets spdk nvmf log flags, log level and log print level""" self.logger.info(f"Received request to get SPDK nvmf log flags and level") @@ -580,4 +848,4 @@ def disable_spdk_nvmf_logs(self, request, context): context.set_details(f"{ex}") return pb2.req_status() - return pb2.req_status(status=all(ret)) + return pb2.req_status(status=all(ret)) \ No newline at end of file diff --git a/control/proto/gateway.proto b/control/proto/gateway.proto index 951cd0de..418d7e15 100644 --- a/control/proto/gateway.proto +++ 
b/control/proto/gateway.proto @@ -24,7 +24,7 @@ service Gateway { rpc delete_subsystem(delete_subsystem_req) returns(req_status) {} // Adds a namespace to a subsystem - rpc add_namespace(add_namespace_req) returns(nsid) {} + rpc add_namespace(add_namespace_req) returns(nsid_status) {} // Removes a namespace from a subsystem rpc remove_namespace(remove_namespace_req) returns(req_status) {} @@ -73,6 +73,8 @@ message create_subsystem_req { string subsystem_nqn = 1; string serial_number = 2; int32 max_namespaces = 3; + bool ana_reporting = 4; + bool enable_ha = 5; } message delete_subsystem_req { @@ -83,6 +85,7 @@ message add_namespace_req { string subsystem_nqn = 1; string bdev_name = 2; optional int32 nsid = 3; + optional int32 anagrpid = 4; } message remove_namespace_req { @@ -144,7 +147,7 @@ message req_status { bool status = 1; } -message nsid { +message nsid_status { int32 nsid = 1; bool status = 2; } diff --git a/control/server.py b/control/server.py index bc740fc7..7675c2e8 100644 --- a/control/server.py +++ b/control/server.py @@ -16,7 +16,6 @@ import json import logging import signal -import traceback from concurrent import futures from google.protobuf import json_format @@ -26,8 +25,10 @@ from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc -from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler +from .state import GatewayState, LocalGatewayState, OmapLock, OmapGatewayState, GatewayStateHandler from .grpc import GatewayService +from .discovery import DiscoveryService +from .config import GatewayConfig def sigchld_handler(signum, frame): """Handle SIGCHLD, runs when a spdk process terminates.""" @@ -43,7 +44,7 @@ def sigchld_handler(signum, frame): exit_code = os.waitstatus_to_exitcode(wait_status) # GW process should exit now - raise SystemExit(f"spdk subprocess terminated {pid=} {exit_code=}") + raise SystemExit(f"Gateway subprocess terminated {pid=} {exit_code=}") class GatewayServer: 
"""Runs SPDK and receives client requests for the gateway service. @@ -56,6 +57,7 @@ class GatewayServer: spdk_rpc_client: Client of SPDK RPC server spdk_rpc_ping_client: Ping client of SPDK RPC server spdk_process: Subprocess running SPDK NVMEoF target application + discovery_pid: Subprocess running Ceph nvmeof discovery service """ def __init__(self, config): @@ -64,6 +66,8 @@ def __init__(self, config): self.spdk_process = None self.gateway_rpc = None self.server = None + self.discovery_pid = None + self.spdk_rpc_socket_path = None self.name = self.config.get("gateway", "name") if not self.name: @@ -79,6 +83,7 @@ def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None: self.logger.exception("GatewayServer exception occurred:") + signal.signal(signal.SIGCHLD, signal.SIG_IGN) if self.spdk_process is not None: self._stop_spdk() @@ -86,22 +91,31 @@ def __exit__(self, exc_type, exc_value, traceback): self.logger.info("Stopping the server...") self.server.stop(None) + if self.discovery_pid: + self._stop_discovery() + self.logger.info("Exiting the gateway process.") def serve(self): """Starts gateway server.""" self.logger.debug("Starting serve") + omap_state = OmapGatewayState(self.config) + local_state = LocalGatewayState() + + # install SIGCHLD handler + signal.signal(signal.SIGCHLD, sigchld_handler) + # Start SPDK - self._start_spdk() + self._start_spdk(omap_state) + + # Start discovery service + self._start_discovery_service() # Register service implementation with server - omap_state = OmapGatewayState(self.config) - local_state = LocalGatewayState() - gateway_state = GatewayStateHandler(self.config, local_state, - omap_state, self.gateway_rpc_caller) - self.gateway_rpc = GatewayService(self.config, gateway_state, - self.spdk_rpc_client) + gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller) + omap_lock = OmapLock(omap_state, gateway_state) + self.gateway_rpc = GatewayService(self.config, 
gateway_state, omap_lock, self.spdk_rpc_client) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) @@ -113,13 +127,30 @@ def serve(self): # Start server self.server.start() - enable_discovery_controller = self.config.getboolean_with_default("gateway", "enable_discovery_controller", False) - if not enable_discovery_controller: - try: - rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, "nqn.2014-08.org.nvmexpress.discovery") - except Exception as ex: - self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}") - raise + + + def _start_discovery_service(self): + """Runs either SPDK on CEPH NVMEOF Discovery Service.""" + enable_spdk_discovery_controller = self.config.getboolean_with_default("gateway", "enable_spdk_discovery_controller", False) + if enable_spdk_discovery_controller: + self.logger.info("Using SPDK discovery service") + return + + try: + rpc_nvmf.nvmf_delete_subsystem(self.spdk_rpc_ping_client, DiscoveryService.DISCOVERY_NQN) + except Exception as ex: + self.logger.error(f" Delete Discovery subsystem returned with error: \n {ex}") + raise + + # run ceph nvmeof discovery service in sub-process + assert self.discovery_pid is None + self.discovery_pid = os.fork() + if self.discovery_pid == 0: + self.logger.info("Starting ceph nvmeof discovery service") + DiscoveryService(self.config).start_service() + os._exit(0) + else: + self.logger.info(f"Discovery service process id: {self.discovery_pid}") def _add_server_listener(self): """Adds listener port to server.""" @@ -127,6 +158,8 @@ def _add_server_listener(self): enable_auth = self.config.getboolean("gateway", "enable_auth") gateway_addr = self.config.get("gateway", "addr") gateway_port = self.config.get("gateway", "port") + # We need to enclose IPv6 addresses in brackets before concatenating a colon and port number to it + gateway_addr = GatewayConfig.escape_address_if_ipv6(gateway_addr) if 
enable_auth: # Read in key and certificates for authentication server_key = self.config.get("mtls", "server_key") @@ -154,25 +187,42 @@ def _add_server_listener(self): self.server.add_insecure_port("{}:{}".format( gateway_addr, gateway_port)) - def _start_spdk(self): + def _get_spdk_rpc_socket_path(self, omap_state) -> str: + # For backward compatibility, try first to get the old attribute + spdk_rpc_socket = self.config.get_with_default("spdk", "rpc_socket", "") + if spdk_rpc_socket: + return spdk_rpc_socket + + spdk_rpc_socket_dir = self.config.get_with_default("spdk", "rpc_socket_dir", "") + if not spdk_rpc_socket_dir: + spdk_rpc_socket_dir = "/var/run/ceph/" + if omap_state.ceph_fsid: + spdk_rpc_socket_dir += omap_state.ceph_fsid + "/" + if not spdk_rpc_socket_dir.endswith("/"): + spdk_rpc_socket_dir += "/" + try: + os.makedirs(spdk_rpc_socket_dir, 0o777, True) + except Exception: + pass + spdk_rpc_socket = spdk_rpc_socket_dir + self.config.get_with_default("spdk", "rpc_socket_name", "spdk.sock") + return spdk_rpc_socket + + def _start_spdk(self, omap_state): """Starts SPDK process.""" # Start target self.logger.debug(f"Configuring server {self.name}") spdk_tgt_path = self.config.get("spdk", "tgt_path") self.logger.info(f"SPDK Target Path: {spdk_tgt_path}") - spdk_rpc_socket = self.config.get("spdk", "rpc_socket") - self.logger.info(f"SPDK Socket: {spdk_rpc_socket}") + self.spdk_rpc_socket_path = self._get_spdk_rpc_socket_path(omap_state) + self.logger.info(f"SPDK Socket: {self.spdk_rpc_socket_path}") spdk_tgt_cmd_extra_args = self.config.get_with_default( "spdk", "tgt_cmd_extra_args", "") - cmd = [spdk_tgt_path, "-u", "-r", spdk_rpc_socket] + cmd = [spdk_tgt_path, "-u", "-r", self.spdk_rpc_socket_path] if spdk_tgt_cmd_extra_args: cmd += shlex.split(spdk_tgt_cmd_extra_args) self.logger.info(f"Starting {' '.join(cmd)}") try: - # install SIGCHLD handler - signal.signal(signal.SIGCHLD, sigchld_handler) - # start spdk process self.spdk_process = 
subprocess.Popen(cmd) except Exception as ex: @@ -184,20 +234,21 @@ def _start_spdk(self): log_level = self.config.get("spdk", "log_level") # connect timeout: spdk client retries 5 times per sec conn_retries = int(timeout * 5) + self.logger.info(f"SPDK process id: {self.spdk_process.pid}") self.logger.info( - f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket}," + f"Attempting to initialize SPDK: rpc_socket: {self.spdk_rpc_socket_path}," f" conn_retries: {conn_retries}, timeout: {timeout}" ) try: self.spdk_rpc_client = rpc_client.JSONRPCClient( - spdk_rpc_socket, + self.spdk_rpc_socket_path, None, timeout, log_level=log_level, conn_retries=conn_retries, ) self.spdk_rpc_ping_client = rpc_client.JSONRPCClient( - spdk_rpc_socket, + self.spdk_rpc_socket_path, None, timeout, log_level=log_level, @@ -218,10 +269,8 @@ def _stop_spdk(self): assert self.spdk_process is not None # should be verified by the caller return_code = self.spdk_process.returncode - rpc_socket = self.config.get("spdk", "rpc_socket") # Terminate spdk process - signal.signal(signal.SIGCHLD, signal.SIG_DFL) if return_code is not None: self.logger.error(f"SPDK({self.name}) pid {self.spdk_process.pid} " f"already terminated, exit code: {return_code}") @@ -238,12 +287,27 @@ def _stop_spdk(self): self.spdk_process.kill() # kill -9, send KILL signal # Clean spdk rpc socket - if os.path.exists(rpc_socket): + if self.spdk_rpc_socket_path and os.path.exists(self.spdk_rpc_socket_path): try: - os.remove(rpc_socket) + os.remove(self.spdk_rpc_socket_path) except Exception: self.logger.exception(f"An error occurred while removing " - f"rpc socket {rpc_socket}:") + f"rpc socket {self.spdk_rpc_socket_path}:") + + def _stop_discovery(self): + """Stops Discovery service process.""" + assert self.discovery_pid is not None # should be verified by the caller + + self.logger.info("Terminating discovery service...") + # discovery service selector loop should exit due to KeyboardInterrupt exception + try: + 
os.kill(self.discovery_pid, signal.SIGINT) + os.waitpid(self.discovery_pid, 0) + except ChildProcessError: + pass # ignore + self.logger.info("Discovery service terminated") + + self.discovery_pid = None def _create_transport(self, trtype): """Initializes a transport type.""" diff --git a/control/state.py b/control/state.py index b99664c3..36d8c0ce 100644 --- a/control/state.py +++ b/control/state.py @@ -11,6 +11,7 @@ import threading import rados import logging +import errno from typing import Dict from collections import defaultdict from abc import ABC, abstractmethod @@ -29,6 +30,30 @@ class GatewayState(ABC): HOST_PREFIX = "host_" LISTENER_PREFIX = "listener_" + def build_bdev_key(bdev_name: str) -> str: + return GatewayState.BDEV_PREFIX + bdev_name + + def build_namespace_key(subsystem_nqn: str, nsid) -> str: + key = GatewayState.NAMESPACE_PREFIX + subsystem_nqn + if nsid is not None: + key = key + "_" + nsid + return key + + def build_subsystem_key(subsystem_nqn: str) -> str: + return GatewayState.SUBSYSTEM_PREFIX + subsystem_nqn + + def build_host_key(subsystem_nqn: str, host_nqn) -> str: + key = GatewayState.HOST_PREFIX + subsystem_nqn + if host_nqn is not None: + key = key + "_" + host_nqn + return key + + def build_partial_listener_key(subsystem_nqn: str) -> str: + return GatewayState.LISTENER_PREFIX + subsystem_nqn + + def build_listener_key(subsystem_nqn: str, gateway: str, trtype: str, traddr: str, trsvcid: str) -> str: + return GatewayState.build_partial_listener_key(subsystem_nqn) + "_" + gateway + "_" + trtype + "_" + traddr + "_" + trsvcid + @abstractmethod def get_state(self) -> Dict[str, str]: """Returns the state dictionary.""" @@ -46,64 +71,62 @@ def _remove_key(self, key: str): def add_bdev(self, bdev_name: str, val: str): """Adds a bdev to the state data store.""" - key = self.BDEV_PREFIX + bdev_name + key = GatewayState.build_bdev_key(bdev_name) self._add_key(key, val) def remove_bdev(self, bdev_name: str): """Removes a bdev from the state 
data store.""" - key = self.BDEV_PREFIX + bdev_name + key = GatewayState.build_bdev_key(bdev_name) self._remove_key(key) def add_namespace(self, subsystem_nqn: str, nsid: str, val: str): """Adds a namespace to the state data store.""" - key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + nsid + key = GatewayState.build_namespace_key(subsystem_nqn, nsid) self._add_key(key, val) def remove_namespace(self, subsystem_nqn: str, nsid: str): """Removes a namespace from the state data store.""" - key = self.NAMESPACE_PREFIX + subsystem_nqn + "_" + nsid + key = GatewayState.build_namespace_key(subsystem_nqn, nsid) self._remove_key(key) def add_subsystem(self, subsystem_nqn: str, val: str): """Adds a subsystem to the state data store.""" - key = self.SUBSYSTEM_PREFIX + subsystem_nqn + key = GatewayState.build_subsystem_key(subsystem_nqn) self._add_key(key, val) def remove_subsystem(self, subsystem_nqn: str): """Removes a subsystem from the state data store.""" - key = self.SUBSYSTEM_PREFIX + subsystem_nqn + key = GatewayState.build_subsystem_key(subsystem_nqn) self._remove_key(key) # Delete all keys related to subsystem state = self.get_state() for key in state.keys(): - if (key.startswith(self.NAMESPACE_PREFIX + subsystem_nqn) or - key.startswith(self.HOST_PREFIX + subsystem_nqn) or - key.startswith(self.LISTENER_PREFIX + subsystem_nqn)): + if (key.startswith(GatewayState.build_namespace_key(subsystem_nqn, None)) or + key.startswith(GatewayState.build_host_key(subsystem_nqn, None)) or + key.startswith(GatewayState.build_partial_listener_key(subsystem_nqn))): self._remove_key(key) def add_host(self, subsystem_nqn: str, host_nqn: str, val: str): """Adds a host to the state data store.""" - key = "{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) + key = GatewayState.build_host_key(subsystem_nqn, host_nqn) self._add_key(key, val) def remove_host(self, subsystem_nqn: str, host_nqn: str): """Removes a host from the state data store.""" - key = 
"{}{}_{}".format(self.HOST_PREFIX, subsystem_nqn, host_nqn) + key = GatewayState.build_host_key(subsystem_nqn, host_nqn) self._remove_key(key) def add_listener(self, subsystem_nqn: str, gateway: str, trtype: str, traddr: str, trsvcid: str, val: str): """Adds a listener to the state data store.""" - key = "{}{}_{}_{}_{}_{}".format(self.LISTENER_PREFIX, subsystem_nqn, - gateway, trtype, traddr, trsvcid) + key = GatewayState.build_listener_key(subsystem_nqn, gateway, trtype, traddr, trsvcid) self._add_key(key, val) def remove_listener(self, subsystem_nqn: str, gateway: str, trtype: str, traddr: str, trsvcid: str): """Removes a listener from the state data store.""" - key = "{}{}_{}_{}_{}_{}".format(self.LISTENER_PREFIX, subsystem_nqn, - gateway, trtype, traddr, trsvcid) + key = GatewayState.build_listener_key(subsystem_nqn, gateway, trtype, traddr, trsvcid) self._remove_key(key) @abstractmethod @@ -143,6 +166,124 @@ def reset(self, omap_state): self.state = omap_state +class OmapLock: + OMAP_FILE_LOCK_NAME = "omap_file_lock" + OMAP_FILE_LOCK_COOKIE = "omap_file_cookie" + + def __init__(self, omap_state, gateway_state) -> None: + self.logger = omap_state.logger + self.omap_state = omap_state + self.gateway_state = gateway_state + self.omap_file_lock_duration = self.omap_state.config.getint_with_default("gateway", "omap_file_lock_duration", 60) + self.omap_file_update_reloads = self.omap_state.config.getint_with_default("gateway", "omap_file_update_reloads", 10) + self.omap_file_lock_retries = self.omap_state.config.getint_with_default("gateway", "omap_file_lock_retries", 15) + self.omap_file_lock_retry_sleep_interval = self.omap_state.config.getint_with_default("gateway", + "omap_file_lock_retry_sleep_interval", 5) + # This is used for testing purposes only. 
It allows us to test locking from two gateways at the same time
It will execute this function + # and in case the Omap is not current, will reload it and try again + # + def execute_omap_locking_function(self, grpc_func, omap_locking_func, request, context): + for i in range(1, self.omap_file_update_reloads): + need_to_update = False + try: + return grpc_func(omap_locking_func, request, context) + except OSError as err: + if err.errno == errno.EAGAIN: + need_to_update = True + else: + raise + + assert need_to_update + for j in range(10): + if self.gateway_state.update(): + # update was succesful, we can stop trying + break + time.sleep(1) + + if need_to_update: + raise Exception(f"Unable to lock OMAP file after reloading {self.omap_file_update_reloads} times, exiting") + + def lock_omap(self): + got_lock = False + + for i in range(1, self.omap_file_lock_retries): + try: + self.omap_state.ioctx.lock_exclusive(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, + self.OMAP_FILE_LOCK_COOKIE, "OMAP file changes lock", self.omap_file_lock_duration, 0) + got_lock = True + if i > 1: + self.logger.info(f"Succeeded to lock OMAP file after {i} tries") + break + except rados.ObjectExists as ex: + self.logger.info(f"We already locked the OMAP file") + got_lock = True + break + except rados.ObjectBusy as ex: + self.logger.warning( + f"The OMAP file is locked, will try again in {self.omap_file_lock_retry_sleep_interval} seconds") + time.sleep(self.omap_file_lock_retry_sleep_interval) + except Exception as ex: + self.logger.error(f"Unable to lock OMAP file, exiting: {ex}") + raise + + if not got_lock: + self.logger.error(f"Unable to lock OMAP file after {self.omap_file_lock_retries} tries. Exiting!") + raise Exception("Unable to lock OMAP file") + + omap_version = self.omap_state.get_omap_version() + local_version = self.omap_state.get_local_version() + + if omap_version > local_version: + self.logger.warning( + f"Local version {local_version} differs from OMAP file version {omap_version}." 
+ f" The file is not current, will reload it and try again") + self.unlock_omap() + raise OSError(errno.EAGAIN, "Unable to lock OMAP file, file not current", self.omap_state.omap_name) + + def unlock_omap(self): + if self.omap_file_disable_unlock: + self.logger.warning(f"OMAP file unlock was disabled, will not unlock file") + return + + try: + self.omap_state.ioctx.unlock(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE) + except rados.ObjectNotFound as ex: + self.logger.warning(f"No such lock, the lock duration might have passed") + except Exception as ex: + self.logger.error(f"Unable to unlock OMAP file: {ex}") + pass + class OmapGatewayState(GatewayState): """Persists gateway NVMeoF target state to an OMAP object. @@ -170,14 +311,10 @@ def __init__(self, config): self.watch = None gateway_group = self.config.get("gateway", "group") self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state" - ceph_pool = self.config.get("ceph", "pool") - ceph_conf = self.config.get("ceph", "config_file") - rados_id = self.config.get_with_default("ceph", "id", "") + self.ceph_fsid = None try: - conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id) - conn.connect() - self.ioctx = conn.open_ioctx(ceph_pool) + self.ioctx = self.open_rados_connection(self.config) # Create a new gateway persistence OMAP object with rados.WriteOpCtx() as write_op: # Set exclusive parameter to fail write_op if object exists @@ -198,6 +335,37 @@ def __exit__(self, exc_type, exc_value, traceback): self.watch.close() self.ioctx.close() + def fetch_and_display_ceph_version(self, conn): + try: + rply = conn.mon_command('{"prefix":"mon versions"}', b'') + ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"') + ceph_ver = ceph_ver.removeprefix("ceph version ") + self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"") + except Exception as ex: + self.logger.debug(f"Got exception trying to 
fetch Ceph version: {ex}") + pass + + def fetch_ceph_fsid(self, conn) -> str: + fsid = None + try: + fsid = conn.get_fsid() + except Exception as ex: + self.logger.debug(f"Got exception trying to fetch Ceph fsid: {ex}") + pass + + return fsid + + def open_rados_connection(self, config): + ceph_pool = config.get("ceph", "pool") + ceph_conf = config.get("ceph", "config_file") + rados_id = config.get_with_default("ceph", "id", "") + conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id) + conn.connect() + self.fetch_and_display_ceph_version(conn) + self.ceph_fsid = self.fetch_ceph_fsid(conn) + ioctx = conn.open_ioctx(ceph_pool) + return ioctx + def get_local_version(self) -> int: """Returns local version.""" return self.version @@ -224,10 +392,16 @@ def get_omap_version(self) -> int: def get_state(self) -> Dict[str, str]: """Returns dict of all OMAP keys and values.""" - with rados.ReadOpCtx() as read_op: - i, _ = self.ioctx.get_omap_vals(read_op, "", "", -1) - self.ioctx.operate_read_op(read_op, self.omap_name) - omap_dict = dict(i) + omap_list = [("", 0)] # Dummy, non empty, list value. 
Just so we would enter the while + omap_dict = {} + # The number of items returned is limited by Ceph, so we need to read in a loop until no more items are returned + while len(omap_list) > 0: + last_key_read = omap_list[-1][0] + with rados.ReadOpCtx() as read_op: + i, _ = self.ioctx.get_omap_vals(read_op, last_key_read, "", -1) + self.ioctx.operate_read_op(read_op, self.omap_name) + omap_list = list(i) + omap_dict.update(dict(omap_list)) return omap_dict def _add_key(self, key: str, val: str): @@ -335,6 +509,7 @@ def __init__(self, config, local, omap, gateway_rpc_caller): self.update_interval = 1 self.use_notify = self.config.getboolean("gateway", "state_update_notify") + self.update_is_active_lock = threading.Lock() def add_bdev(self, bdev_name: str, val: str): """Adds a bdev to the state data stores.""" @@ -421,53 +596,67 @@ def _update_caller(self, notify_event): notify_event.wait(max(update_time - time.time(), 0)) notify_event.clear() - def update(self): + def compare_state_values(self, val1, val2) -> bool: + # We sometimes get one value as type bytes and the other as type str, so convert them both to str for the comparison + val1_str = val1.decode() if type(val1) == type(b'') else val1 + val2_str = val2.decode() if type(val2) == type(b'') else val2 + return val1_str == val2_str + + def update(self) -> bool: """Checks for updated omap state and initiates local update.""" - prefix_list = [ - GatewayState.BDEV_PREFIX, GatewayState.SUBSYSTEM_PREFIX, - GatewayState.NAMESPACE_PREFIX, GatewayState.HOST_PREFIX, - GatewayState.LISTENER_PREFIX - ] - - # Get version and state from OMAP - omap_state_dict = self.omap.get_state() - omap_version = int(omap_state_dict[self.omap.OMAP_VERSION_KEY]) - - if self.omap.get_local_version() < omap_version: - local_state_dict = self.local.get_state() - local_state_keys = local_state_dict.keys() - omap_state_keys = omap_state_dict.keys() - - # Find OMAP additions - added_keys = omap_state_keys - local_state_keys - added = {key: 
omap_state_dict[key] for key in added_keys} - grouped_added = self._group_by_prefix(added, prefix_list) - # Find OMAP changes - same_keys = omap_state_keys & local_state_keys - changed = { - key: omap_state_dict[key] - for key in same_keys - if omap_state_dict[key] != local_state_dict[key] - } - grouped_changed = self._group_by_prefix(changed, prefix_list) - # Find OMAP removals - removed_keys = local_state_keys - omap_state_keys - removed = {key: local_state_dict[key] for key in removed_keys} - grouped_removed = self._group_by_prefix(removed, prefix_list) - - # Handle OMAP removals and remove outdated changed components - grouped_removed.update(grouped_changed) - if grouped_removed: - self._update_call_rpc(grouped_removed, False, prefix_list) - # Handle OMAP additions and add updated changed components - grouped_added.update(grouped_changed) - if grouped_added: - self._update_call_rpc(grouped_added, True, prefix_list) - - # Update local state and version - self.local.reset(omap_state_dict) - self.omap.set_local_version(omap_version) - self.logger.debug("Update complete.") + + if self.update_is_active_lock.locked(): + self.logger.warning(f"An update is already running, ignore") + return False + + with self.update_is_active_lock: + prefix_list = [ + GatewayState.BDEV_PREFIX, GatewayState.SUBSYSTEM_PREFIX, + GatewayState.NAMESPACE_PREFIX, GatewayState.HOST_PREFIX, + GatewayState.LISTENER_PREFIX + ] + + # Get version and state from OMAP + omap_state_dict = self.omap.get_state() + omap_version = int(omap_state_dict[self.omap.OMAP_VERSION_KEY]) + + if self.omap.get_local_version() < omap_version: + local_state_dict = self.local.get_state() + local_state_keys = local_state_dict.keys() + omap_state_keys = omap_state_dict.keys() + + # Find OMAP additions + added_keys = omap_state_keys - local_state_keys + added = {key: omap_state_dict[key] for key in added_keys} + grouped_added = self._group_by_prefix(added, prefix_list) + # Find OMAP changes + same_keys = omap_state_keys 
& local_state_keys + changed = { + key: omap_state_dict[key] + for key in same_keys + if not self.compare_state_values(local_state_dict[key], omap_state_dict[key]) + } + grouped_changed = self._group_by_prefix(changed, prefix_list) + # Find OMAP removals + removed_keys = local_state_keys - omap_state_keys + removed = {key: local_state_dict[key] for key in removed_keys} + grouped_removed = self._group_by_prefix(removed, prefix_list) + + # Handle OMAP removals and remove outdated changed components + grouped_removed.update(grouped_changed) + if grouped_removed: + self._update_call_rpc(grouped_removed, False, prefix_list) + # Handle OMAP additions and add updated changed components + grouped_added.update(grouped_changed) + if grouped_added: + self._update_call_rpc(grouped_added, True, prefix_list) + + # Update local state and version + self.local.reset(omap_state_dict) + self.omap.set_local_version(omap_version) + self.logger.debug("Update complete.") + + return True def _group_by_prefix(self, state_update, prefix_list): """Groups state update by key prefixes.""" diff --git a/docker-compose.yaml b/docker-compose.yaml index 2a6828c2..1781ecde 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -19,6 +19,7 @@ services: SPDK_DESCRIPTION: SPDK_URL: SPDK_MAINTAINER: $MAINTAINER + SPDK_DISABLE_VPCLMULQDQ: BUILD_DATE: SPDK_GIT_REPO: SPDK_GIT_BRANCH: @@ -78,6 +79,7 @@ services: networks: default: ipv4_address: 192.168.13.2 + ipv6_address: 2001:db8::2 nvmeof-base: build: context: . 
@@ -97,6 +99,8 @@ services: NVMEOF_GIT_REPO: NVMEOF_GIT_BRANCH: NVMEOF_GIT_COMMIT: + NVMEOF_GIT_MODIFIED_FILES: + NVMEOF_CEPH_VERSION: labels: io.ceph.nvmeof: volumes: @@ -211,6 +215,8 @@ volumes: ceph-conf: networks: default: + enable_ipv6: true ipam: config: - subnet: 192.168.13.0/24 + - subnet: 2001:0DB8::/112 diff --git a/mk/containerized.mk b/mk/containerized.mk index 2969fa4e..1960dd80 100644 --- a/mk/containerized.mk +++ b/mk/containerized.mk @@ -22,9 +22,13 @@ build: DOCKER_COMPOSE_ENV = DOCKER_BUILDKIT=1 COMPOSE_DOCKER_CLI_BUILD=1 GIT_LATEST_TAG != git describe --tags --abbrev=0 push: ## Push nvmeof and nvmeof-cli containers images to quay.io registries docker tag $(QUAY_NVMEOF):$(VERSION) $(QUAY_NVMEOF):$(GIT_LATEST_TAG) - docker push $(QUAY_NVMEOF):$(GIT_LATEST_TAG) docker tag $(QUAY_NVMEOFCLI):$(VERSION) $(QUAY_NVMEOFCLI):$(GIT_LATEST_TAG) + docker tag $(QUAY_NVMEOF):$(VERSION) $(QUAY_NVMEOF):latest + docker tag $(QUAY_NVMEOFCLI):$(VERSION) $(QUAY_NVMEOFCLI):latest + docker push $(QUAY_NVMEOF):$(GIT_LATEST_TAG) docker push $(QUAY_NVMEOFCLI):$(GIT_LATEST_TAG) + docker push $(QUAY_NVMEOF):latest + docker push $(QUAY_NVMEOFCLI):latest run: ## Run command CMD inside SVC containers run: override OPTS += --rm diff --git a/mk/demo.mk b/mk/demo.mk index 4f2f9c80..c132aa3b 100644 --- a/mk/demo.mk +++ b/mk/demo.mk @@ -7,12 +7,15 @@ rbd: CMD = bash -c "rbd -p $(RBD_POOL) info $(RBD_IMAGE_NAME) || rbd -p $(RBD_PO # demo # the fist gateway in docker enviroment, hostname defaults to container id -demo: export NVMEOF_HOSTNAME != docker ps -q -f name=ceph-nvmeof_nvmeof_1 +demo: export NVMEOF_HOSTNAME != docker ps -q -f name=$(NVMEOF_CONTAINER_NAME) demo: rbd ## Expose RBD_IMAGE_NAME as NVMe-oF target $(NVMEOF_CLI) create_bdev --pool $(RBD_POOL) --image $(RBD_IMAGE_NAME) --bdev $(BDEV_NAME) + $(NVMEOF_CLI_IPV6) create_bdev --pool $(RBD_POOL) --image $(RBD_IMAGE_NAME) --bdev $(BDEV_NAME)_ipv6 $(NVMEOF_CLI) create_subsystem --subnqn $(NQN) $(NVMEOF_CLI) add_namespace 
--subnqn $(NQN) --bdev $(BDEV_NAME) + $(NVMEOF_CLI) add_namespace --subnqn $(NQN) --bdev $(BDEV_NAME)_ipv6 $(NVMEOF_CLI) create_listener --subnqn $(NQN) --gateway-name $(NVMEOF_HOSTNAME) --traddr $(NVMEOF_IP_ADDRESS) --trsvcid $(NVMEOF_IO_PORT) + $(NVMEOF_CLI_IPV6) create_listener --subnqn $(NQN) --gateway-name $(NVMEOF_HOSTNAME) --traddr '$(NVMEOF_IPV6_ADDRESS)' --trsvcid $(NVMEOF_IO_PORT) --adrfam IPV6 $(NVMEOF_CLI) add_host --subnqn $(NQN) --host "*" .PHONY: demo rbd diff --git a/mk/misc.mk b/mk/misc.mk index 021a8340..34766b3e 100644 --- a/mk/misc.mk +++ b/mk/misc.mk @@ -2,8 +2,9 @@ # nvmeof_cli NVMEOF_CLI = $(DOCKER_COMPOSE_ENV) $(DOCKER_COMPOSE) run --rm nvmeof-cli --server-address $(NVMEOF_IP_ADDRESS) --server-port $(NVMEOF_GW_PORT) +NVMEOF_CLI_IPV6 = $(DOCKER_COMPOSE_ENV) $(DOCKER_COMPOSE) run --rm nvmeof-cli --server-address $(NVMEOF_IPV6_ADDRESS) --server-port $(NVMEOF_GW_PORT) alias: ## Print bash alias command for the nvmeof-cli. Usage: "eval $(make alias)" - @echo alias nvmeof-cli=\"$(NVMEOF_CLI)\" + @echo alias nvmeof-cli=\"$(NVMEOF_CLI)\" \; alias nvmeof-cli-ipv6=\'$(NVMEOF_CLI_IPV6)\' .PHONY: alias diff --git a/pyproject.toml b/pyproject.toml index 74636de3..2b68f91c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "pdm.backend" [project] name = "ceph-nvmeof" -version = "0.0.3" +version = "0.0.5" description = "Service to provide Ceph storage over NVMe-oF protocol" readme = "README.md" requires-python = "~=3.9" diff --git a/spdk b/spdk index 7c8a4f2e..668268f7 160000 --- a/spdk +++ b/spdk @@ -1 +1 @@ -Subproject commit 7c8a4f2e10e8490c4a8af7d4b42587b15d0959d0 +Subproject commit 668268f74ea147f3343b9f8136df3e6fcc61f4cf diff --git a/tests/test_cli.py b/tests/test_cli.py index 853d75a6..45169b72 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -12,10 +12,15 @@ serial = "SPDK00000000000001" host_list = ["nqn.2016-06.io.spdk:host1", "*"] nsid = "1" +nsid_ipv6 = "2" +anagrpid = "2" trtype = "TCP" gateway_name = 
socket.gethostname() addr = "127.0.0.1" +addr_ipv6 = "::1" +server_addr_ipv6 = "2001:db8::3" listener_list = [["-g", gateway_name, "-a", addr, "-s", "5001"], ["-g", gateway_name, "-a", addr,"-s", "5002"]] +listener_list_ipv6 = [["-g", gateway_name, "-a", addr_ipv6, "-s", "5003"], ["-g", gateway_name, "-a", addr_ipv6, "-s", "5004"]] config = "ceph-nvmeof.conf" @pytest.fixture(scope="module") @@ -37,6 +42,10 @@ def test_get_subsystems(self, caplog, gateway): cli(["get_subsystems"]) assert "Failed to get" not in caplog.text + def test_get_subsystems_ipv6(self, caplog, gateway): + cli(["--server-address", server_addr_ipv6, "get_subsystems"]) + assert "Failed to get" not in caplog.text + class TestCreate: def test_create_bdev(self, caplog, gateway): @@ -45,13 +54,22 @@ def test_create_bdev(self, caplog, gateway): cli(["create_bdev", "-i", image, "-p", pool, "-b", bdev1]) assert "Failed to create" not in caplog.text + def test_create_bdev_ipv6(self, caplog, gateway): + cli(["--server-address", server_addr_ipv6, "create_bdev", "-i", image, "-p", pool, "-b", bdev + "_ipv6"]) + assert "Failed to create" not in caplog.text + cli(["--server-address", server_addr_ipv6, "create_bdev", "-i", image, "-p", pool, "-b", bdev1 + "_ipv6"]) + assert "Failed to create" not in caplog.text + def test_create_subsystem(self, caplog, gateway): cli(["create_subsystem", "-n", subsystem]) assert "Failed to create" not in caplog.text + assert "ana reporting: False" in caplog.text cli(["get_subsystems"]) assert serial not in caplog.text + caplog.clear() cli(["create_subsystem", "-n", subsystem2, "-s", serial]) assert "Failed to create" not in caplog.text + assert "ana reporting: False" in caplog.text cli(["get_subsystems"]) assert serial in caplog.text @@ -61,6 +79,12 @@ def test_add_namespace(self, caplog, gateway): cli(["add_namespace", "-n", subsystem, "-b", bdev1]) assert "Failed to add" not in caplog.text + def test_add_namespace_ipv6(self, caplog, gateway): + cli(["--server-address", 
server_addr_ipv6, "add_namespace", "-n", subsystem, "-b", bdev + "_ipv6"]) + assert "Failed to add" not in caplog.text + cli(["--server-address", server_addr_ipv6, "add_namespace", "-n", subsystem, "-b", bdev1 + "_ipv6"]) + assert "Failed to add" not in caplog.text + @pytest.mark.parametrize("host", host_list) def test_add_host(self, caplog, host): cli(["add_host", "-n", subsystem, "-t", host]) @@ -71,6 +95,11 @@ def test_create_listener(self, caplog, listener, gateway): cli(["create_listener", "-n", subsystem] + listener) assert "Failed to create" not in caplog.text + @pytest.mark.parametrize("listener_ipv6", listener_list_ipv6) + def test_create_listener_ipv6(self, caplog, listener_ipv6, gateway): + cli(["--server-address", server_addr_ipv6, "create_listener", "-n", subsystem, "--adrfam", "IPV6"] + listener_ipv6) + assert "Failed to create" not in caplog.text + class TestDelete: @pytest.mark.parametrize("host", host_list) @@ -83,18 +112,81 @@ def test_delete_listener(self, caplog, listener, gateway): cli(["delete_listener", "-n", subsystem] + listener) assert "Failed to delete" not in caplog.text + @pytest.mark.parametrize("listener_ipv6", listener_list_ipv6) + def test_delete_listener_ipv6(self, caplog, listener_ipv6, gateway): + cli(["--server-address", server_addr_ipv6, "delete_listener", "-n", subsystem, "--adrfam", "IPV6"] + listener_ipv6) + assert "Failed to delete" not in caplog.text + def test_remove_namespace(self, caplog, gateway): cli(["remove_namespace", "-n", subsystem, "-i", nsid]) assert "Failed to remove" not in caplog.text + cli(["remove_namespace", "-n", subsystem, "-i", nsid_ipv6]) + assert "Failed to remove" not in caplog.text def test_delete_bdev(self, caplog, gateway): cli(["delete_bdev", "-b", bdev, "-f"]) assert "Failed to delete" not in caplog.text cli(["delete_bdev", "-b", bdev1, "--force"]) assert "Failed to delete" not in caplog.text + cli(["delete_bdev", "-b", bdev + "_ipv6", "-f"]) + assert "Failed to delete" not in caplog.text + 
cli(["delete_bdev", "-b", bdev1 + "_ipv6", "--force"]) + assert "Failed to delete" not in caplog.text def test_delete_subsystem(self, caplog, gateway): cli(["delete_subsystem", "-n", subsystem]) assert "Failed to delete" not in caplog.text cli(["delete_subsystem", "-n", subsystem2]) assert "Failed to delete" not in caplog.text + + +class TestCreateWithAna: + def test_create_bdev_ana(self, caplog, gateway): + cli(["create_bdev", "-i", image, "-p", pool, "-b", bdev]) + assert "Failed to create" not in caplog.text + + def test_create_bdev_ana_ipv6(self, caplog, gateway): + cli(["--server-address", server_addr_ipv6, "create_bdev", "-i", image, "-p", pool, "-b", bdev + "_ipv6"]) + assert "Failed to create" not in caplog.text + + + def test_create_subsystem_ana(self, caplog, gateway): + caplog.clear() + cli(["create_subsystem", "-n", subsystem, "-a", "-t"]) + assert "Failed to create" not in caplog.text + assert "ana reporting: True" in caplog.text + cli(["get_subsystems"]) + assert serial not in caplog.text + + def test_add_namespace_ana(self, caplog, gateway): + cli(["add_namespace", "-n", subsystem, "-b", bdev, "-a", anagrpid]) + assert "Failed to add" not in caplog.text + + @pytest.mark.parametrize("listener", listener_list) + def test_create_listener_ana(self, caplog, listener, gateway): + cli(["create_listener", "-n", subsystem] + listener) + assert "Failed to create" not in caplog.text + assert "enable_ha: True" in caplog.text + + +class TestDeleteAna: + + @pytest.mark.parametrize("listener", listener_list) + def test_delete_listener_ana(self, caplog, listener, gateway): + cli(["delete_listener", "-n", subsystem] + listener) + assert "Failed to delete" not in caplog.text + + def test_remove_namespace_ana(self, caplog, gateway): + cli(["remove_namespace", "-n", subsystem, "-i", nsid]) + assert "Failed to remove" not in caplog.text + + def test_delete_bdev_ana(self, caplog, gateway): + cli(["delete_bdev", "-b", bdev, "-f"]) + assert "Failed to delete" not in 
caplog.text + cli(["delete_bdev", "-b", bdev + "_ipv6", "-f"]) + assert "Failed to delete" not in caplog.text + + def test_delete_subsystem_ana(self, caplog, gateway): + cli(["delete_subsystem", "-n", subsystem]) + assert "Failed to delete" not in caplog.text + diff --git a/tests/test_grpc.py b/tests/test_grpc.py new file mode 100644 index 00000000..e92a11e3 --- /dev/null +++ b/tests/test_grpc.py @@ -0,0 +1,59 @@ +import pytest +import time +from control.server import GatewayServer +from control.cli import main as cli +import logging +import warnings + +# Set up a logger +logger = logging.getLogger(__name__) +image = "mytestdevimage" +pool = "rbd" +bdev_prefix = "Ceph0" +subsystem_prefix = "nqn.2016-06.io.spdk:cnode" +created_resource_count = 500 +get_subsys_count = 100 + +def create_resource_by_index(i): + bdev = f"{bdev_prefix}_{i}" + cli(["create_bdev", "-i", image, "-p", pool, "-b", bdev]) + subsystem = f"{subsystem_prefix}{i}" + cli(["create_subsystem", "-n", subsystem ]) + cli(["add_namespace", "-n", subsystem, "-b", bdev]) + +def check_resource_by_index(i, caplog): + bdev = f"{bdev_prefix}_{i}" + # notice that this also verifies the namespace as the bdev name is in the namespaces section + assert f"{bdev}" in caplog.text + subsystem = f"{subsystem_prefix}{i}" + assert f"{subsystem}" in caplog.text + +# We want to fail in case we got an exception about invalid data in pb2 functions but this is just a warning +# for pytest. 
In order for the test to fail in such a case we need to ask pytest to regard this as an error +@pytest.mark.filterwarnings("error::pytest.PytestUnhandledThreadExceptionWarning") +def test_create_get_subsys(caplog, config): + with GatewayServer(config) as gateway: + gateway.serve() + + for i in range(created_resource_count): + create_resource_by_index(i) + assert "Failed" not in caplog.text + + caplog.clear() + + # restart the gateway here + with GatewayServer(config) as gateway: + gateway.serve() + + for i in range(get_subsys_count): + cli(["get_subsystems"]) + assert "Exception" not in caplog.text + time.sleep(0.1) + + time.sleep(20) # Make sure update() is over + caplog.clear() + cli(["get_subsystems"]) + assert "Exception" not in caplog.text + assert "get_subsystems: []" not in caplog.text + for i in range(created_resource_count): + check_resource_by_index(i, caplog) diff --git a/tests/test_multi_gateway.py b/tests/test_multi_gateway.py index 8b57d6ec..a3cf217b 100644 --- a/tests/test_multi_gateway.py +++ b/tests/test_multi_gateway.py @@ -20,6 +20,8 @@ def conn(config): configA.config["gateway"]["state_update_notify"] = str(update_notify) configA.config["gateway"]["min_controller_id"] = "1" configA.config["gateway"]["max_controller_id"] = "20000" + configA.config["gateway"]["enable_spdk_discovery_controller"] = "True" + configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock" configB = copy.deepcopy(configA) addr = configA.get("gateway", "addr") portA = configA.getint("gateway", "port") @@ -30,7 +32,7 @@ def conn(config): configA.config["gateway"]["max_controller_id"] = "40000" configB.config["gateway"]["state_update_interval_sec"] = str( update_interval_sec) - configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock" + configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02" # Start servers @@ -71,9 +73,6 @@ def test_multi_gateway_coordination(config, image, conn): 
num_subsystems = 2 pool = config.get("ceph", "pool") - enable_discovery_controller = config.getboolean_with_default("gateway", "enable_discovery_controller", False) - if not enable_discovery_controller: - num_subsystems -= 1 # Send requests to create a subsystem with one namespace to GatewayA bdev_req = pb2.create_bdev_req(bdev_name=bdev, diff --git a/tests/test_omap_lock.py b/tests/test_omap_lock.py new file mode 100644 index 00000000..107afc3d --- /dev/null +++ b/tests/test_omap_lock.py @@ -0,0 +1,208 @@ +import pytest +import copy +import grpc +import json +import time +from control.server import GatewayServer +from control.proto import gateway_pb2 as pb2 +from control.proto import gateway_pb2_grpc as pb2_grpc +import spdk.rpc.bdev as rpc_bdev + +image = "mytestdevimage" +pool = "rbd" +bdev_prefix = "Ceph_" +subsystem_prefix = "nqn.2016-06.io.spdk:cnode" +created_resource_count = 500 + +@pytest.fixture(scope="function") +def conn(config, request): + """Sets up and tears down Gateways A and B.""" + update_notify = True + update_interval_sec = 5 + disable_unlock = False + lock_duration = 60 + if request.node.name == "test_multi_gateway_omap_reread": + update_notify = False + update_interval_sec = 300 + elif request.node.name == "test_trying_to_lock_twice": + disable_unlock = True + lock_duration = 100 # This should be bigger than lock retries * retry sleep interval + + # Setup GatewayA and GatewayB configs + configA = copy.deepcopy(config) + configA.config["gateway"]["name"] = "GatewayA" + configA.config["gateway"]["group"] = "Group1" + configA.config["gateway"]["state_update_notify"] = str(update_notify) + configA.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec) + configA.config["gateway"]["omap_file_disable_unlock"] = str(disable_unlock) + configA.config["gateway"]["omap_file_lock_duration"] = str(lock_duration) + configA.config["gateway"]["min_controller_id"] = "1" + configA.config["gateway"]["max_controller_id"] = "20000" + 
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True" + configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock" + configB = copy.deepcopy(configA) + addr = configA.get("gateway", "addr") + portA = configA.getint("gateway", "port") + portB = portA + 1 + configB.config["gateway"]["name"] = "GatewayB" + configB.config["gateway"]["port"] = str(portB) + configB.config["gateway"]["min_controller_id"] = "20001" + configB.config["gateway"]["max_controller_id"] = "40000" + configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" + configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02" + + # Start servers + with ( + GatewayServer(configA) as gatewayA, + GatewayServer(configB) as gatewayB, + ): + gatewayA.serve() + # Delete existing OMAP state + gatewayA.gateway_rpc.gateway_state.delete_state() + # Create new + gatewayB.serve() + + # Bind the client and Gateways A & B + channelA = grpc.insecure_channel(f"{addr}:{portA}") + stubA = pb2_grpc.GatewayStub(channelA) + channelB = grpc.insecure_channel(f"{addr}:{portB}") + stubB = pb2_grpc.GatewayStub(channelB) + yield stubA, stubB, gatewayA.gateway_rpc, gatewayB.gateway_rpc + + # Stop gateways + gatewayA.server.stop(grace=1) + gatewayB.server.stop(grace=1) + gatewayB.gateway_rpc.gateway_state.delete_state() + +def test_multi_gateway_omap_reread(config, conn, caplog): + """Tests reading out of date OMAP file + """ + stubA, stubB, gatewayA, gatewayB = conn + bdev = bdev_prefix + "X0" + bdev2 = bdev_prefix + "X1" + bdev3 = bdev_prefix + "X2" + nqn = subsystem_prefix + "X1" + serial = "SPDK00000000000001" + nsid = 10 + num_subsystems = 2 + + # Send requests to create a subsystem with one namespace to GatewayA + bdev_req = pb2.create_bdev_req(bdev_name=bdev, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + subsystem_req = pb2.create_subsystem_req(subsystem_nqn=nqn, + serial_number=serial) + namespace_req = pb2.add_namespace_req(subsystem_nqn=nqn, + bdev_name=bdev, + nsid=nsid) + 
get_subsystems_req = pb2.get_subsystems_req() + ret_bdev = stubA.create_bdev(bdev_req) + ret_subsystem = stubA.create_subsystem(subsystem_req) + ret_namespace = stubA.add_namespace(namespace_req) + assert ret_bdev.status is True + assert ret_subsystem.status is True + assert ret_namespace.status is True + + # Until we create some resource on GW-B it shouldn't still have the resrouces created on GW-A, only the discovery subsystem + watchB = stubB.get_subsystems(get_subsystems_req) + listB = json.loads(watchB.subsystems) + assert len(listB) == 1 + + watchA = stubA.get_subsystems(get_subsystems_req) + listA = json.loads(watchA.subsystems) + assert len(listA) == num_subsystems + + bdev2_req = pb2.create_bdev_req(bdev_name=bdev2, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + ret_bdev2 = stubB.create_bdev(bdev2_req) + assert ret_bdev2.status is True + assert "The file is not current, will reload it and try again" in caplog.text + + # Make sure that after reading the OMAP file GW-B has the subsystem and namespace created on GW-A + watchB = stubB.get_subsystems(get_subsystems_req) + listB = json.loads(watchB.subsystems) + assert len(listB) == num_subsystems + assert listB[num_subsystems-1]["nqn"] == nqn + assert listB[num_subsystems-1]["serial_number"] == serial + assert listB[num_subsystems-1]["namespaces"][0]["nsid"] == nsid + assert listB[num_subsystems-1]["namespaces"][0]["bdev_name"] == bdev + + caplog.clear() + bdev3_req = pb2.create_bdev_req(bdev_name=bdev3, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + ret_bdev3 = stubB.create_bdev(bdev3_req) + assert ret_bdev3.status is True + assert "The file is not current, will reload it and try again" not in caplog.text + + bdevsA = rpc_bdev.bdev_get_bdevs(gatewayA.spdk_rpc_client) + bdevsB = rpc_bdev.bdev_get_bdevs(gatewayB.spdk_rpc_client) + # GW-B should have the bdev created on GW-A after reading the OMAP file plus the two we created on it + # GW-A should only have the bdev created 
on it as we didn't update it after creating the bdev on GW-B + assert len(bdevsA) == 1 + assert len(bdevsB) == 3 + assert bdevsA[0]["name"] == bdev + assert bdevsB[0]["name"] == bdev + assert bdevsB[1]["name"] == bdev2 + assert bdevsB[2]["name"] == bdev3 + +def test_trying_to_lock_twice(config, image, conn, caplog): + """Tests an attempt to lock the OMAP file from two gateways at the same time + """ + caplog.clear() + stubA, stubB, gatewayA, gatewayB = conn + + with pytest.raises(Exception) as ex: + create_resource_by_index(stubA, 0) + create_resource_by_index(stubB, 1) + assert "OMAP file unlock was disabled, will not unlock file" in caplog.text + assert "The OMAP file is locked, will try again in" in caplog.text + assert "Unable to lock OMAP file" in caplog.text + time.sleep(120) # Wait enough time for OMAP lock to be released + +def create_resource_by_index(stub, i): + bdev = f"{bdev_prefix}{i}" + bdev_req = pb2.create_bdev_req(bdev_name=bdev, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + ret_bdev = stub.create_bdev(bdev_req) + assert ret_bdev + subsystem = f"{subsystem_prefix}{i}" + subsystem_req = pb2.create_subsystem_req(subsystem_nqn=subsystem) + ret_subsystem = stub.create_subsystem(subsystem_req) + assert ret_subsystem + namespace_req = pb2.add_namespace_req(subsystem_nqn=subsystem, + bdev_name=bdev) + ret_namespace = stub.add_namespace(namespace_req) + assert ret_namespace + +def check_resource_by_index(i, caplog): + bdev = f"{bdev_prefix}{i}" + # notice that this also verifies the namespace as the bdev name is in the namespaces section + assert f"{bdev}" in caplog.text + subsystem = f"{subsystem_prefix}{i}" + assert f"{subsystem}" in caplog.text + +def test_multi_gateway_concurrent_changes(config, image, conn, caplog): + """Tests concurrent changes to the OMAP from two gateways + """ + caplog.clear() + stubA, stubB, gatewayA, gatewayB = conn + + for i in range(created_resource_count): + if i % 2: + stub = stubA + else: + stub = stubB 
+ create_resource_by_index(stub, i) + assert "Failed" not in caplog.text + + # Let the update some time to bring both gateways to the same page + time.sleep(15) + for i in range(created_resource_count): + check_resource_by_index(i, caplog) diff --git a/tests/test_server.py b/tests/test_server.py index 42a38797..cd199749 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -12,7 +12,7 @@ def _config(self, config): self.config = config def validate_exception(self, e): - pattern = r'spdk subprocess terminated pid=(\d+) exit_code=(\d+)' + pattern = r'Gateway subprocess terminated pid=(\d+) exit_code=(\d+)' m = re.match(pattern, e.code) assert(m) pid = int(m.group(1)) @@ -55,7 +55,7 @@ def test_spdk_multi_gateway_exception(self): configB = copy.deepcopy(configA) configB.config["gateway"]["name"] = "GatewayB" configB.config["gateway"]["port"] = str(configA.getint("gateway", "port") + 1) - configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock" + configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" # invalid arg, spdk would exit with code 1 at start up configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x343435545" diff --git a/tests/test_state.py b/tests/test_state.py index 9f7daf3e..acd988d8 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -4,14 +4,10 @@ from control.state import LocalGatewayState, OmapGatewayState, GatewayStateHandler -@pytest.fixture(scope="module") -def ioctx(config): +@pytest.fixture +def ioctx(omap_state, config): """Opens IO context to ceph pool.""" - ceph_pool = config.get("ceph", "pool") - ceph_conf = config.get("ceph", "config_file") - conn = rados.Rados(conffile=ceph_conf) - conn.connect() - ioctx = conn.open_ioctx(ceph_pool) + ioctx = omap_state.open_rados_connection(config) yield ioctx ioctx.close()