Skip to content

Commit

Permalink
PARQUET-520: Add MemoryMapSource and add unit tests for both it and L…
Browse files Browse the repository at this point in the history
…ocalFileSource

I also added the `file_descriptor` API so that we can verify that dtors elsewhere successfully close open files. Closes apache#56

Author: Wes McKinney <[email protected]>

Closes apache#66 from wesm/PARQUET-520 and squashes the following commits:

9d638ba [Wes McKinney] Add memory-mapping option to ParquetFileReader::OpenFile. Add --no-memory-map flag to parquet_reader
6389683 [Wes McKinney] Add Read API tests
dbf6a45 [Wes McKinney] Test some failure modes for LocalFileSource / MemoryMapSource
01a7d64 [Wes McKinney] Add a MemoryMapSource and use this by default for SerializedFileReader

Change-Id: I467fcda7439d36c244d74bf5fec0ae61f6b674f0
  • Loading branch information
wesm authored and julienledem committed Mar 1, 2016
1 parent 61900cf commit bbd34ba
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 58 deletions.
10 changes: 8 additions & 2 deletions cpp/src/parquet/file/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,14 @@ RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
ParquetFileReader::ParquetFileReader() : schema_(nullptr) {}
ParquetFileReader::~ParquetFileReader() {}

std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path) {
std::unique_ptr<LocalFileSource> file(new LocalFileSource());
std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path,
bool memory_map) {
std::unique_ptr<LocalFileSource> file;
if (memory_map) {
file.reset(new MemoryMapSource());
} else {
file.reset(new LocalFileSource());
}
file->Open(path);

auto contents = SerializedFile::Open(std::move(file));
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/parquet/file/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ class ParquetFileReader {
~ParquetFileReader();

// API Convenience to open a serialized Parquet file on disk
static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path);
static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
bool memory_map = true);

void Open(std::unique_ptr<Contents> contents);
void Close();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ endif()

ADD_PARQUET_TEST(bit-util-test)
ADD_PARQUET_TEST(buffer-test)
ADD_PARQUET_TEST(input-output-test)
ADD_PARQUET_TEST(mem-pool-test)
ADD_PARQUET_TEST(output-test)
ADD_PARQUET_TEST(rle-test)
125 changes: 125 additions & 0 deletions cpp/src/parquet/util/input-output-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <cstdint>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include "parquet/exception.h"
#include "parquet/util/buffer.h"
#include "parquet/util/input.h"
#include "parquet/util/output.h"
#include "parquet/util/test-common.h"

namespace parquet_cpp {

TEST(TestInMemoryOutputStream, Basics) {
std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));

std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};

stream->Write(&data[0], 4);
ASSERT_EQ(4, stream->Tell());
stream->Write(&data[4], data.size() - 4);

std::shared_ptr<Buffer> buffer = stream->GetBuffer();

Buffer data_buf(data.data(), data.size());

ASSERT_TRUE(data_buf.Equals(*buffer));
}

static bool file_exists(const std::string& path) {
return std::ifstream(path.c_str()).good();
}

template <typename ReaderType>
class TestFileReaders : public ::testing::Test {
public:
void SetUp() {
test_path_ = "parquet-input-output-test.txt";
if (file_exists(test_path_)) {
std::remove(test_path_.c_str());
}
test_data_ = "testingdata";

std::ofstream stream;
stream.open(test_path_.c_str());
stream << test_data_;
filesize_ = test_data_.size();
}

void TearDown() {
DeleteTestFile();
}

void DeleteTestFile() {
if (file_exists(test_path_)) {
std::remove(test_path_.c_str());
}
}

protected:
ReaderType source;
std::string test_path_;
std::string test_data_;
int filesize_;
};

typedef ::testing::Types<LocalFileSource, MemoryMapSource> ReaderTypes;

TYPED_TEST_CASE(TestFileReaders, ReaderTypes);

TYPED_TEST(TestFileReaders, NonExistentFile) {
ASSERT_THROW(this->source.Open("0xDEADBEEF.txt"), ParquetException);
}

TYPED_TEST(TestFileReaders, Read) {
this->source.Open(this->test_path_);

ASSERT_EQ(this->filesize_, this->source.Size());

std::shared_ptr<Buffer> buffer = this->source.Read(4);
ASSERT_EQ(4, buffer->size());
ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4));

// Read past EOF
buffer = this->source.Read(10);
ASSERT_EQ(7, buffer->size());
ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7));
}

TYPED_TEST(TestFileReaders, FileDisappeared) {
this->source.Open(this->test_path_);
this->source.Seek(4);
this->DeleteTestFile();
this->source.Close();
}

TYPED_TEST(TestFileReaders, BadSeek) {
this->source.Open(this->test_path_);

ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
}

} // namespace parquet_cpp
96 changes: 91 additions & 5 deletions cpp/src/parquet/util/input.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#include "parquet/util/input.h"

#include <sys/mman.h>
#include <algorithm>
#include <sstream>
#include <string>

