
Merge pull request #11748 from mmaslankaprv/ff-incremental-fix
Fixed the interaction between follower fetching and incremental fetch requests
mmaslankaprv authored Jun 29, 2023
2 parents d14fd22 + 6dc0c9a commit 745b12b
Showing 6 changed files with 134 additions and 11 deletions.
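
Context for the fix: with incremental fetch sessions (KIP-227) the broker caches per-partition state and only returns a partition when something about it changed since the previous response. Before this change, the preferred read replica hint used by follower fetching (KIP-392) and the log start offset were not part of that comparison, so a rack-local consumer could keep a session open and never be told which follower to read from. Below is a minimal Python sketch of the inclusion rule after the fix; it is illustrative only (the CachedPartition/FetchResponsePartition names are invented here), not Redpanda's actual C++ implementation.

from dataclasses import dataclass


@dataclass
class CachedPartition:
    # Per-partition state the broker remembers inside a fetch session.
    high_watermark: int
    last_stable_offset: int
    start_offset: int


@dataclass
class FetchResponsePartition:
    # Fields of the freshly computed fetch response for the same partition.
    high_watermark: int
    last_stable_offset: int
    log_start_offset: int
    preferred_read_replica: int  # -1 means "no preferred replica"


def should_include(cached: CachedPartition, resp: FetchResponsePartition) -> bool:
    """Decide whether the partition must be re-sent to the client and
    refresh the cached session state along the way."""
    include = False
    if resp.high_watermark != cached.high_watermark:
        cached.high_watermark = resp.high_watermark
        include = True
    if resp.last_stable_offset != cached.last_stable_offset:
        cached.last_stable_offset = resp.last_stable_offset
        include = True
    # New in this commit: a moved log start offset also counts as a change.
    if resp.log_start_offset != cached.start_offset:
        cached.start_offset = resp.log_start_offset
        include = True
    # New in this commit: always forward a preferred read replica hint so the
    # consumer learns it should fetch from a rack-local follower.
    if resp.preferred_read_replica != -1:
        include = True
    return include
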
1 change: 1 addition & 0 deletions src/v/kafka/server/fetch_session.h
@@ -26,6 +26,7 @@ namespace kafka {
struct fetch_session_partition {
model::ktp_with_hash topic_partition;
int32_t max_bytes;
model::offset start_offset;
model::offset fetch_offset;
model::offset high_watermark;
model::offset last_stable_offset;
17 changes: 17 additions & 0 deletions src/v/kafka/server/handlers/fetch.cc
@@ -951,6 +951,17 @@ bool update_fetch_partition(
include = true;
partition.last_stable_offset = model::offset(resp.last_stable_offset);
}
if (partition.start_offset != resp.log_start_offset) {
include = true;
partition.start_offset = model::offset(resp.log_start_offset);
}
/**
* Always include partition in a response if it contains information about
* the preferred replica
*/
if (resp.preferred_read_replica != -1) {
include = true;
}
if (include) {
return include;
}
@@ -1013,6 +1024,8 @@ ss::future<response_ptr> op_context::send_response() && {
.last_stable_offset = it->partition_response->last_stable_offset,
.log_start_offset = it->partition_response->log_start_offset,
.aborted = std::move(it->partition_response->aborted),
.preferred_read_replica
= it->partition_response->preferred_read_replica,
.records = std::move(it->partition_response->records)};

final_response.data.topics.back().partitions.push_back(std::move(r));
@@ -1108,6 +1121,10 @@ std::optional<model::node_id> rack_aware_replica_selector::select_replica(
std::vector<replica_info> rack_replicas;
model::offset highest_hw;
for (auto& replica : p_info.replicas) {
// filter out replicas which are not responsive
if (!replica.is_alive) {
continue;
}
if (
_md_cache.get_node_rack_id(replica.id) == c_info.rack_id
&& replica.log_end_offset >= c_info.fetch_offset) {
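
The selector change above keeps a consumer from being pointed at a follower that is currently unreachable, which is what the follower_offline=True case of the new test below exercises. A rough Python sketch of the selection rule (field names mirror the diff where they appear; the surrounding types are invented, and picking randomly among the most caught-up rack replicas is an assumption):

import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReplicaInfo:
    id: int
    rack_id: str
    log_end_offset: int
    is_alive: bool


def select_replica(replicas: list[ReplicaInfo],
                   consumer_rack: str,
                   fetch_offset: int) -> Optional[int]:
    # Only consider replicas that are responsive (new in this commit), sit in
    # the consumer's rack and have already replicated the requested offset.
    candidates = [
        r for r in replicas
        if r.is_alive
        and r.rack_id == consumer_rack
        and r.log_end_offset >= fetch_offset
    ]
    if not candidates:
        return None  # no suitable rack-local replica, keep fetching from the leader
    # Prefer the most caught-up replicas; break ties randomly.
    best = max(r.log_end_offset for r in candidates)
    return random.choice([r.id for r in candidates if r.log_end_offset == best])
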
34 changes: 26 additions & 8 deletions tests/rptest/services/kafka_cli_consumer.py
@@ -9,6 +9,7 @@

import re
import threading
import time
from ducktape.services.background_thread import BackgroundThreadService

from rptest.clients.kafka_cli_tools import KafkaCliTools
@@ -41,17 +42,23 @@ def __init__(self,
self._stopping = threading.Event()
self._instance_name = "cli-consumer" if instance_name is None else instance_name
self._done = None
self._progress_reporter = None
self._last_consumed = time.time()
self._lock = threading.Lock()
assert self._partitions is not None or self._group is not None, "either partitions or group have to be set"

self._cli = KafkaCliTools(self._redpanda)
self._messages = []
self._message_cnt = 0

def script(self):
return self._cli._script("kafka-console-consumer.sh")

def _worker(self, _, node):
self._done = False
self._stopping.clear()
self._progress_reporter = threading.Thread(
target=lambda: self._report_progress(), daemon=True)
self._progress_reporter.start()
try:

cmd = [self.script()]
@@ -73,12 +80,10 @@ def _worker(self, _, node):

cmd += ["--bootstrap-server", self._redpanda.brokers()]

for line in node.account.ssh_capture(' '.join(cmd)):
line.strip()
line = line.replace("\n", "")
self.logger.debug(
f"[{self._instance_name}] consumed: '{line}'")
self._messages.append(line)
for _ in node.account.ssh_capture(' '.join(cmd)):
with self._lock:
self._message_cnt += 1
self._last_consumed = time.time()
except:
if self._stopping.is_set():
# Expect a non-zero exit code when killing during teardown
@@ -89,7 +94,7 @@
self._done = True

def wait_for_messages(self, messages, timeout=30):
wait_until(lambda: len(self._messages) >= messages,
wait_until(lambda: self._message_cnt >= messages,
timeout,
backoff_sec=2)

@@ -105,6 +110,8 @@ def all_started():
def stop_node(self, node):
self._stopping.set()
node.account.kill_process("java", clean_shutdown=True)
if self._progress_reporter.is_alive():
self._progress_reporter.join()

try:
wait_until(lambda: self._done is None or self._done == True,
@@ -120,3 +127,14 @@ def stop_node(self, node):
err_msg=
f"{self._instance_name} running on {node.name} failed to stop after SIGKILL"
)

def _report_progress(self):
while (not self._stopping.is_set()):
with self._lock:
self.logger.info(
f"Consumed {self._message_cnt} messages, time since last consumed: {time.time() - self._last_consumed} seconds"
)
if self._stopping.is_set():
break

time.sleep(5)
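
A side effect of the consumer changes above: records are now counted rather than buffered (the new test below pushes about a million records through the consumer, so keeping every line in memory would be wasteful), and the daemon progress-reporter thread logs throughput every five seconds so a stalled consumer shows up in the test log. From a test's point of view the interaction is unchanged; a minimal sketch (constructor arguments abbreviated from the test further down):

# inside a test case, with a Redpanda cluster and topic already created
cli_consumer = KafkaCliConsumer(self.test_context,
                                self.redpanda,
                                topic.name,
                                group="kafka-cli-group",
                                consumer_properties={"client.rack": "A"})
cli_consumer.start()
# Blocks until at least 100 records were counted, polling _message_cnt every 2 s.
cli_consumer.wait_for_messages(100, timeout=30)
cli_consumer.stop()
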
2 changes: 1 addition & 1 deletion tests/rptest/tests/consumer_group_balancing_test.py
@@ -73,7 +73,7 @@ def create_consumer(self,
consumer_properties, instance_id))

def consumed_at_least(consumers, count):
return all([len(c._messages) > count for c in consumers])
return all([c._message_cnt > count for c in consumers])

def validate_group_state(self,
group,
4 changes: 2 additions & 2 deletions tests/rptest/tests/consumer_group_test.py
@@ -97,10 +97,10 @@ def group_is_ready():
return consumers

def consumed_at_least(consumers, count):
return all([len(c._messages) > count for c in consumers])
return all([c._message_cnt > count for c in consumers])

def group_consumed_at_least(consumers, count):
return sum([len(c._messages) for c in consumers]) >= count
return sum([c._message_cnt for c in consumers]) >= count

def validate_group_state(self, group, expected_state, static_members):
rpk = RpkTool(self.redpanda)
87 changes: 87 additions & 0 deletions tests/rptest/tests/follower_fetching_test.py
@@ -9,6 +9,7 @@

import random
import string
import time
from ducktape.mark import matrix
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
@@ -20,6 +21,10 @@
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.util import wait_for_local_storage_truncate

from ducktape.utils.util import wait_until

from rptest.utils.mode_checks import skip_debug_mode


class FollowerFetchingTest(PreallocNodesTest):
def __init__(self, test_context):
@@ -164,3 +169,85 @@ def test_basic_follower_fetching(self, read_from_object_store):
assert current_bytes_fetched > 0
else:
assert current_bytes_fetched == 0


class IncrementalFollowerFetchingTest(PreallocNodesTest):
def __init__(self, test_context):
super(IncrementalFollowerFetchingTest,
self).__init__(test_context=test_context,
num_brokers=3,
node_prealloc_count=1,
extra_rp_conf={
'enable_rack_awareness': True,
})

@skip_debug_mode
@cluster(num_nodes=5)
@matrix(follower_offline=[True, False])
def test_incremental_fetch_from_follower(self, follower_offline):
rack_layout_str = "ABC"
rack_layout = [str(i) for i in rack_layout_str]

for ix, node in enumerate(self.redpanda.nodes):
extra_node_conf = {
'rack': rack_layout[ix],
'enable_rack_awareness': True,
}
self.redpanda.set_extra_node_conf(node, extra_node_conf)

self.redpanda.start()
topic = TopicSpec(partition_count=12, replication_factor=3)

self.client().create_topic(topic)
msg_size = 512

producer = KgoVerifierProducer(self.test_context,
self.redpanda,
topic,
msg_size,
1000000,
self.preallocated_nodes,
rate_limit_bps=256 * 1024)

producer.start()
consumer_group = "kafka-cli-group"
rack = "A"

# Use the Kafka CLI consumer because it lets us set the metadata.max.age.ms and client.rack consumer properties directly
cli_consumer = KafkaCliConsumer(self.test_context,
self.redpanda,
topic.name,
group="kafka-cli-group",
consumer_properties={
"client.rack": rack,
"metadata.max.age.ms": 10000
})
cli_consumer.start()
cli_consumer.wait_for_messages(100)
if follower_offline:
idx = rack_layout_str.find(rack)
self.redpanda.stop_node(self.redpanda.get_node(idx))

# sleep long enough to cause metadata refresh on the consumer
time.sleep(30)
# stop the producer
producer.stop()
rpk = RpkTool(self.redpanda)

def no_lag():
gr = rpk.group_describe(consumer_group)
if gr.state != "Stable":
return False

return all([p.lag == 0 for p in gr.partitions])

# wait for consumer to clear the backlog
wait_until(
no_lag,
60,
backoff_sec=3,
err_msg=
"Consumer did not finish consuming topic. Lag exists on some of the partitions"
)

cli_consumer.stop()
