diff --git a/src/v/storage/parser_utils.cc b/src/v/storage/parser_utils.cc index c51b1333b99a..0723f9f9cdf2 100644 --- a/src/v/storage/parser_utils.cc +++ b/src/v/storage/parser_utils.cc @@ -34,19 +34,27 @@ model::record_batch_reader decompress_batch_consumer::end_of_stream() { } ss::future decompress_batch(model::record_batch&& b) { + return ss::futurize_invoke(decompress_batch_sync, std::move(b)); +} + +ss::future decompress_batch(const model::record_batch& b) { + return ss::futurize_invoke(maybe_decompress_batch_sync, b); +} + +model::record_batch decompress_batch_sync(model::record_batch&& b) { if (!b.compressed()) { - return ss::make_ready_future(std::move(b)); + return std::move(b); } - return decompress_batch(b); + + return maybe_decompress_batch_sync(b); } -ss::future decompress_batch(const model::record_batch& b) { +model::record_batch maybe_decompress_batch_sync(const model::record_batch& b) { if (unlikely(!b.compressed())) { - return ss::make_exception_future( - std::runtime_error(fmt_with_ctx( - fmt::format, - "Asked to decompressed a non-compressed batch:{}", - b.header()))); + throw std::runtime_error(fmt_with_ctx( + fmt::format, + "Asked to decompressed a non-compressed batch:{}", + b.header())); } iobuf body_buf = compression::compressor::uncompress( b.data(), b.header().attrs.compression()); @@ -56,7 +64,7 @@ ss::future decompress_batch(const model::record_batch& b) { reset_size_checksum_metadata(h, body_buf); auto batch = model::record_batch( h, std::move(body_buf), model::record_batch::tag_ctor_ng{}); - return ss::make_ready_future(std::move(batch)); + return batch; } compress_batch_consumer::compress_batch_consumer( diff --git a/src/v/storage/parser_utils.h b/src/v/storage/parser_utils.h index 46059bd6a81c..0cfff7761403 100644 --- a/src/v/storage/parser_utils.h +++ b/src/v/storage/parser_utils.h @@ -44,6 +44,11 @@ class compress_batch_consumer { ss::future decompress_batch(model::record_batch&&); /// \brief batch decompression ss::future decompress_batch(const model::record_batch&); +/// \brief synchronous batch decompression +model::record_batch decompress_batch_sync(model::record_batch&&); +/// \brief synchronous batch decompression +/// \throw std::runtime_error If provided batch is not compressed +model::record_batch maybe_decompress_batch_sync(const model::record_batch&); /// \brief batch compression ss::future