Skip to content

Commit

Permalink
PARQUET-689: C++: Compress DataPages eagerly
Browse files Browse the repository at this point in the history
Author: Deepak Majeti <[email protected]>

Closes apache#162 from majetideepak/PARQUET-689 and squashes the following commits:

46f04fb [Deepak Majeti] Clang format
73dfcf9 [Deepak Majeti] Compress Data Pages early
  • Loading branch information
Deepak Majeti authored and wesm committed Sep 2, 2018
1 parent c99966b commit 20e7344
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 18 deletions.
19 changes: 18 additions & 1 deletion cpp/src/parquet/column/page.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ class DataPage : public Page {
std::string min_;
};

class CompressedDataPage : public DataPage {
public:
CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size)
: DataPage(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding),
uncompressed_size_(uncompressed_size) {}

int64_t uncompressed_size() const { return uncompressed_size_; }

private:
int64_t uncompressed_size_;
};

class DataPageV2 : public Page {
public:
DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
Expand Down Expand Up @@ -176,9 +191,11 @@ class PageWriter {
// page limit
virtual void Close(bool has_dictionary, bool fallback) = 0;

virtual int64_t WriteDataPage(const DataPage& page) = 0;
virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;

virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;

virtual std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) = 0;
};

} // namespace parquet
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/parquet/column/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ void ColumnWriter::AddDataPage() {
memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size());
uncompressed_ptr += definition_levels->size();
memcpy(uncompressed_ptr, values->data(), values->size());
DataPage page(
uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);

std::shared_ptr<Buffer> compressed_data = pager_->Compress(uncompressed_data);
CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE,
Encoding::RLE, uncompressed_size);

// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
Expand All @@ -133,7 +135,7 @@ void ColumnWriter::AddDataPage() {
num_buffered_encoded_values_ = 0;
}

void ColumnWriter::WriteDataPage(const DataPage& page) {
void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
int64_t bytes_written = pager_->WriteDataPage(page);
total_bytes_written_ += bytes_written;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/column/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class PARQUET_EXPORT ColumnWriter {
void AddDataPage();

// Serializes Data Pages
void WriteDataPage(const DataPage& page);
void WriteDataPage(const CompressedDataPage& page);

// Write multiple definition levels
void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
Expand Down Expand Up @@ -128,7 +128,7 @@ class PARQUET_EXPORT ColumnWriter {
std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;

std::vector<DataPage> data_pages_;
std::vector<CompressedDataPage> data_pages_;

private:
void InitSinks();
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/file/writer-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
return compression_buffer_;
}

int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) {
int64_t uncompressed_size = page.size();
std::shared_ptr<Buffer> compressed_data = Compress(page.buffer());
int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
int64_t uncompressed_size = page.uncompressed_size();
std::shared_ptr<Buffer> compressed_data = page.buffer();

format::DataPageHeader data_page_header;
data_page_header.__set_num_values(page.num_values());
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/parquet/file/writer-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,18 @@ class SerializedPageWriter : public PageWriter {

virtual ~SerializedPageWriter() {}

int64_t WriteDataPage(const DataPage& page) override;
int64_t WriteDataPage(const CompressedDataPage& page) override;

int64_t WriteDictionaryPage(const DictionaryPage& page) override;

/**
* Compress a buffer.
*
* This method may return compression_buffer_ and thus the resulting memory
* is only valid until the next call to Compress().
*/
std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override;

void Close(bool has_dictionary, bool fallback) override;

private:
Expand All @@ -58,14 +66,6 @@ class SerializedPageWriter : public PageWriter {
// Compression codec to use.
std::unique_ptr<Codec> compressor_;
std::shared_ptr<OwnedMutableBuffer> compression_buffer_;

/**
* Compress a buffer.
*
* This method may return compression_buffer_ and thus the resulting memory
* is only valid until the next call to Compress().
*/
std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer);
};

// RowGroupWriter::Contents implementation for the Parquet file specification
Expand Down

0 comments on commit 20e7344

Please sign in to comment.