pp: Decompressing compressed batches
Added calls that synchronously decompress record_batches when they are
fetched via the Kafka client.
michael-redpanda committed Apr 18, 2023
1 parent 1e9968e commit aa69b3e
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions src/v/pandaproxy/json/requests/fetch.h
@@ -26,6 +26,7 @@
 #include "pandaproxy/json/rjson_util.h"
 #include "pandaproxy/json/types.h"
 #include "seastarx.h"
+#include "storage/parser_utils.h"

 #include <seastar/core/sstring.hh>

@@ -121,20 +122,26 @@ class rjson_serialize_impl<kafka::fetch_response> {
         auto rjs = rjson_serialize_impl<model::record>(
           _fmt, tpv, adapter.batch->base_offset());

-        adapter.batch->for_each_record(
-          [&rjs, &w](model::record record) {
-              auto offset = record.offset_delta() + rjs.base_offset()();
-              if (!rjs(w, std::move(record))) {
-                  throw serialize_error(
-                    make_error_code(error_code::unable_to_serialize),
-                    fmt::format(
-                      "Unable to serialize record at offset {} in "
-                      "topic:partition {}:{}",
-                      offset,
-                      rjs.tpv().topic(),
-                      rjs.tpv().partition()));
-              }
-          });
+        auto batch = std::move(*adapter.batch);
+
+        if (batch.compressed()) {
+            batch = storage::internal::maybe_decompress_batch_sync(
+              batch);
+        }
+
+        batch.for_each_record([&rjs, &w](model::record record) {
+            auto offset = record.offset_delta() + rjs.base_offset()();
+            if (!rjs(w, std::move(record))) {
+                throw serialize_error(
+                  make_error_code(error_code::unable_to_serialize),
+                  fmt::format(
+                    "Unable to serialize record at offset {} in "
+                    "topic:partition {}:{}",
+                    offset,
+                    rjs.tpv().topic(),
+                    rjs.tpv().partition()));
+            }
+        });
         }
     }
     w.EndArray();
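For context, a minimal standalone sketch of the pattern this commit introduces. The wrapper name decompress_if_needed is hypothetical; model::record_batch::compressed() and storage::internal::maybe_decompress_batch_sync are taken from the diff above, so treat this as an illustration rather than code from the repository.

#include "model/record.h"
#include "storage/parser_utils.h"

// Hypothetical helper mirroring the commit: a compressed batch must be
// decompressed before its records can be iterated, and the _sync variant
// performs the decompression inline rather than returning a future.
inline model::record_batch decompress_if_needed(model::record_batch batch) {
    if (batch.compressed()) {
        batch = storage::internal::maybe_decompress_batch_sync(batch);
    }
    return batch;
}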
