From 904129b2b90a7e6439b604cd971bf532bb4f5af9 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 25 Jun 2017 11:15:45 -0400 Subject: [PATCH] PARQUET-1044: Use compression libraries from Apache Arrow Depends on https://github.com/apache/arrow/pull/771; will update Arrow version once that's merged Author: Wes McKinney Closes #362 from wesm/move-compression-to-arrow and squashes the following commits: b055b69 [Wes McKinney] Check status, do not use /WX with Arrow EP build on MSVC 6e9b9cd [Wes McKinney] Fixes for macOS and Windows 6d39a39 [Wes McKinney] Fix function name a0bfa95 [Wes McKinney] Untabify file e095b10 [Wes McKinney] Use compression toolchain from Arrow Change-Id: I381e1e2f8f8621bfdb4a20900eba1218de29855f --- cpp/src/parquet/CMakeLists.txt | 3 - cpp/src/parquet/compression-test.cc | 84 ----- cpp/src/parquet/compression.cc | 311 ------------------ cpp/src/parquet/compression.h | 106 ------ cpp/src/parquet/exception.h | 25 ++ cpp/src/parquet/file/file-deserialize-test.cc | 14 +- cpp/src/parquet/file/metadata.h | 1 - cpp/src/parquet/file/reader-internal.cc | 11 +- cpp/src/parquet/file/reader-internal.h | 8 +- cpp/src/parquet/file/writer-internal.cc | 12 +- cpp/src/parquet/file/writer-internal.h | 8 +- cpp/src/parquet/parquet_version.h | 2 +- cpp/src/parquet/schema.cc | 8 +- cpp/src/parquet/types.h | 12 +- cpp/src/parquet/util/memory.h | 51 +-- 15 files changed, 100 insertions(+), 556 deletions(-) delete mode 100644 cpp/src/parquet/compression-test.cc delete mode 100644 cpp/src/parquet/compression.cc delete mode 100644 cpp/src/parquet/compression.h diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index ed3fe565f5cdb..f0eedcff1b5c6 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -17,7 +17,6 @@ # Headers: top level install(FILES - compression.h encoding.h exception.h schema.h @@ -41,11 +40,9 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/parquet.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/") -ADD_PARQUET_TEST(compression-test) ADD_PARQUET_TEST(encoding-test) ADD_PARQUET_TEST(public-api-test) ADD_PARQUET_TEST(types-test) ADD_PARQUET_TEST(reader-test) ADD_PARQUET_TEST(schema-test) - ADD_PARQUET_BENCHMARK(encoding-benchmark) diff --git a/cpp/src/parquet/compression-test.cc b/cpp/src/parquet/compression-test.cc deleted file mode 100644 index feaf9e31ff091..0000000000000 --- a/cpp/src/parquet/compression-test.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include - -#include "parquet/compression.h" -#include "parquet/util/test-common.h" - -using std::string; -using std::vector; - -namespace parquet { - -template -void CheckCodecRoundtrip(const vector& data) { - // create multiple compressors to try to break them - T c1; - T c2; - - int max_compressed_len = static_cast(c1.MaxCompressedLen(data.size(), &data[0])); - std::vector compressed(max_compressed_len); - std::vector decompressed(data.size()); - - // compress with c1 - int actual_size = static_cast( - c1.Compress(data.size(), &data[0], max_compressed_len, &compressed[0])); - compressed.resize(actual_size); - - // decompress with c2 - c2.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); - - ASSERT_TRUE(test::vector_equal(data, decompressed)); - - // compress with c2 - int actual_size2 = static_cast( - c2.Compress(data.size(), &data[0], max_compressed_len, &compressed[0])); - ASSERT_EQ(actual_size2, actual_size); - - // decompress with c1 - c1.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); - - ASSERT_TRUE(test::vector_equal(data, decompressed)); -} - -template -void CheckCodec() { - int sizes[] = {10000, 100000}; - for (int data_size : sizes) { - vector data; - test::random_bytes(data_size, 1234, &data); - CheckCodecRoundtrip(data); - } -} - -TEST(TestCompressors, Snappy) { - CheckCodec(); -} - -TEST(TestCompressors, Brotli) { - CheckCodec(); -} - -TEST(TestCompressors, GZip) { - CheckCodec(); -} - -} // namespace parquet diff --git a/cpp/src/parquet/compression.cc b/cpp/src/parquet/compression.cc deleted file mode 100644 index dc6b93da63a28..0000000000000 --- a/cpp/src/parquet/compression.cc +++ /dev/null @@ -1,311 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/compression.h" - -#include -#include -#include - -#include -#include -#include -#include - -#include "parquet/exception.h" -#include "parquet/types.h" - -namespace parquet { - -Codec::~Codec() {} - -std::unique_ptr Codec::Create(Compression::type codec_type) { - std::unique_ptr result; - switch (codec_type) { - case Compression::UNCOMPRESSED: - break; - case Compression::SNAPPY: - result.reset(new SnappyCodec()); - break; - case Compression::GZIP: - result.reset(new GZipCodec()); - break; - case Compression::LZO: - ParquetException::NYI("LZO codec not implemented"); - break; - case Compression::BROTLI: - result.reset(new BrotliCodec()); - break; - default: - ParquetException::NYI("Unrecognized codec"); - break; - } - return result; -} - -// ---------------------------------------------------------------------- -// gzip implementation - -// These are magic numbers from zlib.h. Not clear why they are not defined -// there. - -// Maximum window size -static constexpr int WINDOW_BITS = 15; - -// Output Gzip. -static constexpr int GZIP_CODEC = 16; - -// Determine if this is libz or gzip from header. -static constexpr int DETECT_CODEC = 32; - -class GZipCodec::GZipCodecImpl { - public: - explicit GZipCodecImpl(GZipCodec::Format format) - : format_(format), - compressor_initialized_(false), - decompressor_initialized_(false) {} - - ~GZipCodecImpl() { - EndCompressor(); - EndDecompressor(); - } - - void InitCompressor() { - EndDecompressor(); - memset(&stream_, 0, sizeof(stream_)); - - int ret; - // Initialize to run specified format - int window_bits = WINDOW_BITS; - if (format_ == DEFLATE) { - window_bits = -window_bits; - } else if (format_ == GZIP) { - window_bits += GZIP_CODEC; - } - if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9, - Z_DEFAULT_STRATEGY)) != Z_OK) { - throw ParquetException("zlib deflateInit failed: " + std::string(stream_.msg)); - } - - compressor_initialized_ = true; - } - - void EndCompressor() { - if (compressor_initialized_) { (void)deflateEnd(&stream_); } - compressor_initialized_ = false; - } - - void InitDecompressor() { - EndCompressor(); - memset(&stream_, 0, sizeof(stream_)); - int ret; - - // Initialize to run either deflate or zlib/gzip format - int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC; - if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { - throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg)); - } - decompressor_initialized_ = true; - } - - void EndDecompressor() { - if (decompressor_initialized_) { (void)inflateEnd(&stream_); } - decompressor_initialized_ = false; - } - - void Decompress(int64_t input_length, const uint8_t* input, int64_t output_length, - uint8_t* output) { - if (!decompressor_initialized_) { InitDecompressor(); } - if (output_length == 0) { - // The zlib library does not allow *output to be NULL, even when output_length - // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an - // error, so bail early if no output is expected. Note that we don't signal - // an error if the input actually contains compressed data. - return; - } - - // Reset the stream for this block - if (inflateReset(&stream_) != Z_OK) { - throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg)); - } - - int ret = 0; - // gzip can run in streaming mode or non-streaming mode. We only - // support the non-streaming use case where we present it the entire - // compressed input and a buffer big enough to contain the entire - // compressed output. In the case where we don't know the output, - // we just make a bigger buffer and try the non-streaming mode - // from the beginning again. - while (ret != Z_STREAM_END) { - stream_.next_in = const_cast(reinterpret_cast(input)); - stream_.avail_in = static_cast(input_length); - stream_.next_out = reinterpret_cast(output); - stream_.avail_out = static_cast(output_length); - - // We know the output size. In this case, we can use Z_FINISH - // which is more efficient. - ret = inflate(&stream_, Z_FINISH); - if (ret == Z_STREAM_END || ret != Z_OK) break; - - // Failure, buffer was too small - std::stringstream ss; - ss << "Too small a buffer passed to GZipCodec. InputLength=" << input_length - << " OutputLength=" << output_length; - throw ParquetException(ss.str()); - } - - // Failure for some other reason - if (ret != Z_STREAM_END) { - std::stringstream ss; - ss << "GZipCodec failed: "; - if (stream_.msg != NULL) ss << stream_.msg; - throw ParquetException(ss.str()); - } - } - - int64_t MaxCompressedLen(int64_t input_length, const uint8_t* input) { - // Most be in compression mode - if (!compressor_initialized_) { InitCompressor(); } - // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase) - return deflateBound(&stream_, static_cast(input_length)); - } - - int64_t Compress(int64_t input_length, const uint8_t* input, int64_t output_length, - uint8_t* output) { - if (!compressor_initialized_) { InitCompressor(); } - stream_.next_in = const_cast(reinterpret_cast(input)); - stream_.avail_in = static_cast(input_length); - stream_.next_out = reinterpret_cast(output); - stream_.avail_out = static_cast(output_length); - - int64_t ret = 0; - if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { - if (ret == Z_OK) { - // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too - // small - throw ParquetException("zlib deflate failed, output buffer too small"); - } - std::stringstream ss; - ss << "zlib deflate failed: " << stream_.msg; - throw ParquetException(ss.str()); - } - - if (deflateReset(&stream_) != Z_OK) { - throw ParquetException("zlib deflateReset failed: " + std::string(stream_.msg)); - } - - // Actual output length - return output_length - stream_.avail_out; - } - - private: - // zlib is stateful and the z_stream state variable must be initialized - // before - z_stream stream_; - - // Realistically, this will always be GZIP, but we leave the option open to - // configure - GZipCodec::Format format_; - - // These variables are mutually exclusive. When the codec is in "compressor" - // state, compressor_initialized_ is true while decompressor_initialized_ is - // false. When it's decompressing, the opposite is true. - // - // Indeed, this is slightly hacky, but the alternative is having separate - // Compressor and Decompressor classes. If this ever becomes an issue, we can - // perform the refactoring then - bool compressor_initialized_; - bool decompressor_initialized_; -}; - -GZipCodec::GZipCodec(Format format) { - impl_.reset(new GZipCodecImpl(format)); -} - -GZipCodec::~GZipCodec() {} - -void GZipCodec::Decompress( - int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { - return impl_->Decompress(input_length, input, output_length, output); -} - -int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) { - return impl_->MaxCompressedLen(input_length, input); -} - -int64_t GZipCodec::Compress( - int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { - return impl_->Compress(input_length, input, output_length, output); -} - -const char* GZipCodec::name() const { - return "gzip"; -} - -// ---------------------------------------------------------------------- -// Snappy implementation - -void SnappyCodec::Decompress( - int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { - if (!snappy::RawUncompress(reinterpret_cast(input), - static_cast(input_len), reinterpret_cast(output_buffer))) { - throw parquet::ParquetException("Corrupt snappy compressed data."); - } -} - -int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { - return snappy::MaxCompressedLength(input_len); -} - -int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) { - size_t output_len; - snappy::RawCompress(reinterpret_cast(input), - static_cast(input_len), reinterpret_cast(output_buffer), - &output_len); - return output_len; -} - -// ---------------------------------------------------------------------- -// Brotli implementation - -void BrotliCodec::Decompress( - int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { - size_t output_size = output_len; - if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) != - BROTLI_DECODER_RESULT_SUCCESS) { - throw parquet::ParquetException("Corrupt brotli compressed data."); - } -} - -int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { - return BrotliEncoderMaxCompressedSize(input_len); -} - -int64_t BrotliCodec::Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) { - size_t output_len = output_buffer_len; - // TODO: Make quality configurable. We use 8 as a default as it is the best - // trade-off for Parquet workload - if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len, - input, &output_len, output_buffer) == BROTLI_FALSE) { - throw parquet::ParquetException("Brotli compression failure."); - } - return output_len; -} - -} // namespace parquet diff --git a/cpp/src/parquet/compression.h b/cpp/src/parquet/compression.h deleted file mode 100644 index c1a3bf469e924..0000000000000 --- a/cpp/src/parquet/compression.h +++ /dev/null @@ -1,106 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_COMPRESSION_CODEC_H -#define PARQUET_COMPRESSION_CODEC_H - -#include -#include - -#include "parquet/exception.h" -#include "parquet/types.h" -#include "parquet/util/visibility.h" - -namespace parquet { - -class PARQUET_EXPORT Codec { - public: - virtual ~Codec(); - - static std::unique_ptr Create(Compression::type codec); - - virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) = 0; - - virtual int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) = 0; - - virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0; - - virtual const char* name() const = 0; -}; - -// Snappy codec. -class PARQUET_EXPORT SnappyCodec : public Codec { - public: - void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) override; - - int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) override; - - int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; - - const char* name() const override { return "snappy"; } -}; - -// Brotli codec. -class PARQUET_EXPORT BrotliCodec : public Codec { - public: - void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) override; - - int64_t Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, - uint8_t* output_buffer) override; - - int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; - - const char* name() const override { return "brotli"; } -}; - -// GZip codec. -class PARQUET_EXPORT GZipCodec : public Codec { - public: - /// Compression formats supported by the zlib library - enum Format { - ZLIB, - DEFLATE, - GZIP, - }; - - explicit GZipCodec(Format format = GZIP); - virtual ~GZipCodec(); - - void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) override; - - int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) override; - - int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; - - const char* name() const override; - - private: - // The gzip compressor is stateful - class GZipCodecImpl; - std::unique_ptr impl_; -}; - -} // namespace parquet - -#endif diff --git a/cpp/src/parquet/exception.h b/cpp/src/parquet/exception.h index b161bf77f4ccd..40c24e5a04123 100644 --- a/cpp/src/parquet/exception.h +++ b/cpp/src/parquet/exception.h @@ -19,10 +19,35 @@ #define PARQUET_EXCEPTION_H #include +#include #include +#include "arrow/status.h" + #include "parquet/util/visibility.h" +#define PARQUET_CATCH_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) { \ + return ::arrow::Status::IOError(e.what()); \ + } + +#define PARQUET_IGNORE_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) { UNUSED(e); } + +#define PARQUET_THROW_NOT_OK(s) \ + do { \ + ::arrow::Status _s = (s); \ + if (!_s.ok()) { \ + std::stringstream ss; \ + ss << "Arrow error: " << _s.ToString(); \ + ::parquet::ParquetException::Throw(ss.str()); \ + } \ + } while (0); + namespace parquet { class PARQUET_EXPORT ParquetException : public std::exception { diff --git a/cpp/src/parquet/file/file-deserialize-test.cc b/cpp/src/parquet/file/file-deserialize-test.cc index a823591a4d8c8..7c4690e40f177 100644 --- a/cpp/src/parquet/file/file-deserialize-test.cc +++ b/cpp/src/parquet/file/file-deserialize-test.cc @@ -27,7 +27,6 @@ #include #include "parquet/column/page.h" -#include "parquet/compression.h" #include "parquet/exception.h" #include "parquet/file/reader-internal.h" #include "parquet/parquet_types.h" @@ -38,6 +37,12 @@ namespace parquet { +#define ASSERT_OK(expr) \ + do { \ + ::arrow::Status s = (expr); \ + if (!s.ok()) { FAIL() << s.ToString(); } \ + } while (0) + using ::arrow::io::BufferReader; // Adds page statistics occupying a certain amount of bytes (for testing very @@ -187,7 +192,7 @@ TEST_F(TestPageSerde, Compression) { test::random_bytes(page_size, 0, &faux_data[i]); } for (auto codec_type : codec_types) { - std::unique_ptr codec = Codec::Create(codec_type); + std::unique_ptr<::arrow::Codec> codec = GetCodecFromArrow(codec_type); std::vector buffer; for (int i = 0; i < num_pages; ++i) { @@ -197,8 +202,9 @@ TEST_F(TestPageSerde, Compression) { int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data); buffer.resize(max_compressed_size); - int64_t actual_size = - codec->Compress(data_size, data, max_compressed_size, &buffer[0]); + int64_t actual_size; + ASSERT_OK(codec->Compress( + data_size, data, max_compressed_size, &buffer[0], &actual_size)); WriteDataPageHeader(1024, data_size, static_cast(actual_size)); out_stream_->Write(buffer.data(), actual_size); diff --git a/cpp/src/parquet/file/metadata.h b/cpp/src/parquet/file/metadata.h index d663617adb3d3..50d21144ec827 100644 --- a/cpp/src/parquet/file/metadata.h +++ b/cpp/src/parquet/file/metadata.h @@ -26,7 +26,6 @@ #include "parquet/column/properties.h" #include "parquet/column/statistics.h" -#include "parquet/compression.h" #include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" diff --git a/cpp/src/parquet/file/reader-internal.cc b/cpp/src/parquet/file/reader-internal.cc index c5420000c07d0..1d9ab47b5b740 100644 --- a/cpp/src/parquet/file/reader-internal.cc +++ b/cpp/src/parquet/file/reader-internal.cc @@ -24,8 +24,9 @@ #include #include +#include "arrow/util/compression.h" + #include "parquet/column/page.h" -#include "parquet/compression.h" #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/thrift.h" @@ -41,13 +42,13 @@ namespace parquet { // assembled in a serialized stream for storing in a Parquet files SerializedPageReader::SerializedPageReader(std::unique_ptr stream, - int64_t total_num_rows, Compression::type codec_type, MemoryPool* pool) + int64_t total_num_rows, Compression::type codec, MemoryPool* pool) : stream_(std::move(stream)), decompression_buffer_(AllocateBuffer(pool, 0)), seen_num_rows_(0), total_num_rows_(total_num_rows) { max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; - decompressor_ = Codec::Create(codec_type); + decompressor_ = GetCodecFromArrow(codec); } std::shared_ptr SerializedPageReader::NextPage() { @@ -99,8 +100,8 @@ std::shared_ptr SerializedPageReader::NextPage() { if (uncompressed_len > static_cast(decompression_buffer_->size())) { PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); } - decompressor_->Decompress(compressed_len, buffer, uncompressed_len, - decompression_buffer_->mutable_data()); + PARQUET_THROW_NOT_OK(decompressor_->Decompress(compressed_len, buffer, + uncompressed_len, decompression_buffer_->mutable_data())); buffer = decompression_buffer_->data(); } diff --git a/cpp/src/parquet/file/reader-internal.h b/cpp/src/parquet/file/reader-internal.h index 5d35540dfe90d..1ac23848959ed 100644 --- a/cpp/src/parquet/file/reader-internal.h +++ b/cpp/src/parquet/file/reader-internal.h @@ -24,7 +24,6 @@ #include "parquet/column/page.h" #include "parquet/column/properties.h" -#include "parquet/compression.h" #include "parquet/file/metadata.h" #include "parquet/file/reader.h" #include "parquet/parquet_types.h" @@ -32,6 +31,11 @@ #include "parquet/util/memory.h" #include "parquet/util/visibility.h" +namespace arrow { + +class Codec; +}; + namespace parquet { // 16 MB is the default maximum page header size @@ -63,7 +67,7 @@ class PARQUET_EXPORT SerializedPageReader : public PageReader { std::shared_ptr current_page_; // Compression codec to use. - std::unique_ptr decompressor_; + std::unique_ptr<::arrow::Codec> decompressor_; std::shared_ptr decompression_buffer_; // Maximum allowed page size diff --git a/cpp/src/parquet/file/writer-internal.cc b/cpp/src/parquet/file/writer-internal.cc index 019271f3ec6bb..bb24737e86559 100644 --- a/cpp/src/parquet/file/writer-internal.cc +++ b/cpp/src/parquet/file/writer-internal.cc @@ -17,6 +17,11 @@ #include "parquet/file/writer-internal.h" +#include +#include + +#include "arrow/util/compression.h" + #include "parquet/column/writer.h" #include "parquet/schema-internal.h" #include "parquet/schema.h" @@ -46,7 +51,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type data_page_offset_(0), total_uncompressed_size_(0), total_compressed_size_(0) { - compressor_ = Codec::Create(codec); + compressor_ = GetCodecFromArrow(codec); } static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) { @@ -81,8 +86,9 @@ void SerializedPageWriter::Compress( // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false)); - int64_t compressed_size = compressor_->Compress(src_buffer.size(), src_buffer.data(), - max_compressed_size, dest_buffer->mutable_data()); + int64_t compressed_size; + PARQUET_THROW_NOT_OK(compressor_->Compress(src_buffer.size(), src_buffer.data(), + max_compressed_size, dest_buffer->mutable_data(), &compressed_size)); PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false)); } diff --git a/cpp/src/parquet/file/writer-internal.h b/cpp/src/parquet/file/writer-internal.h index 27dc89ecb1bbc..6ac792753500a 100644 --- a/cpp/src/parquet/file/writer-internal.h +++ b/cpp/src/parquet/file/writer-internal.h @@ -22,12 +22,16 @@ #include #include "parquet/column/page.h" -#include "parquet/compression.h" #include "parquet/file/metadata.h" #include "parquet/file/writer.h" #include "parquet/parquet_types.h" #include "parquet/util/memory.h" +namespace arrow { + +class Codec; +}; + namespace parquet { // This subclass delimits pages appearing in a serialized stream, each preceded @@ -65,7 +69,7 @@ class SerializedPageWriter : public PageWriter { int64_t total_compressed_size_; // Compression codec to use. - std::unique_ptr compressor_; + std::unique_ptr<::arrow::Codec> compressor_; }; // RowGroupWriter::Contents implementation for the Parquet file specification diff --git a/cpp/src/parquet/parquet_version.h b/cpp/src/parquet/parquet_version.h index 453af5eb76a13..5432333f6fe9a 100644 --- a/cpp/src/parquet/parquet_version.h +++ b/cpp/src/parquet/parquet_version.h @@ -21,4 +21,4 @@ // define the parquet created by version #define CREATED_BY_VERSION "parquet-cpp version 1.1.1-SNAPSHOT" -#endif // PARQUET_VERSION_H +#endif // PARQUET_VERSION_H diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc index 4efa0b2d21bdd..5fc51fed7c3b1 100644 --- a/cpp/src/parquet/schema.cc +++ b/cpp/src/parquet/schema.cc @@ -264,9 +264,7 @@ int GroupNode::FieldIndex(const std::string& name) const { int GroupNode::FieldIndex(const Node& node) const { int result = FieldIndex(node.name()); - if (result < 0) { - return -1; - } + if (result < 0) { return -1; } DCHECK(result < field_count()); if (!node.Equals(field(result).get())) { // Same name but not the same node @@ -679,9 +677,7 @@ int SchemaDescriptor::ColumnIndex(const std::string& node_path) const { int SchemaDescriptor::ColumnIndex(const Node& node) const { int result = ColumnIndex(node.path()->ToDotString()); - if (result < 0) { - return -1; - } + if (result < 0) { return -1; } DCHECK(result < num_columns()); if (!node.Equals(Column(result)->schema_node().get())) { // Same path but not the same node diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 8504f5d36dccf..7ec3825092738 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -270,17 +270,17 @@ inline std::string format_fwf(int width) { return ss.str(); } -std::string PARQUET_EXPORT CompressionToString(Compression::type t); +PARQUET_EXPORT std::string CompressionToString(Compression::type t); -std::string PARQUET_EXPORT EncodingToString(Encoding::type t); +PARQUET_EXPORT std::string EncodingToString(Encoding::type t); -std::string PARQUET_EXPORT LogicalTypeToString(LogicalType::type t); +PARQUET_EXPORT std::string LogicalTypeToString(LogicalType::type t); -std::string PARQUET_EXPORT TypeToString(Type::type t); +PARQUET_EXPORT std::string TypeToString(Type::type t); -std::string PARQUET_EXPORT FormatStatValue(Type::type parquet_type, const char* val); +PARQUET_EXPORT std::string FormatStatValue(Type::type parquet_type, const char* val); -int PARQUET_EXPORT GetTypeByteSize(Type::type t); +PARQUET_EXPORT int GetTypeByteSize(Type::type t); } // namespace parquet diff --git a/cpp/src/parquet/util/memory.h b/cpp/src/parquet/util/memory.h index ca244a75b5996..4f780c4f29260 100644 --- a/cpp/src/parquet/util/memory.h +++ b/cpp/src/parquet/util/memory.h @@ -31,35 +31,42 @@ #include "arrow/io/memory.h" #include "arrow/memory_pool.h" #include "arrow/status.h" +#include "arrow/util/compression.h" #include "parquet/exception.h" +#include "parquet/types.h" #include "parquet/util/macros.h" #include "parquet/util/visibility.h" -#define PARQUET_CATCH_NOT_OK(s) \ - try { \ - (s); \ - } catch (const ::parquet::ParquetException& e) { \ - return ::arrow::Status::IOError(e.what()); \ - } - -#define PARQUET_IGNORE_NOT_OK(s) \ - try { \ - (s); \ - } catch (const ::parquet::ParquetException& e) { UNUSED(e); } - -#define PARQUET_THROW_NOT_OK(s) \ - do { \ - ::arrow::Status _s = (s); \ - if (!_s.ok()) { \ - std::stringstream ss; \ - ss << "Arrow error: " << _s.ToString(); \ - ::parquet::ParquetException::Throw(ss.str()); \ - } \ - } while (0); - namespace parquet { +static inline std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::type codec) { + std::unique_ptr<::arrow::Codec> result; + switch (codec) { + case Compression::UNCOMPRESSED: + break; + case Compression::SNAPPY: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result)); + break; + case Compression::GZIP: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::GZIP, &result)); + break; + case Compression::LZO: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::LZO, &result)); + break; + case Compression::BROTLI: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::BROTLI, &result)); + break; + default: + break; + } + return result; +} + static constexpr int64_t kInMemoryDefaultCapacity = 1024; using Buffer = ::arrow::Buffer;