Skip to content

Commit

Permalink
Fix OMAP lock test.
Browse files Browse the repository at this point in the history
Fixes ceph#327

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Nov 22, 2023
1 parent 2acfd4a commit e40b9b8
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 31 deletions.
9 changes: 9 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ def test_create_bdev_ana_ipv6(self, caplog, gateway):

def test_create_subsystem_ana(self, caplog, gateway):
caplog.clear()
with pytest.raises(Exception) as ex:
try:
cli(["create_subsystem", "-n", subsystem, "-t"])
except SystemExit as sysex:
# should fail with non-zero return code
assert sysex != 0
pass
assert "HA enabled but ANA-reporting is disabled" in str(ex.value)
caplog.clear()
cli(["create_subsystem", "-n", subsystem, "-a", "-t"])
assert f"Created subsystem {subsystem}: True" in caplog.text
assert "ana reporting: True" in caplog.text
Expand Down
2 changes: 1 addition & 1 deletion tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_create_get_subsys(caplog, config):

for i in range(created_resource_count):
create_resource_by_index(i)
assert "Failed" not in caplog.text
assert "failed" not in caplog.text.lower()

caplog.clear()

Expand Down
189 changes: 159 additions & 30 deletions tests/test_omap_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,13 @@
subsystem_prefix = "nqn.2016-06.io.spdk:cnode"
created_resource_count = 500

@pytest.fixture(scope="function")
def conn(config, request):
@pytest.fixture(scope="module")
def conn_omap_reread(config, request):
"""Sets up and tears down Gateways A and B."""
update_notify = True
update_interval_sec = 5
update_notify = False
update_interval_sec = 300
disable_unlock = False
lock_duration = 60
if request.node.name == "test_multi_gateway_omap_reread":
update_notify = False
update_interval_sec = 300
elif request.node.name == "test_trying_to_lock_twice":
disable_unlock = True
lock_duration = 100 # This should be bigger than lock retries * retry sleep interval

# Setup GatewayA and GatewayB configs
configA = copy.deepcopy(config)
Expand Down Expand Up @@ -75,10 +69,120 @@ def conn(config, request):
gatewayB.server.stop(grace=1)
gatewayB.gateway_rpc.gateway_state.delete_state()

def test_multi_gateway_omap_reread(config, conn, caplog):
@pytest.fixture(scope="module")
def conn_lock_twice(config, request):
"""Sets up and tears down Gateways A and B."""
update_notify = True
update_interval_sec = 5
disable_unlock = True
lock_duration = 100 # This should be bigger than lock retries * retry sleep interval

# Setup GatewayA and GatewayB configs
configA = copy.deepcopy(config)
configA.config["gateway"]["name"] = "GatewayAA"
configA.config["gateway"]["group"] = "Group2"
configA.config["gateway"]["state_update_notify"] = str(update_notify)
configA.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec)
configA.config["gateway"]["omap_file_disable_unlock"] = str(disable_unlock)
configA.config["gateway"]["omap_file_lock_duration"] = str(lock_duration)
configA.config["gateway"]["min_controller_id"] = "1"
configA.config["gateway"]["max_controller_id"] = "20000"
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True"
configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayAA.sock"
portA = configA.getint("gateway", "port") + 2
configA.config["gateway"]["port"] = str(portA)
configB = copy.deepcopy(configA)
addr = configA.get("gateway", "addr")
portB = portA + 1
configB.config["gateway"]["name"] = "GatewayBB"
configB.config["gateway"]["port"] = str(portB)
configB.config["gateway"]["min_controller_id"] = "20001"
configB.config["gateway"]["max_controller_id"] = "40000"
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayBB.sock"
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02"

# Start servers
with (
GatewayServer(configA) as gatewayA,
GatewayServer(configB) as gatewayB,
):
gatewayA.serve()
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.serve()

# Bind the client and Gateways A & B
channelA = grpc.insecure_channel(f"{addr}:{portA}")
stubA = pb2_grpc.GatewayStub(channelA)
channelB = grpc.insecure_channel(f"{addr}:{portB}")
stubB = pb2_grpc.GatewayStub(channelB)
yield stubA, stubB

# Stop gateways
gatewayA.server.stop(grace=1)
gatewayB.server.stop(grace=1)
gatewayB.gateway_rpc.gateway_state.delete_state()

@pytest.fixture(scope="module")
def conn_concurrent(config, request):
"""Sets up and tears down Gateways A and B."""
update_notify = True
update_interval_sec = 5
disable_unlock = False
lock_duration = 60

# Setup GatewayA and GatewayB configs
configA = copy.deepcopy(config)
configA.config["gateway"]["name"] = "GatewayAAA"
configA.config["gateway"]["group"] = "Group3"
configA.config["gateway"]["state_update_notify"] = str(update_notify)
configA.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec)
configA.config["gateway"]["omap_file_disable_unlock"] = str(disable_unlock)
configA.config["gateway"]["omap_file_lock_duration"] = str(lock_duration)
configA.config["gateway"]["min_controller_id"] = "1"
configA.config["gateway"]["max_controller_id"] = "20000"
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True"
configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayAAA.sock"
portA = configA.getint("gateway", "port") + 4
configA.config["gateway"]["port"] = str(portA)
configB = copy.deepcopy(configA)
addr = configA.get("gateway", "addr")
portB = portA + 1
configB.config["gateway"]["name"] = "GatewayBBB"
configB.config["gateway"]["port"] = str(portB)
configB.config["gateway"]["min_controller_id"] = "20001"
configB.config["gateway"]["max_controller_id"] = "40000"
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayBBB.sock"
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02"

