From d35efe214bbd05cfabce42d226e87b725a90d9e9 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 11 Feb 2016 00:21:32 -0800 Subject: [PATCH] PARQUET-497: Decouple serialized file internals from the ParquetFileReader public API This depends on PARQUET-501. A bit of a refactoring bloodbath, but extremely important to split out these details so that we can instrument the file reader public APIs with test fixtures for unit testing purposes. Author: Wes McKinney Closes #47 from wesm/PARQUET-497 and squashes the following commits: aa152ad [Wes McKinney] Decouple Parquet file format details and Thrift metadata from the ParquetFileReader and RowGroupReader public APIs. Change-Id: Ifc84ba4520517a304f373110cff6a5a43f505d2d --- cpp/src/parquet/CMakeLists.txt | 1 - cpp/src/parquet/file/CMakeLists.txt | 20 ++ cpp/src/parquet/file/reader-internal.cc | 166 +++++++++++++++ cpp/src/parquet/file/reader-internal.h | 79 +++++++ cpp/src/parquet/file/reader.cc | 207 +++++++++++++++++++ cpp/src/parquet/file/reader.h | 139 +++++++++++++ cpp/src/parquet/parquet.h | 2 +- cpp/src/parquet/reader-test.cc | 21 +- cpp/src/parquet/reader.cc | 262 ------------------------ cpp/src/parquet/reader.h | 118 ----------- cpp/src/parquet/schema/descriptor.h | 4 + 11 files changed, 626 insertions(+), 393 deletions(-) create mode 100644 cpp/src/parquet/file/CMakeLists.txt create mode 100644 cpp/src/parquet/file/reader-internal.cc create mode 100644 cpp/src/parquet/file/reader-internal.h create mode 100644 cpp/src/parquet/file/reader.cc create mode 100644 cpp/src/parquet/file/reader.h delete mode 100644 cpp/src/parquet/reader.cc delete mode 100644 cpp/src/parquet/reader.h diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 2d69ba067c8a7..6a47917e95ff4 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -18,7 +18,6 @@ # Headers: top level install(FILES parquet.h - reader.h exception.h types.h DESTINATION include/parquet) diff --git a/cpp/src/parquet/file/CMakeLists.txt b/cpp/src/parquet/file/CMakeLists.txt new file mode 100644 index 0000000000000..ef6ac01ea7ed5 --- /dev/null +++ b/cpp/src/parquet/file/CMakeLists.txt @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +install(FILES + reader.h + DESTINATION include/parquet/file) diff --git a/cpp/src/parquet/file/reader-internal.cc b/cpp/src/parquet/file/reader-internal.cc new file mode 100644 index 0000000000000..7b0a7195f8e5a --- /dev/null +++ b/cpp/src/parquet/file/reader-internal.cc @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/file/reader-internal.h" + +#include +#include + +#include "parquet/column/serialized-page.h" +#include "parquet/schema/converter.h" +#include "parquet/thrift/util.h" +#include "parquet/util/input.h" + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// SerializedRowGroup + +int SerializedRowGroup::num_columns() const { + return metadata_->columns.size(); +} + +std::unique_ptr SerializedRowGroup::GetColumnPageReader(int i) { + // Read column chunk from the file + const parquet::ColumnChunk& col = metadata_->columns[i]; + + int64_t col_start = col.meta_data.data_page_offset; + if (col.meta_data.__isset.dictionary_page_offset && + col_start > col.meta_data.dictionary_page_offset) { + col_start = col.meta_data.dictionary_page_offset; + } + + // TODO(wesm): some input streams (e.g. memory maps) may not require + // copying data. This should be added to the input stream API to support + // zero-copy streaming + std::unique_ptr input( + new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); + + source_->Seek(col_start); + ScopedInMemoryInputStream* scoped_input = + static_cast(input.get()); + size_t bytes_read = source_->Read(scoped_input->size(), scoped_input->data()); + + if (bytes_read != scoped_input->size()) { + throw ParquetException("Unable to read column chunk data"); + } + + const ColumnDescriptor* descr = schema_->Column(i); + + return std::unique_ptr(new SerializedPageReader(std::move(input), + col.meta_data.codec)); +} + +RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) { + const parquet::ColumnMetaData& meta_data = metadata_->columns[i].meta_data; + + RowGroupStatistics result; + result.num_values = meta_data.num_values; + result.null_count = meta_data.statistics.null_count; + result.distinct_count = meta_data.statistics.distinct_count; + + return result; +} + +// ---------------------------------------------------------------------- +// SerializedFile: Parquet on-disk layout + +static constexpr uint32_t FOOTER_SIZE = 8; +static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; + +std::unique_ptr SerializedFile::Open( + std::unique_ptr source) { + std::unique_ptr result( + new SerializedFile(std::move(source))); + + // Access private methods here, but otherwise unavailable + SerializedFile* file = static_cast(result.get()); + + // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor + file->ParseMetaData(); + + return result; +} + +void SerializedFile::Close() { + source_->Close(); +} + +std::shared_ptr SerializedFile::GetRowGroup(int i) { + std::unique_ptr contents(new SerializedRowGroup(source_.get(), + &schema_, &metadata_.row_groups[i])); + + return std::make_shared(&schema_, std::move(contents)); +} + +int64_t SerializedFile::num_rows() const { + return metadata_.num_rows; +} + +int SerializedFile::num_columns() const { + return schema_.num_columns(); +} + +int SerializedFile::num_row_groups() const { + return metadata_.row_groups.size(); +} + +SerializedFile::SerializedFile(std::unique_ptr source) : + source_(std::move(source)) {} + + +void SerializedFile::ParseMetaData() { + size_t filesize = source_->Size(); + + if (filesize < FOOTER_SIZE) { + throw ParquetException("Corrupted file, smaller than file footer"); + } + + uint8_t footer_buffer[FOOTER_SIZE]; + source_->Seek(filesize - FOOTER_SIZE); + size_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer); + + if (bytes_read != FOOTER_SIZE) { + throw ParquetException("Invalid parquet file. Corrupt footer."); + } + if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) { + throw ParquetException("Invalid parquet file. Corrupt footer."); + } + + uint32_t metadata_len = *reinterpret_cast(footer_buffer); + size_t metadata_start = filesize - FOOTER_SIZE - metadata_len; + if (FOOTER_SIZE + metadata_len > filesize) { + throw ParquetException("Invalid parquet file. File is less than " + "file metadata size."); + } + + source_->Seek(metadata_start); + + std::vector metadata_buffer(metadata_len); + bytes_read = source_->Read(metadata_len, &metadata_buffer[0]); + if (bytes_read != metadata_len) { + throw ParquetException("Invalid parquet file. Could not read metadata bytes."); + } + + DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_); + + schema::FlatSchemaConverter converter(&metadata_.schema[0], + metadata_.schema.size()); + schema_.Init(converter.Convert()); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/file/reader-internal.h b/cpp/src/parquet/file/reader-internal.h new file mode 100644 index 0000000000000..8ba105eaf4d38 --- /dev/null +++ b/cpp/src/parquet/file/reader-internal.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_FILE_READER_INTERNAL_H +#define PARQUET_FILE_READER_INTERNAL_H + +#include "parquet/file/reader.h" + +#include + +#include "parquet/schema/descriptor.h" +#include "parquet/util/input.h" +#include "parquet/thrift/parquet_types.h" + +namespace parquet_cpp { + +// RowGroupReader::Contents implementation for the Parquet file specification +class SerializedRowGroup : public RowGroupReader::Contents { + public: + SerializedRowGroup(RandomAccessSource* source, const SchemaDescriptor* schema, + const parquet::RowGroup* metadata) : + source_(source), + schema_(schema), + metadata_(metadata) {} + + virtual int num_columns() const; + virtual std::unique_ptr GetColumnPageReader(int i); + virtual RowGroupStatistics GetColumnStats(int i); + + private: + RandomAccessSource* source_; + const SchemaDescriptor* schema_; + const parquet::RowGroup* metadata_; +}; + +// An implementation of ParquetFileReader::Contents that deals with the Parquet +// file structure, Thrift deserialization, and other internal matters + +class SerializedFile : public ParquetFileReader::Contents { + public: + // Open the valid and validate the header, footer, and parse the Thrift metadata + // + // This class does _not_ take ownership of the data source. You must manage its + // lifetime separately + static std::unique_ptr Open( + std::unique_ptr source); + virtual void Close(); + virtual std::shared_ptr GetRowGroup(int i); + virtual int64_t num_rows() const; + virtual int num_columns() const; + virtual int num_row_groups() const; + + private: + // This class takes ownership of the provided data source + explicit SerializedFile(std::unique_ptr source); + + std::unique_ptr source_; + parquet::FileMetaData metadata_; + + void ParseMetaData(); +}; + +} // namespace parquet_cpp + +#endif // PARQUET_FILE_READER_INTERNAL_H diff --git a/cpp/src/parquet/file/reader.cc b/cpp/src/parquet/file/reader.cc new file mode 100644 index 0000000000000..6ef59edcef097 --- /dev/null +++ b/cpp/src/parquet/file/reader.cc @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/file/reader.h" + +#include +#include +#include +#include +#include +#include + +#include "parquet/column/reader.h" +#include "parquet/column/scanner.h" + +#include "parquet/exception.h" +#include "parquet/file/reader-internal.h" + +using std::string; +using std::vector; + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// RowGroupReader public API + +RowGroupReader::RowGroupReader(const SchemaDescriptor* schema, + std::unique_ptr contents) : + schema_(schema), + contents_(std::move(contents)) {} + +int RowGroupReader::num_columns() const { + return contents_->num_columns(); +} + +std::shared_ptr RowGroupReader::Column(int i) { + // TODO: boundschecking + auto it = column_readers_.find(i); + if (it != column_readers_.end()) { + // Already have constructed the ColumnReader + return it->second; + } + + const ColumnDescriptor* descr = schema_->Column(i); + + std::unique_ptr page_reader = contents_->GetColumnPageReader(i); + std::shared_ptr reader = ColumnReader::Make(descr, + std::move(page_reader)); + column_readers_[i] = reader; + return reader; +} + +RowGroupStatistics RowGroupReader::GetColumnStats(int i) const { + return contents_->GetColumnStats(i); +} + +// ---------------------------------------------------------------------- +// ParquetFileReader public API + +ParquetFileReader::ParquetFileReader() : schema_(nullptr) {} +ParquetFileReader::~ParquetFileReader() {} + +std::unique_ptr ParquetFileReader::OpenFile(const std::string& path) { + std::unique_ptr file(new LocalFileSource()); + file->Open(path); + + auto contents = SerializedFile::Open(std::move(file)); + + std::unique_ptr result(new ParquetFileReader()); + result->Open(std::move(contents)); + + return result; +} + +void ParquetFileReader::Open(std::unique_ptr contents) { + contents_ = std::move(contents); + schema_ = contents_->schema(); +} + +void ParquetFileReader::Close() { + contents_->Close(); +} + +int ParquetFileReader::num_row_groups() const { + return contents_->num_row_groups(); +} + +int64_t ParquetFileReader::num_rows() const { + return contents_->num_rows(); +} + +int ParquetFileReader::num_columns() const { + return schema_->num_columns(); +} + +RowGroupReader* ParquetFileReader::RowGroup(int i) { + if (i >= num_row_groups()) { + std::stringstream ss; + ss << "The file only has " << num_row_groups() + << "row groups, requested reader for: " + << i; + throw ParquetException(ss.str()); + } + + auto it = row_group_readers_.find(i); + if (it != row_group_readers_.end()) { + // Constructed the RowGroupReader already + return it->second.get(); + } + + row_group_readers_[i] = contents_->GetRowGroup(i); + return row_group_readers_[i].get(); +} + +// ---------------------------------------------------------------------- +// ParquetFileReader::DebugPrint + +// the fixed initial size is just for an example +#define COL_WIDTH "20" + + +void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { + stream << "File statistics:\n"; + stream << "Total rows: " << this->num_rows() << "\n"; + + for (int i = 0; i < num_columns(); ++i) { + const ColumnDescriptor* descr = schema_->Column(i); + stream << "Column " << i << ": " + << descr->name() + << " (" + << type_to_string(descr->physical_type()) + << ")" << std::endl; + } + + for (int r = 0; r < num_row_groups(); ++r) { + stream << "--- Row Group " << r << " ---\n"; + + RowGroupReader* group_reader = RowGroup(r); + + // Print column metadata + size_t num_columns = group_reader->num_columns(); + + for (int i = 0; i < num_columns; ++i) { + RowGroupStatistics stats = group_reader->GetColumnStats(i); + + stream << "Column " << i << ": " + << stats.num_values << " rows, " + << stats.null_count << " null values, " + << stats.distinct_count << " distinct values, " + << std::endl; + } + + if (!print_values) { + continue; + } + + static constexpr size_t bufsize = 25; + char buffer[bufsize]; + + // Create readers for all columns and print contents + vector > scanners(num_columns, NULL); + for (int i = 0; i < num_columns; ++i) { + std::shared_ptr col_reader = group_reader->Column(i); + Type::type col_type = col_reader->type(); + + std::stringstream ss; + ss << "%-" << COL_WIDTH << "s"; + std::string fmt = ss.str(); + + snprintf(buffer, bufsize, fmt.c_str(), column_schema(i)->name().c_str()); + stream << buffer; + + // This is OK in this method as long as the RowGroupReader does not get + // deleted + scanners[i] = Scanner::Make(col_reader); + } + stream << "\n"; + + bool hasRow; + do { + hasRow = false; + for (int i = 0; i < num_columns; ++i) { + if (scanners[i]->HasNext()) { + hasRow = true; + scanners[i]->PrintNext(stream, 17); + } + } + stream << "\n"; + } while (hasRow); + } +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/file/reader.h b/cpp/src/parquet/file/reader.h new file mode 100644 index 0000000000000..3ff8697cf2613 --- /dev/null +++ b/cpp/src/parquet/file/reader.h @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_FILE_READER_H +#define PARQUET_FILE_READER_H + +#include +#include +#include +#include +#include + +#include "parquet/types.h" +#include "parquet/schema/descriptor.h" + +// TODO(wesm): Still depends on Thrift +#include "parquet/column/page.h" + +namespace parquet_cpp { + +class ColumnReader; +class ParquetFileReader; + +struct RowGroupStatistics { + int64_t num_values; + int64_t null_count; + int64_t distinct_count; +}; + +class RowGroupReader { + public: + // Forward declare the PIMPL + struct Contents { + virtual int num_columns() const = 0; + virtual RowGroupStatistics GetColumnStats(int i) = 0; + virtual std::unique_ptr GetColumnPageReader(int i) = 0; + }; + + RowGroupReader(const SchemaDescriptor* schema, std::unique_ptr contents); + + // Construct a ColumnReader for the indicated row group-relative + // column. Ownership is shared with the RowGroupReader. + std::shared_ptr Column(int i); + int num_columns() const; + + RowGroupStatistics GetColumnStats(int i) const; + + private: + // Owned by the parent ParquetFileReader + const SchemaDescriptor* schema_; + + // PIMPL idiom + // This is declared in the .cc file so that we can hide compiled Thrift + // headers from the public API and also more easily create test fixtures. + std::unique_ptr contents_; + + // Column index -> ColumnReader + std::unordered_map > column_readers_; +}; + + +class ParquetFileReader { + public: + // Forward declare the PIMPL + struct Contents { + // Perform any cleanup associated with the file contents + virtual void Close() = 0; + + virtual std::shared_ptr GetRowGroup(int i) = 0; + + virtual int64_t num_rows() const = 0; + virtual int num_columns() const = 0; + virtual int num_row_groups() const = 0; + + // Return const-poitner to make it clear that this object is not to be copied + const SchemaDescriptor* schema() const { + return &schema_; + } + SchemaDescriptor schema_; + }; + + ParquetFileReader(); + ~ParquetFileReader(); + + // API Convenience to open a serialized Parquet file on disk + static std::unique_ptr OpenFile(const std::string& path); + + void Open(std::unique_ptr contents); + void Close(); + + // The RowGroupReader is owned by the FileReader + RowGroupReader* RowGroup(int i); + + int num_columns() const; + int64_t num_rows() const; + int num_row_groups() const; + + // Returns the file schema descriptor + const SchemaDescriptor* descr() { + return schema_; + } + + const ColumnDescriptor* column_schema(int i) const { + return schema_->Column(i); + } + + void DebugPrint(std::ostream& stream, bool print_values = true); + + private: + // PIMPL idiom + // This is declared in the .cc file so that we can hide compiled Thrift + // headers from the public API and also more easily create test fixtures. + std::unique_ptr contents_; + + // The SchemaDescriptor is provided by the Contents impl + const SchemaDescriptor* schema_; + + // Row group index -> RowGroupReader + std::unordered_map > row_group_readers_; +}; + + +} // namespace parquet_cpp + +#endif // PARQUET_FILE_READER_H diff --git a/cpp/src/parquet/parquet.h b/cpp/src/parquet/parquet.h index 7030d0ed18d47..b8624aea35ef9 100644 --- a/cpp/src/parquet/parquet.h +++ b/cpp/src/parquet/parquet.h @@ -27,8 +27,8 @@ #include #include "parquet/exception.h" -#include "parquet/reader.h" #include "parquet/column/reader.h" +#include "parquet/file/reader.h" #include "parquet/util/input.h" #include "parquet/util/output.h" diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc index 8da8b9992a685..8599e7e3d2085 100644 --- a/cpp/src/parquet/reader-test.cc +++ b/cpp/src/parquet/reader-test.cc @@ -17,12 +17,13 @@ #include #include +#include #include #include #include -#include "parquet/reader.h" +#include "parquet/file/reader.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" #include "parquet/util/input.h" @@ -41,24 +42,22 @@ class TestAllTypesPlain : public ::testing::Test { std::stringstream ss; ss << dir_string << "/" << "alltypes_plain.parquet"; - file_.Open(ss.str()); - reader_.Open(&file_); + + reader_ = ParquetFileReader::OpenFile(ss.str()); } void TearDown() {} protected: - LocalFileSource file_; - ParquetFileReader reader_; + std::unique_ptr reader_; }; -TEST_F(TestAllTypesPlain, ParseMetaData) { - reader_.ParseMetaData(); +TEST_F(TestAllTypesPlain, NoopConstructDestruct) { } TEST_F(TestAllTypesPlain, TestBatchRead) { - RowGroupReader* group = reader_.RowGroup(0); + RowGroupReader* group = reader_->RowGroup(0); // column 0, id std::shared_ptr col = @@ -86,7 +85,7 @@ TEST_F(TestAllTypesPlain, TestBatchRead) { } TEST_F(TestAllTypesPlain, TestFlatScannerInt32) { - RowGroupReader* group = reader_.RowGroup(0); + RowGroupReader* group = reader_->RowGroup(0); // column 0, id std::shared_ptr scanner(new Int32Scanner(group->Column(0))); @@ -103,7 +102,7 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) { TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) { - RowGroupReader* group = reader_.RowGroup(0); + RowGroupReader* group = reader_->RowGroup(0); // column 0, id std::shared_ptr scanner(new Int32Scanner(group->Column(0))); @@ -118,7 +117,7 @@ TEST_F(TestAllTypesPlain, DebugPrintWorks) { std::stringstream ss; // Automatically parses metadata - reader_.DebugPrint(ss); + reader_->DebugPrint(ss); std::string result = ss.str(); ASSERT_GT(result.size(), 0); diff --git a/cpp/src/parquet/reader.cc b/cpp/src/parquet/reader.cc deleted file mode 100644 index 3fcce907ba5c2..0000000000000 --- a/cpp/src/parquet/reader.cc +++ /dev/null @@ -1,262 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/reader.h" - -#include -#include -#include -#include -#include -#include - -#include "parquet/column/reader.h" -#include "parquet/column/serialized-page.h" -#include "parquet/column/scanner.h" - -#include "parquet/exception.h" -#include "parquet/schema/converter.h" -#include "parquet/thrift/util.h" - -using std::string; -using std::vector; - -namespace parquet_cpp { - -// ---------------------------------------------------------------------- -// RowGroupReader - -std::shared_ptr RowGroupReader::Column(size_t i) { - // TODO: boundschecking - auto it = column_readers_.find(i); - if (it != column_readers_.end()) { - // Already have constructed the ColumnReader - return it->second; - } - - const parquet::ColumnChunk& col = row_group_->columns[i]; - - size_t col_start = col.meta_data.data_page_offset; - if (col.meta_data.__isset.dictionary_page_offset && - col_start > col.meta_data.dictionary_page_offset) { - col_start = col.meta_data.dictionary_page_offset; - } - - std::unique_ptr input( - new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); - - RandomAccessSource* source = this->parent_->buffer_; - - source->Seek(col_start); - - // TODO(wesm): Law of demeter violation - ScopedInMemoryInputStream* scoped_input = - static_cast(input.get()); - size_t bytes_read = source->Read(scoped_input->size(), scoped_input->data()); - if (bytes_read != scoped_input->size()) { - std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl; - std::cout << "Bytes read: " << bytes_read << std::endl; - throw ParquetException("Unable to read column chunk data"); - } - - const ColumnDescriptor* descr = parent_->column_descr(i); - - std::unique_ptr pager( - new SerializedPageReader(std::move(input), col.meta_data.codec)); - - std::shared_ptr reader = ColumnReader::Make(descr, - std::move(pager)); - column_readers_[i] = reader; - - return reader; -} - -// ---------------------------------------------------------------------- -// ParquetFileReader - -// 4 byte constant + 4 byte metadata len -static constexpr uint32_t FOOTER_SIZE = 8; -static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; - -ParquetFileReader::ParquetFileReader() : - parsed_metadata_(false), - buffer_(nullptr) {} - -ParquetFileReader::~ParquetFileReader() {} - -void ParquetFileReader::Open(RandomAccessSource* buffer) { - buffer_ = buffer; -} - -void ParquetFileReader::Close() { - buffer_->Close(); -} - -RowGroupReader* ParquetFileReader::RowGroup(size_t i) { - if (!parsed_metadata_) { - ParseMetaData(); - } - - if (i >= num_row_groups()) { - std::stringstream ss; - ss << "The file only has " << num_row_groups() - << "row groups, requested reader for: " - << i; - throw ParquetException(ss.str()); - } - - auto it = row_group_readers_.find(i); - if (it != row_group_readers_.end()) { - // Constructed the RowGroupReader already - return it->second.get(); - } - if (!parsed_metadata_) { - ParseMetaData(); - } - - // Construct the RowGroupReader - row_group_readers_[i] = std::make_shared(this, - &metadata_.row_groups[i]); - return row_group_readers_[i].get(); -} - -void ParquetFileReader::ParseMetaData() { - size_t filesize = buffer_->Size(); - - if (filesize < FOOTER_SIZE) { - throw ParquetException("Corrupted file, smaller than file footer"); - } - - uint8_t footer_buffer[FOOTER_SIZE]; - - buffer_->Seek(filesize - FOOTER_SIZE); - - size_t bytes_read = buffer_->Read(FOOTER_SIZE, footer_buffer); - - if (bytes_read != FOOTER_SIZE) { - throw ParquetException("Invalid parquet file. Corrupt footer."); - } - if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) { - throw ParquetException("Invalid parquet file. Corrupt footer."); - } - - uint32_t metadata_len = *reinterpret_cast(footer_buffer); - size_t metadata_start = filesize - FOOTER_SIZE - metadata_len; - if (FOOTER_SIZE + metadata_len > filesize) { - throw ParquetException("Invalid parquet file. File is less than file metadata size."); - } - - buffer_->Seek(metadata_start); - - std::vector metadata_buffer(metadata_len); - bytes_read = buffer_->Read(metadata_len, &metadata_buffer[0]); - if (bytes_read != metadata_len) { - throw ParquetException("Invalid parquet file. Could not read metadata bytes."); - } - DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_); - - schema::FlatSchemaConverter converter(&metadata_.schema[0], - metadata_.schema.size()); - schema_descr_.Init(converter.Convert()); - - parsed_metadata_ = true; -} - -// ---------------------------------------------------------------------- -// ParquetFileReader::DebugPrint - -// the fixed initial size is just for an example -#define COL_WIDTH "20" - - -void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { - if (!parsed_metadata_) { - ParseMetaData(); - } - - stream << "File statistics:\n"; - stream << "Total rows: " << metadata_.num_rows << "\n"; - for (int i = 0; i < num_columns(); ++i) { - const ColumnDescriptor* descr = column_descr(i); - stream << "Column " << i << ": " - << descr->name() - << " (" - << type_to_string(descr->physical_type()) - << ")" << std::endl; - } - - for (int r = 0; r < num_row_groups(); ++r) { - stream << "--- Row Group " << r << " ---\n"; - - RowGroupReader* group_reader = RowGroup(r); - - // Print column metadata - size_t num_columns = group_reader->num_columns(); - - for (int i = 0; i < num_columns; ++i) { - const parquet::ColumnMetaData* meta_data = group_reader->column_metadata(i); - stream << "Column " << i << ": " - << meta_data->num_values << " rows, " - << meta_data->statistics.null_count << " null values, " - << meta_data->statistics.distinct_count << " distinct values, " - << "min value: " << (meta_data->statistics.min.length() > 0 ? - meta_data->statistics.min : "N/A") - << ", max value: " << (meta_data->statistics.max.length() > 0 ? - meta_data->statistics.max : "N/A") << ".\n"; - } - - if (!print_values) { - continue; - } - - static constexpr size_t bufsize = 25; - char buffer[bufsize]; - - // Create readers for all columns and print contents - vector > scanners(num_columns, NULL); - for (int i = 0; i < num_columns; ++i) { - std::shared_ptr col_reader = group_reader->Column(i); - Type::type col_type = col_reader->type(); - - std::stringstream ss; - ss << "%-" << COL_WIDTH << "s"; - std::string fmt = ss.str(); - - snprintf(buffer, bufsize, fmt.c_str(), column_descr(i)->name().c_str()); - stream << buffer; - - // This is OK in this method as long as the RowGroupReader does not get - // deleted - scanners[i] = Scanner::Make(col_reader); - } - stream << "\n"; - - bool hasRow; - do { - hasRow = false; - for (int i = 0; i < num_columns; ++i) { - if (scanners[i]->HasNext()) { - hasRow = true; - scanners[i]->PrintNext(stream, 17); - } - } - stream << "\n"; - } while (hasRow); - } -} - -} // namespace parquet_cpp diff --git a/cpp/src/parquet/reader.h b/cpp/src/parquet/reader.h deleted file mode 100644 index 3a9dc5d6e03e8..0000000000000 --- a/cpp/src/parquet/reader.h +++ /dev/null @@ -1,118 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_FILE_READER_H -#define PARQUET_FILE_READER_H - -#include -#include -#include -#include -#include - -#include "parquet/thrift/parquet_types.h" - -#include "parquet/types.h" -#include "parquet/schema/descriptor.h" -#include "parquet/util/input.h" - -namespace parquet_cpp { - -class ColumnReader; -class ParquetFileReader; - -class RowGroupReader { - public: - RowGroupReader(ParquetFileReader* parent, parquet::RowGroup* group) : - parent_(parent), - row_group_(group) {} - - // Construct a ColumnReader for the indicated row group-relative - // column. Ownership is shared with the RowGroupReader. - std::shared_ptr Column(size_t i); - - const parquet::ColumnMetaData* column_metadata(size_t i) const { - return &row_group_->columns[i].meta_data; - } - - size_t num_columns() const { - return row_group_->columns.size(); - } - - private: - friend class ParquetFileReader; - - ParquetFileReader* parent_; - parquet::RowGroup* row_group_; - - // Column index -> ColumnReader - std::unordered_map > column_readers_; -}; - - -class ParquetFileReader { - public: - ParquetFileReader(); - ~ParquetFileReader(); - - // This class does _not_ take ownership of the file. You must manage its - // lifetime separately - void Open(RandomAccessSource* buffer); - - void Close(); - - void ParseMetaData(); - - // The RowGroupReader is owned by the FileReader - RowGroupReader* RowGroup(size_t i); - - size_t num_row_groups() const { - return metadata_.row_groups.size(); - } - - const ColumnDescriptor* column_descr(size_t i) const { - return schema_descr_.Column(i); - } - - size_t num_columns() const { - return schema_descr_.num_columns(); - } - - const parquet::FileMetaData& metadata() const { - return metadata_; - } - - void DebugPrint(std::ostream& stream, bool print_values = true); - - private: - friend class RowGroupReader; - - parquet::FileMetaData metadata_; - SchemaDescriptor schema_descr_; - - bool parsed_metadata_; - - // Row group index -> RowGroupReader - std::unordered_map > row_group_readers_; - - RandomAccessSource* buffer_; -}; - - -} // namespace parquet_cpp - -#endif // PARQUET_FILE_READER_H diff --git a/cpp/src/parquet/schema/descriptor.h b/cpp/src/parquet/schema/descriptor.h index 144666f0c90b4..d27dcc1e35e24 100644 --- a/cpp/src/parquet/schema/descriptor.h +++ b/cpp/src/parquet/schema/descriptor.h @@ -98,6 +98,10 @@ class SchemaDescriptor { return leaves_.size(); } + const schema::NodePtr& schema() const { + return schema_; + } + private: friend class ColumnDescriptor;