diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc
index a352b4c518a5..e64d1264180a 100644
--- a/src/v/cloud_storage/remote_segment.cc
+++ b/src/v/cloud_storage/remote_segment.cc
@@ -1134,14 +1134,17 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
       const model::record_batch_header& header) const override {
         vlog(
           _ctxlog.trace,
-          "accept_batch_start {}, current delta: {}",
+          "[{}] accept_batch_start {}, current delta: {}",
+          _config.client_address,
           header,
           _parent._cur_delta);
 
         if (rp_to_kafka(header.base_offset) > _config.max_offset) {
             vlog(
               _ctxlog.debug,
-              "accept_batch_start stop parser because {} > {}(kafka offset)",
+              "[{}] accept_batch_start stop parser because {} > {}(kafka "
+              "offset)",
+              _config.client_address,
               header.base_offset(),
               _config.max_offset);
             return batch_consumer::consume_result::stop_parser;
@@ -1153,7 +1156,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
         if (model::record_batch_type::raft_data != header.type) {
             vlog(
               _ctxlog.debug,
-              "accept_batch_start skip because record batch type is {}",
+              "[{}] accept_batch_start skip because record batch type is {}",
+              _config.client_address,
               header.type);
             return batch_consumer::consume_result::skip_batch;
         }
@@ -1164,9 +1168,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
           rp_to_kafka(header.last_offset()) < _config.start_offset)) {
             vlog(
               _ctxlog.debug,
-              "accept_batch_start skip because "
+              "[{}] accept_batch_start skip because "
               "last_kafka_offset {} (last_rp_offset: {}) < "
               "config.start_offset: {}",
+              _config.client_address,
               rp_to_kafka(header.last_offset()),
               header.last_offset(),
               _config.start_offset);
@@ -1176,7 +1181,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
         if (
           (_config.strict_max_bytes || _config.bytes_consumed)
           && (_config.bytes_consumed + header.size_bytes) > _config.max_bytes) {
-            vlog(_ctxlog.debug, "accept_batch_start stop because overbudget");
+            vlog(
+              _ctxlog.debug,
+              "[{}] accept_batch_start stop because overbudget",
+              _config.client_address);
             _config.over_budget = true;
             return batch_consumer::consume_result::stop_parser;
         }
@@ -1184,7 +1192,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
         if (_config.first_timestamp > header.max_timestamp) {
             vlog(
               _ctxlog.debug,
-              "accept_batch_start skip because header timestamp is {}",
+              "[{}] accept_batch_start skip because header timestamp is {}",
+              _config.client_address,
               header.first_timestamp);
             return batch_consumer::consume_result::skip_batch;
         }
@@ -1199,7 +1208,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
       size_t /*size_on_disk*/) override {
         vlog(
           _ctxlog.trace,
-          "consume_batch_start called for {}",
+          "[{}] consume_batch_start called for {}",
+          _config.client_address,
           header.base_offset);
         _header = header;
         _header.ctx.term = _term;
@@ -1214,7 +1224,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
         // changing the _cur_delta. The _cur_delta that is be used for current
         // record batch can only account record batches in all previous batches.
         vlog(
-          _ctxlog.debug, "skip_batch_start called for {}", header.base_offset);
+          _ctxlog.debug,
+          "[{}] skip_batch_start called for {}",
+          _config.client_address,
+          header.base_offset);
         advance_config_offsets(header);
         if (
           std::count(
@@ -1407,10 +1420,16 @@ ss::future<> remote_segment_batch_reader::stop() {
         co_return;
     }
 
-    vlog(_ctxlog.debug, "remote_segment_batch_reader::stop");
+    vlog(
+      _ctxlog.debug,
+      "[{}] remote_segment_batch_reader::stop",
+      _config.client_address);
    co_await _gate.close();
    if (_parser) {
-        vlog(_ctxlog.debug, "remote_segment_batch_reader::stop - parser-close");
+        vlog(
+          _ctxlog.debug,
+          "[{}] remote_segment_batch_reader::stop - parser-close",
+          _config.client_address);
        co_await _parser->close();
        _parser.reset();
    }
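For context (not part of the patch itself): every hunk applies the same pattern — prepend a "[{}]" placeholder to the vlog format string and pass _config.client_address as the first argument — so that when several clients fetch from the same remote segment concurrently, interleaved trace/debug lines can be attributed to the specific read that produced them. Below is a minimal standalone sketch of that pattern; the names (log_reader_config, log_debug) and the address value are hypothetical stand-ins for Redpanda's actual reader config and vlog machinery, not its real API.

    #include <iostream>
    #include <optional>
    #include <string>

    // Hypothetical per-read configuration: carries the address of the
    // client that issued the fetch, if known.
    struct log_reader_config {
        std::optional<std::string> client_address;
    };

    // Prefix each message with "[<address>]" (or a placeholder when the
    // address is unknown) so interleaved lines from concurrent readers
    // can be told apart, mirroring the "[{}]" prefix added in the patch.
    void log_debug(const log_reader_config& cfg, const std::string& msg) {
        std::cout << "[" << cfg.client_address.value_or("{unknown}") << "] "
                  << msg << "\n";
    }

    int main() {
        log_reader_config cfg;
        cfg.client_address = "10.0.0.7:54312"; // example value only
        // Prints: [10.0.0.7:54312] accept_batch_start stop because overbudget
        log_debug(cfg, "accept_batch_start stop because overbudget");
        return 0;
    }

Carrying the address in the per-read config rather than in the logger itself matches what the diff does: the segment-level _ctxlog is shared across readers, so the per-client datum available at each call site is the config passed down with the read.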