Skip to content

Commit

Permalink
Merge pull request #14958 from vbotbuildovich/backport-pr-14957-v23.2…
Browse files Browse the repository at this point in the history
….x-137

[v23.2.x] cloud_storage: Log client address in batch parser
  • Loading branch information
piyushredpanda authored Nov 14, 2023
2 parents 315cb04 + f9e0016 commit b049f55
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1134,14 +1134,17 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
const model::record_batch_header& header) const override {
vlog(
_ctxlog.trace,
"accept_batch_start {}, current delta: {}",
"[{}] accept_batch_start {}, current delta: {}",
_config.client_address,
header,
_parent._cur_delta);

if (rp_to_kafka(header.base_offset) > _config.max_offset) {
vlog(
_ctxlog.debug,
"accept_batch_start stop parser because {} > {}(kafka offset)",
"[{}] accept_batch_start stop parser because {} > {}(kafka "
"offset)",
_config.client_address,
header.base_offset(),
_config.max_offset);
return batch_consumer::consume_result::stop_parser;
Expand All @@ -1153,7 +1156,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
if (model::record_batch_type::raft_data != header.type) {
vlog(
_ctxlog.debug,
"accept_batch_start skip because record batch type is {}",
"[{}] accept_batch_start skip because record batch type is {}",
_config.client_address,
header.type);
return batch_consumer::consume_result::skip_batch;
}
Expand All @@ -1164,9 +1168,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
rp_to_kafka(header.last_offset()) < _config.start_offset)) {
vlog(
_ctxlog.debug,
"accept_batch_start skip because "
"[{}] accept_batch_start skip because "
"last_kafka_offset {} (last_rp_offset: {}) < "
"config.start_offset: {}",
_config.client_address,
rp_to_kafka(header.last_offset()),
header.last_offset(),
_config.start_offset);
Expand All @@ -1176,15 +1181,19 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
if (
(_config.strict_max_bytes || _config.bytes_consumed)
&& (_config.bytes_consumed + header.size_bytes) > _config.max_bytes) {
vlog(_ctxlog.debug, "accept_batch_start stop because overbudget");
vlog(
_ctxlog.debug,
"[{}] accept_batch_start stop because overbudget",
_config.client_address);
_config.over_budget = true;
return batch_consumer::consume_result::stop_parser;
}

if (_config.first_timestamp > header.max_timestamp) {
vlog(
_ctxlog.debug,
"accept_batch_start skip because header timestamp is {}",
"[{}] accept_batch_start skip because header timestamp is {}",
_config.client_address,
header.first_timestamp);
return batch_consumer::consume_result::skip_batch;
}
Expand All @@ -1199,7 +1208,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
size_t /*size_on_disk*/) override {
vlog(
_ctxlog.trace,
"consume_batch_start called for {}",
"[{}] consume_batch_start called for {}",
_config.client_address,
header.base_offset);
_header = header;
_header.ctx.term = _term;
Expand All @@ -1214,7 +1224,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
// changing the _cur_delta. The _cur_delta that is be used for current
// record batch can only account record batches in all previous batches.
vlog(
_ctxlog.debug, "skip_batch_start called for {}", header.base_offset);
_ctxlog.debug,
"[{}] skip_batch_start called for {}",
_config.client_address,
header.base_offset);
advance_config_offsets(header);
if (
std::count(
Expand Down Expand Up @@ -1407,10 +1420,16 @@ ss::future<> remote_segment_batch_reader::stop() {
co_return;
}

vlog(_ctxlog.debug, "remote_segment_batch_reader::stop");
vlog(
_ctxlog.debug,
"[{}] remote_segment_batch_reader::stop",
_config.client_address);
co_await _gate.close();
if (_parser) {
vlog(_ctxlog.debug, "remote_segment_batch_reader::stop - parser-close");
vlog(
_ctxlog.debug,
"[{}] remote_segment_batch_reader::stop - parser-close",
_config.client_address);
co_await _parser->close();
_parser.reset();
}
Expand Down

0 comments on commit b049f55

Please sign in to comment.