tests: test producing/consuming to disabled partitions
ztlpn committed Nov 28, 2023
1 parent 343115c commit b441e63
Showing 1 changed file with 197 additions and 40 deletions.
237 changes: 197 additions & 40 deletions tests/rptest/tests/recovery_mode_test.py
@@ -21,6 +21,28 @@
from rptest.util import wait_until_result


def _produce(test, topic, msg_count=1000, partition=None):
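    """Produce msg_count messages of 4 KiB each to the given topic
    (optionally to a single partition) and wait for completion."""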
producer = RpkProducer(context=test.test_context,
redpanda=test.redpanda,
topic=topic,
msg_size=4096,
msg_count=msg_count,
partition=partition)
try:
producer.run()
finally:
producer.free()


def assert_rpk_fails(cmd, error_msg):
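    """Assert that cmd raises RpkException, failing with error_msg otherwise."""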
try:
cmd()
except RpkException:
pass
else:
assert False, error_msg


class RecoveryModeTest(RedpandaTest):
def __init__(self, *args, **kwargs):
super().__init__(*args, num_brokers=4, **kwargs)
@@ -29,17 +51,6 @@ def setUp(self):
# start the nodes manually
pass

def _produce(self, topic):
producer = RpkProducer(context=self.test_context,
redpanda=self.redpanda,
topic=topic,
msg_size=4096,
msg_count=1000)
try:
producer.run()
finally:
producer.free()

@cluster(num_nodes=5)
def test_recovery_mode(self):
"""Test that after restarting the cluster in recovery mode, produce/consume is forbidden,
@@ -62,7 +73,7 @@ def test_recovery_mode(self):
rpk.create_topic("mytopic1", partitions=5, replicas=3)
rpk.create_topic("mytopic2", partitions=5, replicas=3)

self._produce("mytopic1")
_produce(self, "mytopic1")

partitions = list(rpk.describe_topic("mytopic1", tolerant=False))
assert len(partitions) == 5
@@ -91,32 +102,19 @@ def test_recovery_mode(self):
assert len(partitions) == 5
assert all(p.load_error is not None for p in partitions)

try:
rpk.produce('mytopic1', 'key', 'msg')
except RpkException:
pass
else:
assert False, "producing should fail"

try:
rpk.consume('mytopic1', n=1000, quiet=True, timeout=10)
# rpk will retry indefinitely even in the presence of non-retryable errors,
# so just wait for the timeout to occur.
except RpkException:
pass
else:
assert False, "consuming should fail"

try:
rpk.consume('mytopic1',
n=1000,
group='mygroup3',
quiet=True,
timeout=10)
except RpkException:
pass
else:
assert False, "group consuming should fail"
assert_rpk_fails(lambda: rpk.produce('mytopic1', 'key', 'msg'),
"producing should fail")

# rpk will retry indefinitely even in the presence of non-retryable errors,
# so just wait for the timeout to occur.
assert_rpk_fails(
lambda: rpk.consume('mytopic1', n=1000, quiet=True, timeout=10),
"consuming should fail")

assert_rpk_fails(
lambda: rpk.consume(
'mytopic1', n=1000, group='mygroup3', quiet=True, timeout=10),
"group consuming should fail")

# check consumer group ops

@@ -177,7 +175,7 @@ def test_recovery_mode(self):

# check that produce and consume work

self._produce("mytopic1")
_produce(self, "mytopic1")

def partitions_ready():
partitions = list(rpk.describe_topic("mytopic1", tolerant=False))
@@ -200,7 +198,7 @@ def partitions_ready():
assert len(consumed) == 2000


@dataclasses.dataclass
@dataclasses.dataclass(frozen=True)
class PartitionInfo:
ns: str
topic: str
@@ -352,3 +350,162 @@ def check_everything():
self.redpanda.wait_for_membership(first_start=False)

check_everything()

@cluster(num_nodes=5)
def test_disable(self):
"""
Test that disabled partitions are shut down and that producing/consuming
to them errors out, while producing/consuming to other partitions is
still possible.
"""

rpk = RpkTool(self.redpanda)
admin = Admin(self.redpanda)

topics = ["mytopic1", "mytopic2"]
for topic in topics:
rpk.create_topic(topic, partitions=2, replicas=3)

def get_node_partitions(node):
return [
f'{p["topic"]}/{p["partition_id"]}'
for p in admin.get_partitions(node=node)
if p["topic"].startswith("mytopic")
]

def get_partition_counts():
ret = dict()
for n in self.redpanda.nodes:
for p in get_node_partitions(n):
ret[p] = ret.setdefault(p, 0) + 1
self.logger.debug(f"partition counts: {ret}")
return ret

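        # 2 topics x 2 partitions, each with 3 replicas, should yield
        # 4 partitions hosted on 3 nodes each.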
def all_created():
pc = get_partition_counts()
return len(pc) == 4 and all(c == 3 for c in pc.values())

wait_until(all_created,
timeout_sec=30,
backoff_sec=1,
err_msg="failed to wait until all partitions are created")

_produce(self, "mytopic1")
_produce(self, "mytopic2")

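        # Map "topic/partition" -> high watermark, for one topic or for
        # all test topics.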
def get_hwms(topic=None):
ts = topics if topic is None else [topic]
ret = dict()
for topic in ts:
for p in rpk.describe_topic(topic):
ret[f"{topic}/{p.id}"] = p.high_watermark
return ret

orig_hwms = get_hwms()
assert len(orig_hwms) == 4
for topic in topics:
assert sum(hwm for ntp, hwm in orig_hwms.items()
if ntp.startswith(topic)) == 1000

self.logger.info("disabling partitions")

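        # Disable a single partition of mytopic1 and, by omitting the
        # partition argument, all partitions of mytopic2.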
admin.set_partitions_disabled(ns="kafka",
topic="mytopic1",
partition=1)
admin.set_partitions_disabled(ns="kafka", topic="mytopic2")

def all_disabled_shut_down():
pc = get_partition_counts()
return set(pc.keys()) == {"mytopic1/0"} and all(
c == 3 for c in pc.values())

wait_until(
all_disabled_shut_down,
timeout_sec=30,
backoff_sec=1,
err_msg="failed to wait until all disabled partitions are shut down"
)

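        # Only the still-enabled partition should report a high watermark,
        # and it should be unchanged.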
hwms2 = get_hwms()
assert len(hwms2) == 1
assert hwms2["mytopic1/0"] == orig_hwms["mytopic1/0"]

# test that producing to disabled partitions fails, while producing to
# the remaining partition is still possible

_produce(self, "mytopic1", partition=0)

assert_rpk_fails(
lambda: rpk.produce('mytopic1', 'key', 'msg', partition=1),
"producing should fail")

assert_rpk_fails(lambda: rpk.produce('mytopic2', 'key', 'msg'),
"producing should fail")

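        # Only the enabled partition advanced, by exactly the 1000 messages
        # just produced.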
hwms3 = get_hwms()
        assert len(hwms3) == 1
assert hwms3["mytopic1/0"] == hwms2["mytopic1/0"] + 1000

# the same with consuming

assert len(
rpk.consume(
'mytopic1', n=hwms3["mytopic1/0"],
quiet=True).rstrip().split('\n')) == hwms3["mytopic1/0"]

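        # Asking for one more message than the enabled partition holds makes
        # rpk wait on the disabled partition, so it is expected to time out.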
assert_rpk_fails(
lambda: rpk.consume(
'mytopic1', n=hwms3["mytopic1/0"] + 1, timeout=10),
"consuming should fail")

assert_rpk_fails(lambda: rpk.consume('mytopic2', n=1, timeout=10),
"consuming should fail")

self.logger.info("restarting cluster")

self.redpanda.restart_nodes(self.redpanda.nodes)
self.redpanda.wait_for_membership(first_start=False)

# test that partitions are still disabled after restart

assert all_disabled_shut_down()

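        # ...and that the remaining partition's data survived the restart.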
assert get_hwms() == hwms3

self.logger.info("enabling partitions back")

admin.set_partitions_disabled(ns="kafka",
topic="mytopic1",
value=False)
admin.set_partitions_disabled(ns="kafka",
topic="mytopic2",
value=False)

wait_until(all_created,
timeout_sec=30,
backoff_sec=1,
err_msg="failed to wait until all partitions are created")

wait_until(lambda: set(get_hwms().keys()) == set(orig_hwms.keys()),
timeout_sec=30,
backoff_sec=1,
err_msg="failed to wait until all leaders elected")

# test that producing and consuming works after re-enabling all partitions

hwms4 = get_hwms()
assert set(hwms4.keys()) == set(orig_hwms.keys())
for ntp, hwm in hwms4.items():
if ntp == "mytopic1/0":
assert hwm == orig_hwms[ntp] + 1000
else:
assert hwm == orig_hwms[ntp]

_produce(self, "mytopic1")
_produce(self, "mytopic2")

        assert sum(get_hwms("mytopic1").values()) == 3000
        assert sum(get_hwms("mytopic2").values()) == 2000

rpk.consume('mytopic1', n=3000, quiet=True)
rpk.consume('mytopic2', n=2000, quiet=True)
