cloud_storage: parallelize remote_segment stop

CONFLICT:
- omits changes to many_partitions_test which doesn't have a tiered
  storage case on this branch

The eviction loop currently runs serially and is waited on by each
remote_partition upon stopping, which effectively serializes the
partition_manager stopping each of its partitions. This resulted in what
looked like a hang, but was actually a long series of waits to stop cloud
segments.

This commit parallelizes segment stopping in both the eviction loop and
in the remote_partition stop call. A test that previously took over 9
minutes to shut down a node now takes 10 seconds.
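
(Illustrative sketch, not part of the commit: the snippet below shows the
bounded-concurrency pattern the diffs adopt, using Seastar's
ss::max_concurrent_for_each. The fake_segment type, its sleep-based stop(),
the element count, and the concurrency limit are invented for the example.)

    // A self-contained Seastar program illustrating bounded-concurrency stop.
    #include <seastar/core/app-template.hh>
    #include <seastar/core/coroutine.hh>
    #include <seastar/core/loop.hh>
    #include <seastar/core/sleep.hh>

    #include <chrono>
    #include <memory>
    #include <vector>

    namespace ss = seastar;

    struct fake_segment {
        // Stand-in for a segment stop; pretend it takes a little while,
        // e.g. waiting for readers and background work to drain.
        ss::future<> stop() { return ss::sleep(std::chrono::milliseconds(10)); }
    };

    int main(int argc, char** argv) {
        ss::app_template app;
        return app.run(argc, argv, []() -> ss::future<> {
            std::vector<std::unique_ptr<fake_segment>> segments;
            for (int i = 0; i < 1000; ++i) {
                segments.push_back(std::make_unique<fake_segment>());
            }

            // Serial variant (what the old code effectively did): total stop
            // time is the sum of every stop() latency.
            //   for (auto& s : segments) { co_await s->stop(); }

            // Bounded-parallel variant: at most 200 stop() calls in flight at
            // once, so total time is roughly ceil(1000 / 200) * per-stop
            // latency instead of 1000 * per-stop latency.
            co_await ss::max_concurrent_for_each(
              segments, 200, [](auto& s) { return s->stop(); });
        });
    }

With a concurrency bound, total stop time shrinks from the sum of all
stop() latencies to roughly the longest batch, which is why the shutdown
described above drops from minutes to seconds.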

I also considered moving the eviction loop into the remote_partition to
further reduce partitions waiting for each other to complete, but that
wasn't necessary to avoid this issue.

This also adds a test case similar to the tiered storage
many_partitions_test case that reproduces slow shutdown with a large
number of partitions and a large number of segments. On my local
workstation, the test consistently fails to finish shutting down in 30
seconds without these changes.

(cherry picked from commit b8ba09c)
andrwng committed Mar 2, 2023
1 parent 889e070 commit adcce7a
Showing 5 changed files with 123 additions and 15 deletions.
src/v/cloud_storage/materialized_segments.cc (14 changes: 8 additions & 6 deletions)
@@ -17,6 +17,8 @@
#include "ssx/future-util.h"
#include "vlog.h"

#include <seastar/core/loop.hh>

#include <absl/container/btree_map.h>

#include <chrono>
@@ -144,12 +146,12 @@ ss::future<> materialized_segments::run_eviction_loop() {
     while (true) {
         co_await _cvar.wait([this] { return !_eviction_pending.empty(); });
         _eviction_in_flight = std::exchange(_eviction_pending, {});
-        while (!_eviction_in_flight.empty()) {
-            co_await std::visit(
-              [](auto&& rs) { return rs->stop(); },
-              _eviction_in_flight.front());
-            _eviction_in_flight.pop_front();
-        }
+        co_await ss::max_concurrent_for_each(
+          _eviction_in_flight, 1024, [](auto&& rs_variant) {
+              return std::visit(
+                [](auto&& rs) { return rs->stop(); }, rs_variant);
+          });
+        _eviction_in_flight.clear();
     }
 }

src/v/cloud_storage/materialized_segments.h (2 changes: 1 addition & 1 deletion)
@@ -153,4 +153,4 @@ class materialized_segments {
     friend class remote_probe;
 };
 
-} // namespace cloud_storage
+} // namespace cloud_storage
src/v/cloud_storage/remote_partition.cc (14 changes: 9 additions & 5 deletions)
@@ -594,11 +594,15 @@ ss::future<> remote_partition::stop() {
     // segments are being stopped.
     decltype(_segments) segments_to_stop;
     segments_to_stop.swap(_segments);
-    for (auto& [offset, segment] : segments_to_stop) {
-        vlog(
-          _ctxlog.debug, "remote partition stop {}", segment->base_rp_offset());
-        co_await segment->stop();
-    }
+    co_await ss::max_concurrent_for_each(
+      segments_to_stop, 200, [this](auto& iter) {
+          auto& [offset, segment] = iter;
+          vlog(
+            _ctxlog.debug,
+            "remote partition stop {}",
+            segment->base_rp_offset());
+          return segment->stop();
+      });
 
     // We may have some segment or reader objects enqueued for stop in
     // the shared eviction queue: must flush it, or they can outlive
tests/rptest/tests/e2e_shadow_indexing_test.py (77 changes: 75 additions & 2 deletions)
@@ -19,7 +19,7 @@
 from rptest.clients.types import TopicSpec
 from rptest.services.action_injector import random_process_kills, ActionConfig
 from rptest.services.cluster import cluster
-from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierRandomConsumer
+from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierRandomConsumer, KgoVerifierSeqConsumer
 from rptest.services.redpanda import RedpandaService, CHAOS_LOG_ALLOW_LIST
 from rptest.services.redpanda import SISettings
 from rptest.tests.end_to_end import EndToEndTest
@@ -29,7 +29,7 @@
     produce_until_segments,
     wait_for_removal_of_n_segments,
 )
-from rptest.utils.si_utils import S3Snapshot
+from rptest.utils.si_utils import nodes_report_cloud_segments, S3Snapshot
 from rptest.utils.mode_checks import skip_debug_mode


@@ -359,6 +359,79 @@ def manifest_has_segments():
assert "0-1-v1.log" in manifest["segments"], manifest


class ShadowIndexingManyPartitionsTest(PreallocNodesTest):
small_segment_size = 4096
topic_name = f"{EndToEndShadowIndexingBase.s3_topic_name}"
topics = (TopicSpec(name=topic_name,
partition_count=128,
replication_factor=1,
redpanda_remote_write=True,
redpanda_remote_read=True,
retention_bytes=-1,
retention_ms=-1,
segment_bytes=small_segment_size), )

def __init__(self, test_context):
self.num_brokers = 1
si_settings = SISettings(
test_context,
log_segment_size=self.small_segment_size,
cloud_storage_cache_size=20 * 2**30,
cloud_storage_segment_max_upload_interval_sec=1)
super().__init__(
test_context,
node_prealloc_count=1,
extra_rp_conf={
# Avoid segment merging so we can generate many segments
# quickly.
"cloud_storage_enable_segment_merging": False,
'log_segment_size_min': 1024,
},
si_settings=si_settings)
self.kafka_tools = KafkaCliTools(self.redpanda)

def setUp(self):
self.redpanda.start()
for topic in self.topics:
self.kafka_tools.create_topic(topic)
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(self.topic,
TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_MS,
'1000')

@skip_debug_mode
@cluster(num_nodes=2)
def test_many_partitions_shutdown(self):
"""
Test that reproduces a slow shutdown when many partitions each with
many hydrated segments get shut down.
"""
producer = KgoVerifierProducer(self.test_context,
self.redpanda,
self.topic,
msg_size=1024,
msg_count=1000 * 1000,
custom_node=self.preallocated_nodes)
producer.start()
try:
wait_until(
lambda: nodes_report_cloud_segments(self.redpanda, 128 * 200),
timeout_sec=120,
backoff_sec=3)
finally:
producer.stop()
producer.wait()

seq_consumer = KgoVerifierSeqConsumer(self.test_context,
self.redpanda,
self.topic,
0,
nodes=self.preallocated_nodes)
seq_consumer.start(clean=False)
seq_consumer.wait()
self.redpanda.stop_node(self.redpanda.nodes[0])


class ShadowIndexingWhileBusyTest(PreallocNodesTest):
# With SI enabled, run common operations against a cluster
# while the system is under load (busy).
tests/rptest/utils/si_utils.py (31 changes: 30 additions & 1 deletion)
@@ -1,3 +1,11 @@
+# Copyright 2023 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
 import collections
 import json
 import pprint
@@ -8,7 +16,7 @@

 from rptest.archival.s3_client import S3ObjectMetadata, S3Client
 from rptest.clients.types import TopicSpec
-from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
+from rptest.services.redpanda import MetricsEndpoint, RESTART_LOG_ALLOW_LIST
 
 EMPTY_SEGMENT_SIZE = 4096

@@ -292,6 +300,27 @@ def get_expected_ntp_restored_size(nodes_segments_report: dict[str,
     return expected_restored_sizes
 
 
+def nodes_report_cloud_segments(redpanda, target_segments):
+    """
+    Returns true if the nodes in the cluster collectively report having
+    above the given number of segments.
+    NOTE: we're explicitly not checking the manifest via cloud client
+    because we expect the number of items in our bucket to be quite large,
+    and for associated ListObjects calls to take a long time.
+    """
+    try:
+        num_segments = redpanda.metric_sum(
+            "redpanda_cloud_storage_segments",
+            metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS)
+        redpanda.logger.info(
+            f"Cluster metrics report {num_segments} / {target_segments} cloud segments"
+        )
+    except:
+        return False
+    return num_segments >= target_segments
+
+
 def is_close_size(actual_size, expected_size):
     """Checks if the log size is close to expected size.
     The actual size shouldn't be less than expected. Also, the difference
