Optimize replicated_partition::validate_fetch_offset #19161

Merged: 3 commits merged into redpanda-data:dev on Jul 15, 2024

Conversation

@ballard26 (Contributor) commented Jun 9, 2024

In the common fetch path, where Redpanda is reading batches from the batch cache, validate_fetch_offset can consume ~33% of the total time spent reading from a given NTP (3.8% of overall reactor utilization), as seen in the profile below:
[profiler screenshot]

After applying the changes in this PR, this is reduced to ~9% (0.9% of overall reactor utilization):
[profiler screenshot]

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.1.x
  • v23.3.x
  • v23.2.x

Release Notes

  • none

@ballard26 changed the title from "[WIP] Optimize replicated_partition::validate_fetch_offset" to "Optimize replicated_partition::validate_fetch_offset" on Jun 20, 2024
@ballard26 marked this pull request as ready for review on June 20, 2024 04:12
@vbotbuildovich (Collaborator) commented Jun 20, 2024

new failures in https://buildkite.com/redpanda/redpanda/builds/50476#01903418-7120-4b30-b2fc-11e6c48e8ccc:

"rptest.tests.compatibility.arroyo_test.ArroyoTest.test_arroyo_test_suite"

new failures in https://buildkite.com/redpanda/redpanda/builds/50476#01903418-7122-49e6-8663-1ae0fd2b8bd8:

"rptest.tests.read_replica_e2e_test.TestReadReplicaService.test_identical_lwms_after_delete_records.partition_count=5.cloud_storage_type=CloudStorageType.S3"

new failures in https://buildkite.com/redpanda/redpanda/builds/50476#01903418-7125-4db8-9513-f870f193b115:

"rptest.tests.simple_e2e_test.SimpleEndToEndTest.test_consumer_interruption"
"rptest.tests.delete_records_test.DeleteRecordsTest.test_delete_records_topic_start_delta.cloud_storage_enabled=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/50476#01903418-7124-4acc-9ca8-75fcf4bc962f:

"rptest.tests.data_transforms_test.DataTransformsLoggingMetricsTest.test_manager_metrics_values"

new failures in https://buildkite.com/redpanda/redpanda/builds/50476#01903430-a59d-40a9-a552-7453cdffe574:

"rptest.tests.read_replica_e2e_test.TestReadReplicaService.test_identical_lwms_after_delete_records.partition_count=5.cloud_storage_type=CloudStorageType.S3"
"rptest.tests.data_transforms_test.DataTransformsLoggingMetricsTest.test_manager_metrics_values"

new failures in https://buildkite.com/redpanda/redpanda/builds/50476#01903430-a59b-4401-9e3d-75ece1181001:

"rptest.tests.simple_e2e_test.SimpleEndToEndTest.test_consumer_interruption"
"rptest.tests.compatibility.arroyo_test.ArroyoTest.test_arroyo_test_suite"

new failures in https://buildkite.com/redpanda/redpanda/builds/50476#01903430-a59e-496a-a227-3386af8c544f:

"rptest.tests.delete_records_test.DeleteRecordsTest.test_delete_records_topic_start_delta.cloud_storage_enabled=True"

Comment on lines 1256 to 1259
// - The topic has been remotely recovered and the log_eviction_stm won't
// have any state.
// - The start offset override hasn't been set and the log_eviction_stm is
// returning model::offset{} to indicate that.
Contributor:

nit: or this is a read replica

Comment on lines 382 to 383
std::optional<model::offset> _cached_start_offset_raft,
_cached_start_offset_kafka;
Contributor:

nit: please put these on separate lines
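
For reference, the suggested layout would simply split the declaration, one variable per line (illustrative only, not the merged code):

std::optional<model::offset> _cached_start_offset_raft;
std::optional<model::offset> _cached_start_offset_kafka;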

// The eviction STM only keeps track of DeleteRecords truncations
// as Raft offsets. Translate if possible.
if (
-   offset_res.value() != model::offset{}
+   offset_override != model::offset{}
    && _raft->start_offset() < offset_res.value()) {
Contributor:

nit: use offset_override here too?

Comment on lines 1270 to 1276
// For the first case if we've read the start_offset_override from the
// archival_meta_stm then it won't change until the log_eviction_stm has
// local state once more. Hence we can just return the offset we read
// without re-syncing.
if (_cached_start_offset_kafka && offset_override == model::offset{}) {
    co_return *_cached_start_offset_kafka;
}
Contributor:

This seems kind of off -- can't we get here without any syncing at all e.g. for read replicas?

Contributor (author):

Yeah, I forgot to account for read replicas initially. In the update I just pushed, we always sync the archival stm and then get the start offset override from it for read replicas.

Comment on lines 1232 to 1256
// Check if the translation has been cached.
if (
  _cached_start_offset_raft
  && offset_override == *_cached_start_offset_raft) {
    co_return *_cached_start_offset_kafka;
}
Contributor:

Should this happen regardless of raft->start_offset() < offset_res.value()? Then I don't think we'd need the check at L1281?

Contributor (author):

Yeah, removed the duplicate check in the updated version.

Comment on lines 1213 to 1244
ss::future<result<model::offset, std::error_code>>
partition::sync_kafka_start_offset_override(
  model::timeout_clock::duration timeout) {
    model::offset offset_override;

    if (_log_eviction_stm && !is_read_replica_mode_enabled()) {
        auto offset_res
          = co_await _log_eviction_stm->sync_start_offset_override(timeout);
        if (offset_res.has_failure()) {
            co_return offset_res.as_failure();
        }

        offset_override = offset_res.value();

@andrwng (Contributor) commented Jun 22, 2024:

I tried taking a step back to understand how this looks / should look with these new cached variables. I think this is roughly what we have here, but there are some nuances. It seems like the cached values are basically caching the translation, but in a potentially imperfect way.

raft_start_override = co_await sync_log_eviction_stm()
if raft_start_override.has_value():
    if raft_start_override == raft_start_cache:
        # Easy case, we've translated before.
        return kafka_start_cache

    # We haven't translated before, but data is still in local log.
    if we can translate:
        translated_override = translate(raft_start_override)
        raft_start_cache = raft_start_override
        kafka_start_cache = translated_override

    # Can't translate because a race has likely occurred: start override has
    # fallen out of local log, and there is no cached value to fall back on.
    # Fall through.

# We may or may not have already synced above, but we sync again to ensure the
# value we get from archival is up-to-date.
kafka_start_override = co_await sync_archival_stm()

if raft_start_override.has_value() && kafka_start_override.has_value():
    # Note that even if there is an override from the log_eviction_stm AND from
    # the archival_stm, they don't necessarily correspond to the same record
    # (e.g. consider a race where DeleteRecords comes in after the first sync).
    #
    # If this is the case though, subsequent calls will resync the log eviction
    # stm and not use this cached value.
    raft_start_cache = raft_start_override
    kafka_start_cache = kafka_start_override

return kafka_start_override

Contributor:

Lmk if this is more or less what you're thinking here. If so, maybe we can incorporate the structure or comments into the implementation?

Contributor (author):

I've hopefully simplified the caching logic and added clearer comments in the updated version as per our conversations on Slack. Let me know if they're still lacking though.

@ballard26 force-pushed the fetch-opt branch 3 times, most recently from 57dbe14 to 59ee771, on June 27, 2024 03:05
@ballard26 requested a review from andrwng on June 27, 2024 05:38
src/v/cluster/partition.cc (resolved review thread)
Comment on lines 1270 to 1272
auto start_kafka_offset
  = _archival_meta_stm->manifest().get_start_kafka_offset_override();
if (start_kafka_offset != kafka::offset{}) {
    co_return kafka::offset_cast(start_kafka_offset);
}
_cached_kafka_start_override = kafka::offset_cast(start_kafka_offset);
Contributor:

Doesn't this mean that we'll never do the archival sync after we've cached a start override?

Contributor (author):

Yes, that's the hope.

The log eviction stm now caches the kafka offset from every prefix truncate batch applied against it, even if the batch doesn't land in local storage. So the only case where we'd need to get the offset override from the archival stm is when no prefix truncate batches have been applied to the log eviction stm since the Redpanda broker started. In that case we only need to sync the archival stm once to get the most up-to-date prefix truncation from cloud storage; after that, if any additional prefix truncate batches come in, the log eviction stm will be aware of them.
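
To illustrate the caching behaviour described above, here is a self-contained toy model in plain C++ (the type and function names are invented stand-ins, not Redpanda's actual interfaces): the archival stm is synced at most once, and afterwards the eviction stm's cached override is used because every later prefix truncate batch updates it.

#include <cstdint>
#include <iostream>
#include <optional>

using offset_t = int64_t; // stand-in for model::offset / kafka::offset

struct toy_partition {
    // Updated by every prefix truncate batch applied since broker start.
    std::optional<offset_t> eviction_stm_cached_override;
    // Populated the first time we fall back to the archival stm.
    std::optional<offset_t> cached_kafka_start_override;
    int archival_syncs = 0; // counts how often the expensive path runs

    offset_t archival_sync_and_read() { // pretend this talks to cloud storage
        ++archival_syncs;
        return 0; // manifest records no override yet
    }

    offset_t start_offset_override() {
        if (eviction_stm_cached_override) {
            return *eviction_stm_cached_override;
        }
        if (cached_kafka_start_override) {
            return *cached_kafka_start_override; // no further archival syncs
        }
        cached_kafka_start_override = archival_sync_and_read();
        return *cached_kafka_start_override;
    }
};

int main() {
    toy_partition p;
    p.start_offset_override();             // first call syncs the archival stm
    p.start_offset_override();             // served from the cached value
    p.eviction_stm_cached_override = 42;   // a prefix truncate batch arrives
    std::cout << p.start_offset_override() // prints 42
              << " after " << p.archival_syncs << " archival sync(s)\n"; // 1
}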

src/v/cluster/partition.cc (resolved review thread)
Comment on lines 1259 to 1265
// Once it has been sync'd we can cache the override value as it won't
// change until `_log_eviction_stm->cached_kafka_start_offset()` does.
if (!_cached_kafka_start_override || !_log_eviction_stm) {
    auto term = _raft->term();
    if (!co_await _archival_meta_stm->sync(timeout)) {
Contributor:

nit: may be clearer as something like:

if (_cached_kafka_start_override.has_value() && _log_eviction_stm) {
    return *_cached_kafka_start_override;
}
auto term = _raft->term();
...

Contributor:

Actually I'm thinking it might make sense to return very early if !_log_eviction_stm, in which case we can omit these other checks for _log_eviction_stm.

It's probably worth checking whether _log_eviction_stm can ever be removed -- I don't think it can, but maybe someone from the replication team will have a clearer understanding, since that code has changed in the last few months.

Contributor (author):

Currently there are a couple of ntps that don't have a _log_eviction_stm: the controller, consumer offsets, the tx manager... Luckily none of these are ktps and none are consumable in the normal kafka manner. So for any partitions we're concerned about here, the log eviction stm will always exist. I think I should still check for its existence though, as it makes things clearer.

Member:

Luckily none of these are ktps and none are consumable in the normal kafka manner

__consumer_offsets is a kafka partition and AFAIK is consumable in the normal kafka way? IIRC this was a change we made a couple of years ago: it used to be in the Redpanda namespace and hence invisible to Kafka, but for compatibility we moved it into the kafka namespace with the same name as Apache Kafka, so it would work in the same way for anyone who was accessing it directly (the Enterprise team would probably have the most context here).

Contributor (author):

Good point. Then let's assume there are kafka-consumable partitions without the log eviction stm (like consumer offsets, as you've mentioned). There are two paths the function can take:

  • Either !_archival_meta_stm, in which case it returns model::offset{}, which is correct since there can be no overridden offset without the log eviction stm.
  • Or _archival_meta_stm, in which case it'll sync the archival stm once and then return _archival_meta_stm->manifest().get_start_kafka_offset_override() for every call afterwards. The result should be model::offset{} as well, since we shouldn't technically be able to apply delete-records batches to partitions without the log eviction stm.

Either way the function returns the correct value.

Member:

SG.

@andrwng any other concerns on this thread of discussion?

Contributor:

Yeah, this looks good. Thanks for the ping, Travis, and thanks to Brandon for updating!

src/v/cluster/partition.cc (resolved review thread)
This allows the log_eviction_stm to know what the current kafka start
offset override is even if it doesn't reside in the local log. However,
since this cached value is not snapshotted by the stm, it may not be
recovered when Redpanda restarts. In this case one would need to fall back
on the archival stm to find the kafka start offset override.
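
As a rough, hypothetical illustration of the mechanism this commit message describes (plain C++ stand-ins, not the real Redpanda stm interfaces): the eviction stm records the override from every prefix truncate batch it applies, but the value is not part of its snapshot, so it is lost on restart and callers must fall back on the archival stm.

#include <cstdint>
#include <optional>

struct toy_log_eviction_stm {
    std::optional<int64_t> kafka_start_offset_cache;

    // Applied for every prefix truncate batch, whether or not the truncation
    // point still resides in the local log.
    void apply_prefix_truncate(int64_t kafka_start_override) {
        kafka_start_offset_cache = kafka_start_override;
    }

    // Models a broker restart: the cache is not snapshotted, so callers must
    // fall back on the archival stm until the next prefix truncate is applied.
    void restart() { kafka_start_offset_cache.reset(); }
};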
The most common path for validate_fetch_offset results in a number of
short-lived coroutines. In these cases the allocation/deallocation of
the coroutine's frame ends up dominating the runtime of the function.

This commit removes the coroutines in favor of then chains, which can
avoid the allocation if the task quota hasn't been met.
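
A minimal sketch of that coroutine-to-continuation transformation, assuming Seastar (the names validate_coroutine, validate_then_chain, and offset_in_range are made up for illustration and are not the PR's actual functions): the coroutine form allocates a frame on every call, while the .then() form can run the continuation inline when the inner future is already ready and the task quota has not been exhausted.

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

namespace demo {

// Assumed helper that is usually ready immediately (e.g. a batch-cache hit).
inline seastar::future<bool> offset_in_range(long offset) {
    return seastar::make_ready_future<bool>(offset < 1000);
}

// Coroutine form: a coroutine frame is allocated for every call.
seastar::future<bool> validate_coroutine(long offset) {
    if (offset < 0) {
        co_return false;
    }
    co_return co_await offset_in_range(offset);
}

// Continuation-chain form: no coroutine frame, and the lambda may run inline
// when offset_in_range() returns a ready future.
seastar::future<bool> validate_then_chain(long offset) {
    if (offset < 0) {
        return seastar::make_ready_future<bool>(false);
    }
    return offset_in_range(offset).then([](bool ok) { return ok; });
}

} // namespace demo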
@travisdowns (Member):

/ci-repeat 1

@ballard26 (Contributor, author):

The failure in the latest CI run was a known issue, #20570.

@mmaslankaprv (Member):

This looks good to me. I am wondering, do we have a test where we do delete records with an offset that is in the range included only in cloud?

@andrwng (Contributor) commented Jul 11, 2024

This looks good to me. I am wondering, do we have a test where we do delete records with an offset that is in the range included only in cloud?

Looking around, I actually had a hard time finding one that does this explicitly. It should be fairly easy to add one to cloud_storage/tests/cloud_storage_e2e_test.cc or cloud_storage/tests/delete_records_e2e_test.cc (both have full access to the partition, and we can easily force an aggressive local housekeeping run before deleting).

@andrwng (Contributor) left a comment:

The changes LGTM, but good callout from Michal about ensuring test coverage for cloud-only deletes. I'm okay with that going in separately, but also happy to wait on such a test before merging (and help if needed).

@ballard26 (Contributor, author):

This looks good to me. I am wondering, do we have a test where we do delete records with an offset that is in the range included only in cloud?

Looking around, I actually had a hard time finding one that does this explicitly. It should be fairly easy to add one to cloud_storage/tests/cloud_storage_e2e_test.cc or cloud_storage/tests/delete_records_e2e_test.cc (both have full access to the partition, and we can easily force an aggressive local housekeeping run before deleting).

I will add a test to do that today. I think that should resolve the last blocker for this PR.

@ballard26 (Contributor, author):

Created a new PR that adds the discussed test: #21382.

@piyushredpanda (Contributor):

Thanks, @ballard26; merging this one then.

@piyushredpanda merged commit a4e54a3 into redpanda-data:dev on Jul 15, 2024
15 of 18 checks passed
@vbotbuildovich (Collaborator):

/backport v24.1.x

@vbotbuildovich (Collaborator):

/backport v23.3.x

@vbotbuildovich (Collaborator):

/backport v23.2.x

@vbotbuildovich (Collaborator):

Failed to create a backport PR to v23.3.x branch. I tried:

git remote add upstream https://github.com/redpanda-data/redpanda.git
git fetch --all
git checkout -b backport-pr-19161-v23.3.x-402 remotes/upstream/v23.3.x
git cherry-pick -x feeec639f2d52379d33100c505682113deb03817 3ba042d4ee19e1329c8faedd336d65d483c3b462 e96451ad86d860097a7f1928ac3445f411459318

Workflow run logs.

@vbotbuildovich (Collaborator):

Failed to create a backport PR to v23.2.x branch. I tried:

git remote add upstream https://github.com/redpanda-data/redpanda.git
git fetch --all
git checkout -b backport-pr-19161-v23.2.x-732 remotes/upstream/v23.2.x
git cherry-pick -x feeec639f2d52379d33100c505682113deb03817 3ba042d4ee19e1329c8faedd336d65d483c3b462 e96451ad86d860097a7f1928ac3445f411459318

Workflow run logs.
