[v22.3.x] cloud_storage: parallelize remote_segment stop #9239

Merged
14 changes: 8 additions & 6 deletions src/v/cloud_storage/materialized_segments.cc
@@ -17,6 +17,8 @@
#include "ssx/future-util.h"
#include "vlog.h"

#include <seastar/core/loop.hh>

#include <absl/container/btree_map.h>

#include <chrono>
@@ -144,12 +146,12 @@ ss::future<> materialized_segments::run_eviction_loop() {
while (true) {
co_await _cvar.wait([this] { return !_eviction_pending.empty(); });
_eviction_in_flight = std::exchange(_eviction_pending, {});
while (!_eviction_in_flight.empty()) {
co_await std::visit(
[](auto&& rs) { return rs->stop(); },
_eviction_in_flight.front());
_eviction_in_flight.pop_front();
}
co_await ss::max_concurrent_for_each(
_eviction_in_flight, 1024, [](auto&& rs_variant) {
return std::visit(
[](auto&& rs) { return rs->stop(); }, rs_variant);
});
_eviction_in_flight.clear();
}
}
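
For context, ss::max_concurrent_for_each (declared in <seastar/core/loop.hh>, the header added above) walks a range while keeping at most N of the per-element futures in flight, and resolves only once every element has been processed. Below is a minimal, self-contained sketch of that pattern on its own; the container, the 1024 limit and the sleep are illustrative stand-ins for the eviction queue and remote_segment::stop(), not code from this PR.

// Illustrative sketch only: bounded-concurrency iteration with
// ss::max_concurrent_for_each, mirroring the eviction-loop change above.
#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>

#include <chrono>
#include <vector>

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        return seastar::do_with(
          std::vector<int>(10000), [](std::vector<int>& items) {
              // At most 1024 per-element futures are outstanding at any
              // time; the returned future resolves once every element has
              // been processed.
              return seastar::max_concurrent_for_each(
                items, 1024, [](int) {
                    // Stand-in for remote_segment::stop(): any async work.
                    return seastar::sleep(std::chrono::milliseconds(1));
                });
          });
    });
}
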

2 changes: 1 addition & 1 deletion src/v/cloud_storage/materialized_segments.h
@@ -153,4 +153,4 @@ class materialized_segments {
friend class remote_probe;
};

} // namespace cloud_storage
} // namespace cloud_storage
14 changes: 9 additions & 5 deletions src/v/cloud_storage/remote_partition.cc
@@ -594,11 +594,15 @@ ss::future<> remote_partition::stop() {
// segments are being stopped.
decltype(_segments) segments_to_stop;
segments_to_stop.swap(_segments);
for (auto& [offset, segment] : segments_to_stop) {
vlog(
_ctxlog.debug, "remote partition stop {}", segment->base_rp_offset());
co_await segment->stop();
}
co_await ss::max_concurrent_for_each(
segments_to_stop, 200, [this](auto& iter) {
auto& [offset, segment] = iter;
vlog(
_ctxlog.debug,
"remote partition stop {}",
segment->base_rp_offset());
return segment->stop();
});

// We may have some segment or reader objects enqueued for stop in
// the shared eviction queue: must flush it, or they can outlive
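
The remote_partition::stop() change follows the same detach-then-parallel-stop shape: swap the segment map out of the live object, then stop the detached entries with at most 200 concurrent futures instead of awaiting each stop() in turn. A rough sketch of that shape, with hypothetical segment and partition_like types standing in for the real classes:

// Rough sketch, not the PR's code: `segment` and `partition_like` are
// hypothetical stand-ins; only the shape (swap out the map, then stop
// entries with bounded concurrency) mirrors remote_partition::stop().
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>

#include <absl/container/btree_map.h>

#include <chrono>
#include <memory>

struct segment {
    seastar::future<> stop() {
        // Stand-in for the real teardown work.
        return seastar::sleep(std::chrono::milliseconds(1));
    }
};

struct partition_like {
    absl::btree_map<int64_t, std::unique_ptr<segment>> _segments;

    seastar::future<> stop() {
        // Detach the map first, so anything inspecting _segments while the
        // stops are in flight never observes half-stopped entries.
        decltype(_segments) to_stop;
        to_stop.swap(_segments);
        return seastar::do_with(std::move(to_stop), [](auto& to_stop) {
            // Up to 200 segment stops run concurrently.
            return seastar::max_concurrent_for_each(
              to_stop, 200,
              [](auto& entry) { return entry.second->stop(); });
        });
    }
};
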
72 changes: 70 additions & 2 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
@@ -19,7 +19,7 @@
from rptest.clients.types import TopicSpec
from rptest.services.action_injector import random_process_kills, ActionConfig
from rptest.services.cluster import cluster
from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierRandomConsumer
from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierRandomConsumer, KgoVerifierSeqConsumer
from rptest.services.redpanda import RedpandaService, CHAOS_LOG_ALLOW_LIST
from rptest.services.redpanda import SISettings
from rptest.tests.end_to_end import EndToEndTest
@@ -29,7 +29,7 @@
produce_until_segments,
wait_for_removal_of_n_segments,
)
from rptest.utils.si_utils import S3Snapshot
from rptest.utils.si_utils import nodes_report_cloud_segments, S3Snapshot
from rptest.utils.mode_checks import skip_debug_mode


@@ -359,6 +359,74 @@ def manifest_has_segments():
assert "0-1-v1.log" in manifest["segments"], manifest


class ShadowIndexingManyPartitionsTest(PreallocNodesTest):
small_segment_size = 4096
topic_name = f"{EndToEndShadowIndexingBase.s3_topic_name}"
topics = (TopicSpec(name=topic_name,
partition_count=128,
replication_factor=1,
redpanda_remote_write=True,
redpanda_remote_read=True,
retention_bytes=-1,
retention_ms=-1,
segment_bytes=small_segment_size), )

def __init__(self, test_context):
self.num_brokers = 1
si_settings = SISettings(
log_segment_size=self.small_segment_size,
cloud_storage_cache_size=20 * 2**30,
cloud_storage_segment_max_upload_interval_sec=1)
super().__init__(test_context,
node_prealloc_count=1,
extra_rp_conf={
'log_segment_size_min': 1024,
},
si_settings=si_settings)
self.kafka_tools = KafkaCliTools(self.redpanda)

def setUp(self):
self.redpanda.start()
for topic in self.topics:
self.kafka_tools.create_topic(topic)
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(self.topic,
TopicSpec.PROPERTY_RETENTION_LOCAL_TARGET_MS,
'1000')

@skip_debug_mode
@cluster(num_nodes=2)
def test_many_partitions_shutdown(self):
"""
Test that reproduces a slow shutdown when many partitions each with
many hydrated segments get shut down.
"""
producer = KgoVerifierProducer(self.test_context,
self.redpanda,
self.topic,
msg_size=1024,
msg_count=1000 * 1000,
custom_node=self.preallocated_nodes)
producer.start()
try:
wait_until(
lambda: nodes_report_cloud_segments(self.redpanda, 128 * 200),
timeout_sec=120,
backoff_sec=3)
finally:
producer.stop()
producer.wait()

seq_consumer = KgoVerifierSeqConsumer(self.test_context,
self.redpanda,
self.topic,
0,
nodes=self.preallocated_nodes)
seq_consumer.start(clean=False)
seq_consumer.wait()
self.redpanda.stop_node(self.redpanda.nodes[0])


class ShadowIndexingWhileBusyTest(PreallocNodesTest):
# With SI enabled, run common operations against a cluster
# while the system is under load (busy).
31 changes: 30 additions & 1 deletion tests/rptest/utils/si_utils.py
@@ -1,3 +1,11 @@
# Copyright 2023 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import collections
import json
import pprint
@@ -8,7 +16,7 @@

from rptest.archival.s3_client import S3ObjectMetadata, S3Client
from rptest.clients.types import TopicSpec
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from rptest.services.redpanda import MetricsEndpoint, RESTART_LOG_ALLOW_LIST

EMPTY_SEGMENT_SIZE = 4096

@@ -292,6 +300,27 @@ def get_expected_ntp_restored_size(nodes_segments_report: dict[str,
return expected_restored_sizes


def nodes_report_cloud_segments(redpanda, target_segments):
"""
Returns true if the nodes in the cluster collectively report having
above the given number of segments.

NOTE: we're explicitly not checking the manifest via cloud client
because we expect the number of items in our bucket to be quite large,
and for associated ListObjects calls to take a long time.
"""
try:
num_segments = redpanda.metric_sum(
"redpanda_cloud_storage_segments",
metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS)
redpanda.logger.info(
f"Cluster metrics report {num_segments} / {target_segments} cloud segments"
)
except:
return False
return num_segments >= target_segments


def is_close_size(actual_size, expected_size):
"""Checks if the log size is close to expected size.
The actual size shouldn't be less than expected. Also, the difference