From bbd34ba96d98865a562b1d0d43fb03f4751b80f1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 29 Feb 2016 16:17:16 -0800 Subject: [PATCH] PARQUET-520: Add MemoryMapSource and add unit tests for both it and LocalFileSource I also added the `file_descriptor` API so that we can verify that dtors elsewhere successfully close open files. Closes #56 Author: Wes McKinney Closes #66 from wesm/PARQUET-520 and squashes the following commits: 9d638ba [Wes McKinney] Add memory-mapping option to ParquetFileReader::OpenFile. Add --no-memory-map flag to parquet_reader 6389683 [Wes McKinney] Add Read API tests dbf6a45 [Wes McKinney] Test some failure modes for LocalFileSource / MemoryMapSource 01a7d64 [Wes McKinney] Add a MemoryMapSource and use this by default for SerializedFileReader Change-Id: I467fcda7439d36c244d74bf5fec0ae61f6b674f0 --- cpp/src/parquet/file/reader.cc | 10 +- cpp/src/parquet/file/reader.h | 3 +- cpp/src/parquet/util/CMakeLists.txt | 2 +- cpp/src/parquet/util/input-output-test.cc | 125 ++++++++++++++++++++++ cpp/src/parquet/util/input.cc | 96 ++++++++++++++++- cpp/src/parquet/util/input.h | 39 ++++++- cpp/src/parquet/util/output-test.cc | 46 -------- 7 files changed, 263 insertions(+), 58 deletions(-) create mode 100644 cpp/src/parquet/util/input-output-test.cc delete mode 100644 cpp/src/parquet/util/output-test.cc diff --git a/cpp/src/parquet/file/reader.cc b/cpp/src/parquet/file/reader.cc index 490147176e577..fcbe4530fcbf2 100644 --- a/cpp/src/parquet/file/reader.cc +++ b/cpp/src/parquet/file/reader.cc @@ -67,8 +67,14 @@ RowGroupStatistics RowGroupReader::GetColumnStats(int i) const { ParquetFileReader::ParquetFileReader() : schema_(nullptr) {} ParquetFileReader::~ParquetFileReader() {} -std::unique_ptr ParquetFileReader::OpenFile(const std::string& path) { - std::unique_ptr file(new LocalFileSource()); +std::unique_ptr ParquetFileReader::OpenFile(const std::string& path, + bool memory_map) { + std::unique_ptr file; + if (memory_map) { + file.reset(new MemoryMapSource()); + } else { + file.reset(new LocalFileSource()); + } file->Open(path); auto contents = SerializedFile::Open(std::move(file)); diff --git a/cpp/src/parquet/file/reader.h b/cpp/src/parquet/file/reader.h index fcc2c1807e5ab..94f593136d523 100644 --- a/cpp/src/parquet/file/reader.h +++ b/cpp/src/parquet/file/reader.h @@ -89,7 +89,8 @@ class ParquetFileReader { ~ParquetFileReader(); // API Convenience to open a serialized Parquet file on disk - static std::unique_ptr OpenFile(const std::string& path); + static std::unique_ptr OpenFile(const std::string& path, + bool memory_map = true); void Open(std::unique_ptr contents); void Close(); diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt index a009129dc9cc5..c8d2c2fa3d2be 100644 --- a/cpp/src/parquet/util/CMakeLists.txt +++ b/cpp/src/parquet/util/CMakeLists.txt @@ -63,6 +63,6 @@ endif() ADD_PARQUET_TEST(bit-util-test) ADD_PARQUET_TEST(buffer-test) +ADD_PARQUET_TEST(input-output-test) ADD_PARQUET_TEST(mem-pool-test) -ADD_PARQUET_TEST(output-test) ADD_PARQUET_TEST(rle-test) diff --git a/cpp/src/parquet/util/input-output-test.cc b/cpp/src/parquet/util/input-output-test.cc new file mode 100644 index 0000000000000..424be3a2193f9 --- /dev/null +++ b/cpp/src/parquet/util/input-output-test.cc @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "parquet/exception.h" +#include "parquet/util/buffer.h" +#include "parquet/util/input.h" +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" + +namespace parquet_cpp { + +TEST(TestInMemoryOutputStream, Basics) { + std::unique_ptr stream(new InMemoryOutputStream(8)); + + std::vector data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; + + stream->Write(&data[0], 4); + ASSERT_EQ(4, stream->Tell()); + stream->Write(&data[4], data.size() - 4); + + std::shared_ptr buffer = stream->GetBuffer(); + + Buffer data_buf(data.data(), data.size()); + + ASSERT_TRUE(data_buf.Equals(*buffer)); +} + +static bool file_exists(const std::string& path) { + return std::ifstream(path.c_str()).good(); +} + +template +class TestFileReaders : public ::testing::Test { + public: + void SetUp() { + test_path_ = "parquet-input-output-test.txt"; + if (file_exists(test_path_)) { + std::remove(test_path_.c_str()); + } + test_data_ = "testingdata"; + + std::ofstream stream; + stream.open(test_path_.c_str()); + stream << test_data_; + filesize_ = test_data_.size(); + } + + void TearDown() { + DeleteTestFile(); + } + + void DeleteTestFile() { + if (file_exists(test_path_)) { + std::remove(test_path_.c_str()); + } + } + + protected: + ReaderType source; + std::string test_path_; + std::string test_data_; + int filesize_; +}; + +typedef ::testing::Types ReaderTypes; + +TYPED_TEST_CASE(TestFileReaders, ReaderTypes); + +TYPED_TEST(TestFileReaders, NonExistentFile) { + ASSERT_THROW(this->source.Open("0xDEADBEEF.txt"), ParquetException); +} + +TYPED_TEST(TestFileReaders, Read) { + this->source.Open(this->test_path_); + + ASSERT_EQ(this->filesize_, this->source.Size()); + + std::shared_ptr buffer = this->source.Read(4); + ASSERT_EQ(4, buffer->size()); + ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4)); + + // Read past EOF + buffer = this->source.Read(10); + ASSERT_EQ(7, buffer->size()); + ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7)); +} + +TYPED_TEST(TestFileReaders, FileDisappeared) { + this->source.Open(this->test_path_); + this->source.Seek(4); + this->DeleteTestFile(); + this->source.Close(); +} + +TYPED_TEST(TestFileReaders, BadSeek) { + this->source.Open(this->test_path_); + + ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/util/input.cc b/cpp/src/parquet/util/input.cc index a238ff688ac9d..897d81c5cf93b 100644 --- a/cpp/src/parquet/util/input.cc +++ b/cpp/src/parquet/util/input.cc @@ -17,7 +17,9 @@ #include "parquet/util/input.h" +#include #include +#include #include #include "parquet/exception.h" @@ -42,13 +44,32 @@ LocalFileSource::~LocalFileSource() { void LocalFileSource::Open(const std::string& path) { path_ = path; - file_ = fopen(path_.c_str(), "r"); + file_ = fopen(path_.c_str(), "rb"); + if (file_ == nullptr || ferror(file_)) { + std::stringstream ss; + ss << "Unable to open file: " << path; + throw ParquetException(ss.str()); + } is_open_ = true; - fseek(file_, 0L, SEEK_END); - size_ = Tell(); + SeekFile(0, SEEK_END); + size_ = LocalFileSource::Tell(); Seek(0); } +void LocalFileSource::SeekFile(int64_t pos, int origin) { + if (origin == SEEK_SET && (pos < 0 || pos >= size_)) { + std::stringstream ss; + ss << "Position " << pos << " is not in range."; + throw ParquetException(ss.str()); + } + + if (0 != fseek(file_, pos, origin)) { + std::stringstream ss; + ss << "File seek to position " << pos << " failed."; + throw ParquetException(ss.str()); + } +} + void LocalFileSource::Close() { // Pure virtual CloseFile(); @@ -62,7 +83,7 @@ void LocalFileSource::CloseFile() { } void LocalFileSource::Seek(int64_t pos) { - fseek(file_, pos, SEEK_SET); + SeekFile(pos); } int64_t LocalFileSource::Size() const { @@ -70,7 +91,15 @@ int64_t LocalFileSource::Size() const { } int64_t LocalFileSource::Tell() const { - return ftell(file_); + int64_t position = ftell(file_); + if (position < 0) { + throw ParquetException("ftell failed, did the file disappear?"); + } + return position; +} + +int LocalFileSource::file_descriptor() const { + return fileno(file_); } int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) { @@ -87,6 +116,63 @@ std::shared_ptr LocalFileSource::Read(int64_t nbytes) { } return result; } +// ---------------------------------------------------------------------- +// MemoryMapSource methods + +MemoryMapSource::~MemoryMapSource() { + CloseFile(); +} + +void MemoryMapSource::Open(const std::string& path) { + LocalFileSource::Open(path); + data_ = reinterpret_cast(mmap(nullptr, size_, PROT_READ, + MAP_SHARED, fileno(file_), 0)); + if (data_ == nullptr) { + throw ParquetException("Memory mapping file failed"); + } + pos_ = 0; +} + +void MemoryMapSource::Close() { + // Pure virtual + CloseFile(); +} + +void MemoryMapSource::CloseFile() { + if (data_ != nullptr) { + munmap(data_, size_); + } + + LocalFileSource::CloseFile(); +} + +void MemoryMapSource::Seek(int64_t pos) { + if (pos < 0 || pos >= size_) { + std::stringstream ss; + ss << "Position " << pos << " is not in range."; + throw ParquetException(ss.str()); + } + + pos_ = pos; +} + +int64_t MemoryMapSource::Tell() const { + return pos_; +} + +int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) { + int64_t bytes_available = std::min(nbytes, size_ - pos_); + memcpy(buffer, data_ + pos_, bytes_available); + pos_ += bytes_available; + return bytes_available; +} + +std::shared_ptr MemoryMapSource::Read(int64_t nbytes) { + int64_t bytes_available = std::min(nbytes, size_ - pos_); + auto result = std::make_shared(data_ + pos_, bytes_available); + pos_ += bytes_available; + return result; +} // ---------------------------------------------------------------------- // BufferReader diff --git a/cpp/src/parquet/util/input.h b/cpp/src/parquet/util/input.h index 5f2bde3a51f4a..80fb730ed5e2d 100644 --- a/cpp/src/parquet/util/input.h +++ b/cpp/src/parquet/util/input.h @@ -18,8 +18,8 @@ #ifndef PARQUET_UTIL_INPUT_H #define PARQUET_UTIL_INPUT_H -#include #include +#include #include #include #include @@ -58,7 +58,7 @@ class LocalFileSource : public RandomAccessSource { LocalFileSource() : file_(nullptr), is_open_(false) {} virtual ~LocalFileSource(); - void Open(const std::string& path); + virtual void Open(const std::string& path); virtual void Close(); virtual int64_t Size() const; @@ -73,14 +73,47 @@ class LocalFileSource : public RandomAccessSource { bool is_open() const { return is_open_;} const std::string& path() const { return path_;} - private: + // Return the integer file descriptor + int file_descriptor() const; + + protected: void CloseFile(); + void SeekFile(int64_t pos, int origin = SEEK_SET); std::string path_; FILE* file_; bool is_open_; }; +class MemoryMapSource : public LocalFileSource { + public: + MemoryMapSource() : + LocalFileSource(), + data_(nullptr), + pos_(0) {} + + virtual ~MemoryMapSource(); + + virtual void Close(); + virtual void Open(const std::string& path); + + virtual int64_t Tell() const; + virtual void Seek(int64_t pos); + + // Copy data from memory map into out (must be already allocated memory) + // @returns: actual number of bytes read + virtual int64_t Read(int64_t nbytes, uint8_t* out); + + // Return a buffer referencing memory-map (no copy) + virtual std::shared_ptr Read(int64_t nbytes); + + private: + void CloseFile(); + + uint8_t* data_; + int64_t pos_; +}; + // ---------------------------------------------------------------------- // A file-like object that reads from virtual address space diff --git a/cpp/src/parquet/util/output-test.cc b/cpp/src/parquet/util/output-test.cc deleted file mode 100644 index bae184a30c011..0000000000000 --- a/cpp/src/parquet/util/output-test.cc +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#include -#include -#include - -#include "parquet/util/buffer.h" -#include "parquet/util/output.h" -#include "parquet/util/test-common.h" - -namespace parquet_cpp { - -TEST(TestInMemoryOutputStream, Basics) { - std::unique_ptr stream(new InMemoryOutputStream(8)); - - std::vector data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; - - stream->Write(&data[0], 4); - ASSERT_EQ(4, stream->Tell()); - stream->Write(&data[4], data.size() - 4); - - std::shared_ptr buffer = stream->GetBuffer(); - - Buffer data_buf(data.data(), data.size()); - - ASSERT_TRUE(data_buf.Equals(*buffer)); -} - -} // namespace parquet_cpp