Skip to content

Commit

Permalink
storage: Added synchronous record_batch decompress
Browse files Browse the repository at this point in the history
Added calls that will decompress record_batch's synchronously
  • Loading branch information
michael-redpanda committed Apr 18, 2023
1 parent a33675e commit 1e9968e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
26 changes: 17 additions & 9 deletions src/v/storage/parser_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,27 @@ model::record_batch_reader decompress_batch_consumer::end_of_stream() {
}

ss::future<model::record_batch> decompress_batch(model::record_batch&& b) {
return ss::futurize_invoke(decompress_batch_sync, std::move(b));
}

ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
return ss::futurize_invoke(maybe_decompress_batch_sync, b);
}

model::record_batch decompress_batch_sync(model::record_batch&& b) {
if (!b.compressed()) {
return ss::make_ready_future<model::record_batch>(std::move(b));
return std::move(b);
}
return decompress_batch(b);

return maybe_decompress_batch_sync(b);
}

ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
model::record_batch maybe_decompress_batch_sync(const model::record_batch& b) {
if (unlikely(!b.compressed())) {
return ss::make_exception_future<model::record_batch>(
std::runtime_error(fmt_with_ctx(
fmt::format,
"Asked to decompressed a non-compressed batch:{}",
b.header())));
throw std::runtime_error(fmt_with_ctx(
fmt::format,
"Asked to decompressed a non-compressed batch:{}",
b.header()));
}
iobuf body_buf = compression::compressor::uncompress(
b.data(), b.header().attrs.compression());
Expand All @@ -56,7 +64,7 @@ ss::future<model::record_batch> decompress_batch(const model::record_batch& b) {
reset_size_checksum_metadata(h, body_buf);
auto batch = model::record_batch(
h, std::move(body_buf), model::record_batch::tag_ctor_ng{});
return ss::make_ready_future<model::record_batch>(std::move(batch));
return batch;
}

compress_batch_consumer::compress_batch_consumer(
Expand Down
5 changes: 5 additions & 0 deletions src/v/storage/parser_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class compress_batch_consumer {
ss::future<model::record_batch> decompress_batch(model::record_batch&&);
/// \brief batch decompression
ss::future<model::record_batch> decompress_batch(const model::record_batch&);
/// \brief synchronous batch decompression
model::record_batch decompress_batch_sync(model::record_batch&&);
/// \brief synchronous batch decompression
/// \throw std::runtime_error If provided batch is not compressed
model::record_batch maybe_decompress_batch_sync(const model::record_batch&);

/// \brief batch compression
ss::future<model::record_batch>
Expand Down

0 comments on commit 1e9968e

Please sign in to comment.