#include "parquet/exception.h"
Expand All @@ -42,13 +44,32 @@ LocalFileSource::~LocalFileSource() {

void LocalFileSource::Open(const std::string& path) {
path_ = path;
file_ = fopen(path_.c_str(), "r");
file_ = fopen(path_.c_str(), "rb");
if (file_ == nullptr || ferror(file_)) {
std::stringstream ss;
ss << "Unable to open file: " << path;
throw ParquetException(ss.str());
}
is_open_ = true;
fseek(file_, 0L, SEEK_END);
size_ = Tell();
SeekFile(0, SEEK_END);
size_ = LocalFileSource::Tell();
Seek(0);
}

void LocalFileSource::SeekFile(int64_t pos, int origin) {
if (origin == SEEK_SET && (pos < 0 || pos >= size_)) {
std::stringstream ss;
ss << "Position " << pos << " is not in range.";
throw ParquetException(ss.str());
}

if (0 != fseek(file_, pos, origin)) {
std::stringstream ss;
ss << "File seek to position " << pos << " failed.";
throw ParquetException(ss.str());
}
}

void LocalFileSource::Close() {
// Pure virtual
CloseFile();
Expand All @@ -62,15 +83,23 @@ void LocalFileSource::CloseFile() {
}

void LocalFileSource::Seek(int64_t pos) {
fseek(file_, pos, SEEK_SET);
SeekFile(pos);
}

int64_t LocalFileSource::Size() const {
return size_;
}

int64_t LocalFileSource::Tell() const {
return ftell(file_);
int64_t position = ftell(file_);
if (position < 0) {
throw ParquetException("ftell failed, did the file disappear?");
}
return position;
}

int LocalFileSource::file_descriptor() const {
return fileno(file_);
}

int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
Expand All @@ -87,6 +116,63 @@ std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
}
return result;
}
// ----------------------------------------------------------------------
// MemoryMapSource methods

MemoryMapSource::~MemoryMapSource() {
CloseFile();
}

void MemoryMapSource::Open(const std::string& path) {
LocalFileSource::Open(path);
data_ = reinterpret_cast<uint8_t*>(mmap(nullptr, size_, PROT_READ,
MAP_SHARED, fileno(file_), 0));
if (data_ == nullptr) {
throw ParquetException("Memory mapping file failed");
}
pos_ = 0;
}

void MemoryMapSource::Close() {
// Pure virtual
CloseFile();
}

void MemoryMapSource::CloseFile() {
if (data_ != nullptr) {
munmap(data_, size_);
}

LocalFileSource::CloseFile();
}

void MemoryMapSource::Seek(int64_t pos) {
if (pos < 0 || pos >= size_) {
std::stringstream ss;
ss << "Position " << pos << " is not in range.";
throw ParquetException(ss.str());
}

pos_ = pos;
}

int64_t MemoryMapSource::Tell() const {
return pos_;
}

int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) {
int64_t bytes_available = std::min(nbytes, size_ - pos_);
memcpy(buffer, data_ + pos_, bytes_available);
pos_ += bytes_available;
return bytes_available;
}

std::shared_ptr<Buffer> MemoryMapSource::Read(int64_t nbytes) {
int64_t bytes_available = std::min(nbytes, size_ - pos_);
auto result = std::make_shared<Buffer>(data_ + pos_, bytes_available);
pos_ += bytes_available;
return result;
}

// ----------------------------------------------------------------------
// BufferReader
Expand Down
39 changes: 36 additions & 3 deletions cpp/src/parquet/util/input.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
#ifndef PARQUET_UTIL_INPUT_H
#define PARQUET_UTIL_INPUT_H

#include <stdio.h>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -58,7 +58,7 @@ class LocalFileSource : public RandomAccessSource {
LocalFileSource() : file_(nullptr), is_open_(false) {}
virtual ~LocalFileSource();

void Open(const std::string& path);
virtual void Open(const std::string& path);

virtual void Close();
virtual int64_t Size() const;
Expand All @@ -73,14 +73,47 @@ class LocalFileSource : public RandomAccessSource {
bool is_open() const { return is_open_;}
const std::string& path() const { return path_;}

private:
// Return the integer file descriptor
int file_descriptor() const;

protected:
void CloseFile();
void SeekFile(int64_t pos, int origin = SEEK_SET);

std::string path_;
FILE* file_;
bool is_open_;
};

class MemoryMapSource : public LocalFileSource {
public:
MemoryMapSource() :
LocalFileSource(),
data_(nullptr),
pos_(0) {}

virtual ~MemoryMapSource();

virtual void Close();
virtual void Open(const std::string& path);

virtual int64_t Tell() const;
virtual void Seek(int64_t pos);

// Copy data from memory map into out (must be already allocated memory)
// @returns: actual number of bytes read
virtual int64_t Read(int64_t nbytes, uint8_t* out);

// Return a buffer referencing memory-map (no copy)
virtual std::shared_ptr<Buffer> Read(int64_t nbytes);

private:
void CloseFile();

uint8_t* data_;
int64_t pos_;
};

// ----------------------------------------------------------------------
// A file-like object that reads from virtual address space

Expand Down
Loading

0 comments on commit bbd34ba

Please sign in to comment.