# Start servers
with (
GatewayServer(configA) as gatewayA,
GatewayServer(configB) as gatewayB,
):
gatewayA.serve()
# Delete existing OMAP state
gatewayA.gateway_rpc.gateway_state.delete_state()
# Create new
gatewayB.serve()

# Bind the client and Gateways A & B
channelA = grpc.insecure_channel(f"{addr}:{portA}")
stubA = pb2_grpc.GatewayStub(channelA)
channelB = grpc.insecure_channel(f"{addr}:{portB}")
stubB = pb2_grpc.GatewayStub(channelB)
yield stubA, stubB

# Stop gateways
gatewayA.server.stop(grace=1)
gatewayB.server.stop(grace=1)
gatewayB.gateway_rpc.gateway_state.delete_state()

def test_multi_gateway_omap_reread(config, conn_omap_reread, caplog):
"""Tests reading out of date OMAP file
"""
stubA, stubB, gatewayA, gatewayB = conn
stubA, stubB, gatewayA, gatewayB = conn_omap_reread
bdev = bdev_prefix + "X0"
bdev2 = bdev_prefix + "X1"
bdev3 = bdev_prefix + "X2"
Expand Down Expand Up @@ -112,7 +216,7 @@ def test_multi_gateway_omap_reread(config, conn, caplog):
assert len(listB) == 1

listA = json.loads(json_format.MessageToJson(
stubB.get_subsystems(get_subsystems_req),
stubA.get_subsystems(get_subsystems_req),
preserving_proto_field_name=True))['subsystems']
assert len(listA) == num_subsystems

Expand Down Expand Up @@ -154,59 +258,84 @@ def test_multi_gateway_omap_reread(config, conn, caplog):
assert bdevsB[1]["name"] == bdev2
assert bdevsB[2]["name"] == bdev3

def test_trying_to_lock_twice(config, image, conn, caplog):
def test_trying_to_lock_twice(config, image, conn_lock_twice, caplog):
"""Tests an attempt to lock the OMAP file from two gateways at the same time
"""
caplog.clear()
stubA, stubB, gatewayA, gatewayB = conn
stubA, stubB = conn_lock_twice

with pytest.raises(Exception) as ex:
create_resource_by_index(stubA, 0)
create_resource_by_index(stubB, 1)
create_resource_by_index(stubA, 100000, None)
create_resource_by_index(stubB, 100001, None)
assert "OMAP file unlock was disabled, will not unlock file" in caplog.text
assert "The OMAP file is locked, will try again in" in caplog.text
assert "Unable to lock OMAP file" in caplog.text
time.sleep(120) # Wait enough time for OMAP lock to be released

def create_resource_by_index(stub, i):
def create_resource_by_index(stub, i, caplog):
bdev = f"{bdev_prefix}{i}"
bdev_req = pb2.create_bdev_req(bdev_name=bdev,
rbd_pool_name=pool,
rbd_image_name=image,
block_size=4096)
ret_bdev = stub.create_bdev(bdev_req)
assert ret_bdev
if caplog != None:
assert f"create_bdev: {bdev}" in caplog.text
assert "create_bdev failed" not in caplog.text
subsystem = f"{subsystem_prefix}{i}"
subsystem_req = pb2.create_subsystem_req(subsystem_nqn=subsystem)
ret_subsystem = stub.create_subsystem(subsystem_req)
assert ret_subsystem
if caplog != None:
assert f"create_subsystem {subsystem}: True" in caplog.text
assert "create_subsystem failed" not in caplog.text
namespace_req = pb2.add_namespace_req(subsystem_nqn=subsystem,
bdev_name=bdev)
ret_namespace = stub.add_namespace(namespace_req)
assert ret_namespace

def check_resource_by_index(i, caplog):
bdev = f"{bdev_prefix}{i}"
def check_resource_by_index(i, resource_list):
# notice that this also verifies the namespace as the bdev name is in the namespaces section
assert f"{bdev}" in caplog.text
bdev = f"{bdev_prefix}{i}"
subsystem = f"{subsystem_prefix}{i}"
assert f"{subsystem}" in caplog.text
found = False
for res in resource_list:
try:
if res["nqn"] != subsystem:
continue
for ns in res["namespaces"]:
if ns["bdev_name"] == bdev:
found = True
break
break
except Exception:
pass
assert found

def test_multi_gateway_concurrent_changes(config, image, conn, caplog):
def test_multi_gateway_concurrent_changes(config, image, conn_concurrent, caplog):
"""Tests concurrent changes to the OMAP from two gateways
"""
caplog.clear()
stubA, stubB, gatewayA, gatewayB = conn
stubA, stubB = conn_concurrent

for i in range(created_resource_count):
if i % 2:
stub = stubA
if i % 2 == 0:
create_resource_by_index(stubA, i, caplog)
else:
stub = stubB
create_resource_by_index(stub, i)
assert "Failed" not in caplog.text
create_resource_by_index(stubB, i, caplog)
assert "failed" not in caplog.text.lower()

# Let the update some time to bring both gateways to the same page
time.sleep(15)
caplog.clear()
get_subsystems_req = pb2.get_subsystems_req()
listA = json.loads(json_format.MessageToJson(
stubA.get_subsystems(get_subsystems_req),
preserving_proto_field_name=True))['subsystems']
listB = json.loads(json_format.MessageToJson(
stubB.get_subsystems(get_subsystems_req),
preserving_proto_field_name=True))['subsystems']
for i in range(created_resource_count):
check_resource_by_index(i, caplog)
check_resource_by_index(i, listA)
check_resource_by_index(i, listB)

0 comments on commit e40b9b8

Please sign in to comment.