storage: various space management non-functional changes #18314

Merged
55 changes: 25 additions & 30 deletions src/v/resource_mgmt/storage.cc
@@ -278,19 +278,10 @@ eviction_policy::collect_reclaimable_offsets() {
/*
* retention settings mirror settings found in housekeeping()
*/
const auto collection_threshold = [this] {
const auto& lm = _storage->local().log_mgr();
if (!lm.config().log_retention().has_value()) {
return model::timestamp(0);
}
const auto now = model::timestamp::now().value();
const auto retention = lm.config().log_retention().value().count();
return model::timestamp(now - retention);
};

const auto collection_ts
= _storage->local().log_mgr().lowest_ts_to_retain();
gc_config cfg(
collection_threshold(),
_storage->local().log_mgr().config().retention_bytes());
collection_ts, _storage->local().log_mgr().config().retention_bytes());

/*
* in smallish batches partitions are queried for their reclaimable
@@ -312,7 +303,7 @@ eviction_policy::collect_reclaimable_offsets() {
.handle_exception_type([](const ss::gate_closed_exception&) {})
.handle_exception([ntp = p->ntp()](std::exception_ptr e) {
vlog(
rlog.debug,
rlog.warn,
"Error collecting reclaimable offsets from {}: {}",
ntp,
e);
@@ -566,10 +557,10 @@ ss::future<> disk_space_manager::manage_data_disk(uint64_t target_size) {
* generally we may want to consider a smoother function as well as
* dynamically adjusting the control loop frequency.
*/
const auto target_excess = static_cast<uint64_t>(
const auto adjusted_target_excess = static_cast<uint64_t>(
real_target_excess
* config::shard_local_cfg().retention_local_trim_overage_coeff());
_probe.set_target_excess(target_excess);
_probe.set_target_excess(adjusted_target_excess);
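The rename makes it clear that the probe now reports the post-coefficient figure. A tiny runnable illustration of the overage arithmetic; the coefficient value here is hypothetical, not the cluster default:

```cpp
#include <cstdint>
#include <iostream>

int main() {
    // Hypothetical numbers: the actual retention_local_trim_overage_coeff
    // default may differ. The coefficient inflates the reclaim target so the
    // control loop does not have to run on every tick.
    const uint64_t real_target_excess = 10ULL << 30; // 10 GiB over target
    const double overage_coeff = 2.0;
    const auto adjusted_target_excess
      = static_cast<uint64_t>(real_target_excess * overage_coeff);
    std::cout << adjusted_target_excess << "\n"; // 21474836480, i.e. 20 GiB
}
```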

/*
* when log storage has exceeded the target usage, then there are some knobs
@@ -589,43 +580,44 @@ ss::future<> disk_space_manager::manage_data_disk(uint64_t target_size) {
* targets for cloud-enabled topics, removing data that has been backed up
* into the cloud.
*/
if (target_excess > usage.reclaim.retention) {
if (adjusted_target_excess > usage.reclaim.retention) {
vlog(
rlog.info,
"Log storage usage {} > target size {} by {} (adjusted {}). Garbage "
"collection expected to recover {}. Overriding tiered storage "
"retention to recover {}. Total estimated available to recover {}",
"collection expected to remove {}. Space management of tiered "
"storage topics to remove {}. Total estimated available to remove "
"{}",
human::bytes(usage.usage.total()),
human::bytes(target_size),
human::bytes(real_target_excess),
human::bytes(target_excess),
human::bytes(adjusted_target_excess),
human::bytes(usage.reclaim.retention),
human::bytes(target_excess - usage.reclaim.retention),
human::bytes(adjusted_target_excess - usage.reclaim.retention),
human::bytes(usage.reclaim.available));

auto schedule = co_await _policy.create_new_schedule();
if (schedule.sched_size > 0) {
auto estimate = _policy.evict_until_local_retention(
schedule, target_excess);
schedule, adjusted_target_excess);
_probe.set_reclaim_local(estimate);

if (estimate < target_excess) {
if (estimate < adjusted_target_excess) {
const auto amount = _policy.evict_until_low_space_non_hinted(
schedule, target_excess - estimate);
schedule, adjusted_target_excess - estimate);
_probe.set_reclaim_low_non_hinted(amount);
estimate += amount;
}

if (estimate < target_excess) {
if (estimate < adjusted_target_excess) {
const auto amount = _policy.evict_until_low_space_hinted(
schedule, target_excess - estimate);
schedule, adjusted_target_excess - estimate);
_probe.set_reclaim_low_hinted(amount);
estimate += amount;
}

if (estimate < target_excess) {
if (estimate < adjusted_target_excess) {
const auto amount = _policy.evict_until_active_segment(
schedule, target_excess - estimate);
schedule, adjusted_target_excess - estimate);
_probe.set_reclaim_active_segment(amount);
estimate += amount;
}
@@ -644,17 +636,20 @@ ss::future<> disk_space_manager::manage_data_disk(uint64_t target_size) {
rlog.info, "Scheduling {} for reclaim", human::bytes(estimate));
co_await _policy.install_schedule(std::move(schedule));
} else {
vlog(rlog.info, "No partitions eligible for reclaim were found");
vlog(
rlog.info,
"No tiered storage partitions were found, unable to reclaim");
}
} else {
vlog(
rlog.info,
"Log storage usage {} > target size {} by {} (adjusted {}). Garbage "
"collection expected to recover {}.",
"collection expected to remove {}. No additional space management "
"required",
human::bytes(usage.usage.total()),
human::bytes(target_size),
human::bytes(real_target_excess),
human::bytes(target_excess),
human::bytes(adjusted_target_excess),
human::bytes(usage.reclaim.retention));
}

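The control flow in manage_data_disk() above tries progressively more aggressive reclaim stages until the adjusted excess is covered. Below is a minimal, self-contained sketch of that cascade; the stage names mirror the evict_until_* calls in the diff, while the reclaim_stage type and the byte figures are invented for illustration and are not part of this PR (the real policy operates on partition offsets and schedules, not raw byte counts):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Hypothetical stand-in: each stage reports how much it could reclaim.
struct reclaim_stage {
    const char* name;
    uint64_t reclaimable_bytes;
};

int main() {
    // Excess after applying retention_local_trim_overage_coeff.
    const uint64_t adjusted_target_excess = 30ULL << 30; // 30 GiB

    // Ordered from least to most aggressive, mirroring
    // evict_until_local_retention -> low_space_non_hinted ->
    // low_space_hinted -> active_segment.
    const reclaim_stage stages[] = {
      {"local retention", 12ULL << 30},
      {"low space (non-hinted)", 10ULL << 30},
      {"low space (hinted)", 6ULL << 30},
      {"up to active segment", 8ULL << 30},
    };

    uint64_t estimate = 0;
    for (const auto& s : stages) {
        if (estimate >= adjusted_target_excess) {
            break; // earlier, gentler stages already cover the excess
        }
        const uint64_t amount = std::min(
          s.reclaimable_bytes, adjusted_target_excess - estimate);
        estimate += amount;
        std::cout << s.name << ": +" << amount << " bytes (total " << estimate
                  << ")\n";
    }
}
```

In the real loop each stage's contribution is also published to the probe (set_reclaim_local, set_reclaim_low_non_hinted, and so on), which is what the renamed adjusted_target_excess feeds.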
71 changes: 35 additions & 36 deletions src/v/storage/disk_log_impl.cc
@@ -1031,7 +1031,8 @@ bool disk_log_impl::has_local_retention_override() const {
return false;
}

gc_config disk_log_impl::maybe_override_retention_config(gc_config cfg) const {
gc_config
disk_log_impl::maybe_apply_local_storage_overrides(gc_config cfg) const {
// Read replica topics have a different default retention
if (config().is_read_replica_mode_enabled()) {
cfg.eviction_time = std::max(
@@ -1050,20 +1051,24 @@ gc_config disk_log_impl::maybe_override_retention_config(gc_config cfg) const {
/*
* don't override with local retention settings--let partition data expand
* up to standard retention settings.
* NOTE: both retention_local_strict and retention_local_strict_override
* need to be set to true to honor strict local retention. Otherwise, local
* retention is advisory.
*/
if (
!config::shard_local_cfg().retention_local_strict()
|| !config::shard_local_cfg().retention_local_strict_override()) {
bool strict_local_retention
= config::shard_local_cfg().retention_local_strict()
&& config::shard_local_cfg().retention_local_strict_override();
if (!strict_local_retention) {
vlog(
gclog.trace,
"[{}] Skipped retention override for topic with remote write "
"[{}] Skipped local retention override for topic with remote write "
"enabled: {}",
config().ntp(),
cfg);
return cfg;
}

cfg = override_retention_config(cfg);
cfg = apply_local_storage_overrides(cfg);

vlog(
gclog.trace,
Expand All @@ -1074,7 +1079,7 @@ gc_config disk_log_impl::maybe_override_retention_config(gc_config cfg) const {
return cfg;
}

gc_config disk_log_impl::override_retention_config(gc_config cfg) const {
gc_config disk_log_impl::apply_local_storage_overrides(gc_config cfg) const {
tristate<std::size_t> local_retention_bytes{std::nullopt};
tristate<std::chrono::milliseconds> local_retention_ms{std::nullopt};

Expand Down Expand Up @@ -1127,7 +1132,8 @@ bool disk_log_impl::is_cloud_retention_active() const {
/*
* applies overrides for non-cloud storage settings
*/
gc_config disk_log_impl::apply_base_overrides(gc_config defaults) const {
gc_config
disk_log_impl::apply_kafka_retention_overrides(gc_config defaults) const {
if (!config().has_overrides()) {
return defaults;
}
Expand Down Expand Up @@ -1161,8 +1167,8 @@ gc_config disk_log_impl::apply_base_overrides(gc_config defaults) const {
}

gc_config disk_log_impl::apply_overrides(gc_config defaults) const {
auto ret = apply_base_overrides(defaults);
return maybe_override_retention_config(ret);
auto ret = apply_kafka_retention_overrides(defaults);
return maybe_apply_local_storage_overrides(ret);
}
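The renames make the two-step pipeline in apply_overrides() explicit: Kafka-level retention overrides first, then (conditionally) local-storage overrides. A rough, self-contained sketch of that composition and of the strict-retention gate follows; gc_config here is a simplified stand-in, and the two boolean constants are hypothetical placeholders for the retention_local_strict / retention_local_strict_override cluster properties:

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <optional>

// Simplified stand-in for storage::gc_config.
struct gc_config {
    std::chrono::milliseconds eviction_time{std::chrono::hours(24 * 7)};
    std::optional<size_t> max_bytes;
};

// Hypothetical placeholders for the two cluster properties.
constexpr bool retention_local_strict = false;
constexpr bool retention_local_strict_override = true;

gc_config apply_kafka_retention_overrides(gc_config cfg) {
    // Topic-level retention.ms / retention.bytes overrides would land here.
    return cfg;
}

gc_config apply_local_storage_overrides(gc_config cfg) {
    // Clamp to the tighter local retention targets for tiered-storage topics.
    cfg.eviction_time = std::min<std::chrono::milliseconds>(
      cfg.eviction_time, std::chrono::hours(24));
    cfg.max_bytes = size_t{128} * 1024 * 1024;
    return cfg;
}

gc_config maybe_apply_local_storage_overrides(gc_config cfg) {
    // Both flags must be true for local retention to be enforced strictly;
    // otherwise local retention stays advisory and the override is skipped.
    const bool strict_local_retention = retention_local_strict
                                        && retention_local_strict_override;
    if (!strict_local_retention) {
        return cfg;
    }
    return apply_local_storage_overrides(cfg);
}

int main() {
    // apply_overrides() composes the two steps in exactly this order.
    const gc_config cfg = maybe_apply_local_storage_overrides(
      apply_kafka_retention_overrides(gc_config{}));
    std::cout << "eviction window (ms): " << cfg.eviction_time.count() << "\n";
}
```

In the space-management paths further down (disk_usage_and_reclaimable_space and get_reclaimable_offsets), the diff calls apply_local_storage_overrides() directly and skips the gate, because those paths estimate what could be reclaimed down to local retention regardless of whether it is advisory.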

ss::future<> disk_log_impl::housekeeping(housekeeping_config cfg) {
Expand Down Expand Up @@ -1311,10 +1317,18 @@ ss::future<std::optional<model::offset>> disk_log_impl::do_gc(gc_config cfg) {
co_return std::nullopt;
}

ss::future<> disk_log_impl::retention_adjust_timestamps(
std::chrono::seconds ignore_in_future) {
ss::future<> disk_log_impl::maybe_adjust_retention_timestamps() {
// Correct any timestamps too far in the future, meant to be called before
// calculating the retention offset for garbage collection.
// It's expected that this will be used only for segments pre v23.3,
// without a proper broker_timestamps
auto ignore_in_future
= config::shard_local_cfg().storage_ignore_timestamps_in_future_sec();
if (!ignore_in_future.has_value()) {
co_return;
}
auto ignore_threshold = model::timestamp(
model::timestamp::now().value() + ignore_in_future / 1ms);
model::timestamp::now().value() + ignore_in_future.value() / 1ms);

auto retention_cfg = time_based_retention_cfg::make(
_feature_table.local()); // this will retrieve cluster cfgs
Expand Down Expand Up @@ -1381,17 +1395,7 @@ ss::future<> disk_log_impl::retention_adjust_timestamps(

ss::future<std::optional<model::offset>>
disk_log_impl::maybe_adjusted_retention_offset(gc_config cfg) {
auto ignore_timestamps_in_future
= config::shard_local_cfg().storage_ignore_timestamps_in_future_sec();
if (ignore_timestamps_in_future.has_value()) {
// Correct any timestamps too far in future, before calculating the
// retention offset.
// It's expected that this will be used only for segments pre v23.3,
// without a proper broker_timestamps
co_await retention_adjust_timestamps(
ignore_timestamps_in_future.value());
}

co_await maybe_adjust_retention_timestamps();
co_return retention_offset(cfg);
}
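The refactor moves the storage_ignore_timestamps_in_future_sec lookup, and the early return when it is unset, inside the renamed helper so callers no longer repeat it. A compile-and-run sketch of that shape, with a stand-in segment type and a deliberately simplified clamp to now (the broker's actual adjustment picks a more careful fallback timestamp):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Stand-in for a segment's retention timestamp (ms since epoch).
struct segment {
    int64_t max_timestamp_ms;
};

// Hypothetical placeholder for storage_ignore_timestamps_in_future_sec;
// nullopt disables the adjustment entirely.
const std::optional<std::chrono::seconds> ignore_in_future
  = std::chrono::hours(48);

void maybe_adjust_retention_timestamps(
  std::vector<segment>& segments, int64_t now_ms) {
    if (!ignore_in_future.has_value()) {
        return; // knob unset: leave segment timestamps untouched
    }
    // Timestamps beyond now + threshold are treated as bogus (typically
    // pre-v23.3 segments without broker timestamps) and clamped so that
    // time-based retention can still make progress.
    const int64_t threshold_ms
      = now_ms
        + std::chrono::duration_cast<std::chrono::milliseconds>(
            *ignore_in_future)
            .count();
    for (auto& s : segments) {
        if (s.max_timestamp_ms > threshold_ms) {
            s.max_timestamp_ms = now_ms; // simplified fallback
        }
    }
}

int main() {
    const int64_t now_ms = 1'700'000'000'000;
    std::vector<segment> segs{{now_ms - 1000}, {now_ms + 10'000'000'000}};
    maybe_adjust_retention_timestamps(segs, now_ms);
    std::cout << segs[0].max_timestamp_ms << " " << segs[1].max_timestamp_ms
              << "\n";
}
```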

@@ -3044,8 +3048,8 @@ disk_log_impl::disk_usage_and_reclaimable_space(gc_config input_cfg) {
* offset calculation for local retention reclaimable is different than the
* retention above and takes into account local retention advisory flag.
*/
auto local_retention_cfg = apply_base_overrides(input_cfg);
local_retention_cfg = override_retention_config(local_retention_cfg);
auto local_retention_cfg = apply_kafka_retention_overrides(input_cfg);
local_retention_cfg = apply_local_storage_overrides(local_retention_cfg);
const auto local_retention_offset
= co_await maybe_adjusted_retention_offset(local_retention_cfg);

@@ -3197,7 +3201,7 @@ disk_log_impl::disk_usage_target(gc_config cfg, usage usage) {
= config::shard_local_cfg().storage_reserve_min_segments()
* max_segment_size();

cfg = apply_base_overrides(cfg);
cfg = apply_kafka_retention_overrides(cfg);

/*
* compacted topics are always stored whole on local storage such that local
@@ -3226,7 +3230,7 @@ disk_log_impl::disk_usage_target(gc_config cfg, usage usage) {
*/
} else {
// applies local retention overrides for cloud storage
cfg = maybe_override_retention_config(cfg);
cfg = maybe_apply_local_storage_overrides(cfg);
}

/*
@@ -3282,12 +3286,7 @@ disk_log_impl::disk_usage_target_time_retention(gc_config cfg) {
* take the opportunity to maybe fixup janky weird timestamps. also done in
* normal garbage collection path and should be idempotent.
*/
if (auto ignore_timestamps_in_future
= config::shard_local_cfg().storage_ignore_timestamps_in_future_sec();
ignore_timestamps_in_future.has_value()) {
co_await retention_adjust_timestamps(
ignore_timestamps_in_future.value());
}
co_await maybe_adjust_retention_timestamps();

/*
* we are going to use whole segments for the data we'll use to extrapolate
@@ -3506,8 +3505,8 @@ disk_log_impl::get_reclaimable_offsets(gc_config cfg) {
* override in contrast to housekeeping GC where the overrides are applied
* only when local retention is non-advisory.
*/
cfg = apply_base_overrides(cfg);
cfg = override_retention_config(cfg);
cfg = apply_kafka_retention_overrides(cfg);
cfg = apply_local_storage_overrides(cfg);
const auto local_retention_offset
= co_await maybe_adjusted_retention_offset(cfg);

9 changes: 4 additions & 5 deletions src/v/storage/disk_log_impl.h
@@ -277,20 +277,19 @@ class disk_log_impl final : public log {

/// Conditionally adjust retention timestamp on any segment that appears
/// to have invalid timestamps, to ensure retention can proceed.
ss::future<>
retention_adjust_timestamps(std::chrono::seconds ignore_in_future);
ss::future<> maybe_adjust_retention_timestamps();

gc_config apply_overrides(gc_config) const;
gc_config apply_base_overrides(gc_config) const;
gc_config apply_kafka_retention_overrides(gc_config) const;

void wrote_stm_bytes(size_t);

// returns true if this partition's local retention configuration has
// overrides, such as custom topic configs.
bool has_local_retention_override() const;

gc_config maybe_override_retention_config(gc_config) const;
gc_config override_retention_config(gc_config) const;
gc_config maybe_apply_local_storage_overrides(gc_config) const;
gc_config apply_local_storage_overrides(gc_config) const;

bool is_cloud_retention_active() const;

24 changes: 12 additions & 12 deletions src/v/storage/log_manager.cc
@@ -310,19 +310,19 @@ ss::future<> log_manager::housekeeping() {
}
}

model::timestamp log_manager::lowest_ts_to_retain() const {
if (!_config.log_retention().has_value()) {
return model::timestamp(0);
}
const auto now = model::timestamp::now().value();
const auto retention = _config.log_retention().value().count();
return model::timestamp(now - retention);
}
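The hoisted helper above replaces the per-iteration collection_threshold lambda below. A small stand-alone sketch of the same now - retention arithmetic, using a stand-in timestamp type and a hypothetical log.retention.ms value (neither is code from the PR):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <optional>

// Minimal stand-in for model::timestamp (milliseconds since epoch).
struct timestamp {
    int64_t value;
};

// Hypothetical placeholder for _config.log_retention(); nullopt means
// time-based retention is disabled.
const std::optional<std::chrono::milliseconds> log_retention
  = std::chrono::hours(24 * 7);

// Anything older than the returned timestamp is eligible for collection.
timestamp lowest_ts_to_retain(int64_t now_ms) {
    if (!log_retention.has_value()) {
        return timestamp{0};
    }
    return timestamp{now_ms - log_retention->count()};
}

int main() {
    const int64_t now_ms
      = std::chrono::duration_cast<std::chrono::milliseconds>(
          std::chrono::system_clock::now().time_since_epoch())
          .count();
    std::cout << "retain data newer than " << lowest_ts_to_retain(now_ms).value
              << " (ms since epoch)\n";
}
```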

ss::future<> log_manager::housekeeping_loop() {
/*
* data older than this threshold may be garbage collected
*/
const auto collection_threshold = [this] {
if (!_config.log_retention().has_value()) {
return model::timestamp(0);
}
const auto now = model::timestamp::now().value();
const auto retention = _config.log_retention().value().count();
return model::timestamp(now - retention);
};

while (true) {
try {
const auto prev_jitter_base = _jitter.base_duration();
@@ -383,7 +383,7 @@ ss::future<> log_manager::housekeeping_loop() {

auto ntp = log_meta.handle->config().ntp();
auto usage = co_await log_meta.handle->disk_usage(
gc_config(collection_threshold(), _config.retention_bytes()));
gc_config(lowest_ts_to_retain(), _config.retention_bytes()));

/*
* NOTE: this estimate is for local retention policy only. for a
@@ -405,7 +405,7 @@ ss::future<> log_manager::housekeeping_loop() {
continue;
}
co_await log->gc(
gc_config(collection_threshold(), _config.retention_bytes()));
gc_config(lowest_ts_to_retain(), _config.retention_bytes()));
}
}

@@ -432,7 +432,7 @@
auto prev_sg = co_await ss::coroutine::switch_to(_config.compaction_sg);

try {
co_await housekeeping_scan(collection_threshold());
co_await housekeeping_scan(lowest_ts_to_retain());
} catch (const std::exception& e) {
vlog(stlog.info, "Error processing housekeeping(): {}", e);
}
4 changes: 4 additions & 0 deletions src/v/storage/log_manager.h
@@ -227,6 +227,10 @@ class log_manager {
/// Returns all ntp's managed by this instance
absl::flat_hash_set<model::ntp> get_all_ntps() const;

// Returns the timestamp that should be retained via log.retention.ms, or 0
// if not configured.
model::timestamp lowest_ts_to_retain() const;

int64_t compaction_backlog() const;

storage_resources& resources() { return _resources; }