diff --git a/cpp/src/parquet/column/column-reader-test.cc b/cpp/src/parquet/column/column-reader-test.cc index 0d4aea16b3dd3..84a36dbae3429 100644 --- a/cpp/src/parquet/column/column-reader-test.cc +++ b/cpp/src/parquet/column/column-reader-test.cc @@ -29,6 +29,7 @@ #include "parquet/column/reader.h" #include "parquet/column/test-util.h" +#include "parquet/util/output.h" #include "parquet/util/test-common.h" using std::string; @@ -60,31 +61,15 @@ class TestPrimitiveReader : public ::testing::Test { vector > pages_; }; -template -static vector slice(const vector& values, size_t start, size_t end) { - if (end < start) { - return vector(0); - } - - vector out(end - start); - for (size_t i = start; i < end; ++i) { - out[i - start] = values[i]; - } - return out; -} - TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { vector values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; - size_t num_values = values.size(); - parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN; - vector page1; - test::DataPageBuilder page_builder(&page1); - page_builder.AppendValues(values, parquet::Encoding::PLAIN); - pages_.push_back(page_builder.Finish()); + std::vector buffer; + std::shared_ptr page = MakeDataPage(values, {}, 0, + {}, 0, &buffer); + pages_.push_back(page); - // TODO: simplify this NodePtr type = schema::Int32("a", Repetition::REQUIRED); ColumnDescriptor descr(type, 0, 0); InitReader(&descr); @@ -102,21 +87,16 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { ASSERT_TRUE(vector_equal(result, values)); } + TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { vector values = {1, 2, 3, 4, 5}; vector def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1}; - size_t num_values = values.size(); - parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN; - - vector page1; - test::DataPageBuilder page_builder(&page1); - - // Definition levels precede the values - page_builder.AppendDefLevels(def_levels, 1, parquet::Encoding::RLE); - page_builder.AppendValues(values, parquet::Encoding::PLAIN); + std::vector buffer; + std::shared_ptr page = MakeDataPage(values, def_levels, 1, + {}, 0, &buffer); - pages_.push_back(page_builder.Finish()); + pages_.push_back(page); NodePtr type = schema::Int32("a", Repetition::OPTIONAL); ColumnDescriptor descr(type, 1, 0); @@ -159,18 +139,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { vector def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1}; vector rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1}; - size_t num_values = values.size(); - parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN; - - vector page1; - test::DataPageBuilder page_builder(&page1); - - // Definition levels precede the values - page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE); - page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE); - page_builder.AppendValues(values, parquet::Encoding::PLAIN); + std::vector buffer; + std::shared_ptr page = MakeDataPage(values, + def_levels, 2, rep_levels, 1, &buffer); - pages_.push_back(page_builder.Finish()); + pages_.push_back(page); NodePtr type = schema::Int32("a", Repetition::REPEATED); ColumnDescriptor descr(type, 2, 1); diff --git a/cpp/src/parquet/column/serialized-page.cc b/cpp/src/parquet/column/serialized-page.cc index 1cbaf4d399a89..b9d470c07c147 100644 --- a/cpp/src/parquet/column/serialized-page.cc +++ b/cpp/src/parquet/column/serialized-page.cc @@ -21,7 +21,6 @@ #include "parquet/exception.h" #include "parquet/thrift/util.h" -#include "parquet/util/input_stream.h" using parquet::PageType; @@ -52,7 +51,7 @@ std::shared_ptr SerializedPageReader::NextPage() { // Loop here because there may be unhandled page types that we skip until // finding a page that we do know what to do with while (true) { - int bytes_read = 0; + int64_t bytes_read = 0; const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); if (bytes_read == 0) { return std::shared_ptr(nullptr); diff --git a/cpp/src/parquet/column/serialized-page.h b/cpp/src/parquet/column/serialized-page.h index 2735c3cd535a9..c02152ffcc335 100644 --- a/cpp/src/parquet/column/serialized-page.h +++ b/cpp/src/parquet/column/serialized-page.h @@ -27,7 +27,7 @@ #include "parquet/column/page.h" #include "parquet/compression/codec.h" -#include "parquet/util/input_stream.h" +#include "parquet/util/input.h" #include "parquet/thrift/parquet_types.h" namespace parquet_cpp { diff --git a/cpp/src/parquet/column/test-util.h b/cpp/src/parquet/column/test-util.h index 8861134dffe2e..1cbcf8c9bb62c 100644 --- a/cpp/src/parquet/column/test-util.h +++ b/cpp/src/parquet/column/test-util.h @@ -52,26 +52,22 @@ class MockPageReader : public PageReader { size_t page_index_; }; -// TODO(wesm): this is only used for testing for now - -static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024; -static constexpr int INIT_BUFFER_SIZE = 1024; +// TODO(wesm): this is only used for testing for now. Refactor to form part of +// primary file write path template class DataPageBuilder { public: typedef typename type_traits::value_type T; - // The passed vector is the owner of the page's data - explicit DataPageBuilder(std::vector* out) : - out_(out), - buffer_size_(0), + // This class writes data and metadata to the passed inputs + explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) : + sink_(sink), + header_(header), num_values_(0), have_def_levels_(false), have_rep_levels_(false), have_values_(false) { - out_->resize(INIT_BUFFER_SIZE); - buffer_capacity_ = INIT_BUFFER_SIZE; } void AppendDefLevels(const std::vector& levels, @@ -79,7 +75,7 @@ class DataPageBuilder { AppendLevels(levels, max_level, encoding); num_values_ = std::max(levels.size(), num_values_); - header_.__set_definition_level_encoding(encoding); + header_->__set_definition_level_encoding(encoding); have_def_levels_ = true; } @@ -88,7 +84,7 @@ class DataPageBuilder { AppendLevels(levels, max_level, encoding); num_values_ = std::max(levels.size(), num_values_); - header_.__set_repetition_level_encoding(encoding); + header_->__set_repetition_level_encoding(encoding); have_rep_levels_ = true; } @@ -98,53 +94,31 @@ class DataPageBuilder { ParquetException::NYI("only plain encoding currently implemented"); } size_t bytes_to_encode = values.size() * sizeof(T); - Reserve(bytes_to_encode); PlainEncoder encoder(nullptr); - size_t nbytes = encoder.Encode(&values[0], values.size(), Head()); - // In case for some reason it's fewer than bytes_to_encode - buffer_size_ += nbytes; + encoder.Encode(&values[0], values.size(), sink_); num_values_ = std::max(values.size(), num_values_); - header_.__set_encoding(encoding); + header_->__set_encoding(encoding); have_values_ = true; } - std::shared_ptr Finish() { + void Finish() { if (!have_values_) { throw ParquetException("A data page must at least contain values"); } - header_.__set_num_values(num_values_); - return std::make_shared(&(*out_)[0], buffer_size_, header_); + header_->__set_num_values(num_values_); } private: - std::vector* out_; - - size_t buffer_size_; - size_t buffer_capacity_; - - parquet::DataPageHeader header_; + InMemoryOutputStream* sink_; + parquet::DataPageHeader* header_; size_t num_values_; - bool have_def_levels_; bool have_rep_levels_; bool have_values_; - void Reserve(size_t nbytes) { - while ((nbytes + buffer_size_) > buffer_capacity_) { - // TODO(wesm): limit to one reserve when this loop runs more than once - size_t new_capacity = 2 * buffer_capacity_; - out_->resize(new_capacity); - buffer_capacity_ = new_capacity; - } - } - - uint8_t* Head() { - return &(*out_)[buffer_size_]; - } - // Used internally for both repetition and definition levels void AppendLevels(const std::vector& levels, int16_t max_level, parquet::Encoding::type encoding) { @@ -153,9 +127,11 @@ class DataPageBuilder { } // TODO: compute a more precise maximum size for the encoded levels - std::vector encode_buffer(DEFAULT_DATA_PAGE_SIZE); - + std::vector encode_buffer(levels.size() * 4); + // We encode into separate memory from the output stream because the + // RLE-encoded bytes have to be preceded in the stream by their absolute + // size. LevelEncoder encoder; encoder.Init(encoding, max_level, levels.size(), encode_buffer.data(), encode_buffer.size()); @@ -163,15 +139,43 @@ class DataPageBuilder { encoder.Encode(levels.size(), levels.data()); uint32_t rle_bytes = encoder.len(); - size_t levels_footprint = sizeof(uint32_t) + rle_bytes; - Reserve(levels_footprint); - - *reinterpret_cast(Head()) = rle_bytes; - memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes); - buffer_size_ += levels_footprint; + sink_->Write(reinterpret_cast(&rle_bytes), sizeof(uint32_t)); + sink_->Write(encode_buffer.data(), rle_bytes); } }; +template +static std::shared_ptr MakeDataPage(const std::vector& values, + const std::vector& def_levels, int16_t max_def_level, + const std::vector& rep_levels, int16_t max_rep_level, + std::vector* out_buffer) { + size_t num_values = values.size(); + + InMemoryOutputStream page_stream; + parquet::DataPageHeader page_header; + + test::DataPageBuilder page_builder(&page_stream, &page_header); + + if (!rep_levels.empty()) { + page_builder.AppendRepLevels(rep_levels, max_rep_level, + parquet::Encoding::RLE); + } + + if (!def_levels.empty()) { + page_builder.AppendDefLevels(def_levels, max_def_level, + parquet::Encoding::RLE); + } + + page_builder.AppendValues(values, parquet::Encoding::PLAIN); + page_builder.Finish(); + + // Hand off the data stream to the passed std::vector + page_stream.Transfer(out_buffer); + + return std::make_shared(&(*out_buffer)[0], out_buffer->size(), page_header); +} + + } // namespace test } // namespace parquet_cpp diff --git a/cpp/src/parquet/encodings/encodings.h b/cpp/src/parquet/encodings/encodings.h index 21754d13da952..46c61b6cfa10d 100644 --- a/cpp/src/parquet/encodings/encodings.h +++ b/cpp/src/parquet/encodings/encodings.h @@ -23,6 +23,7 @@ #include "parquet/exception.h" #include "parquet/types.h" +#include "parquet/util/output.h" #include "parquet/util/rle-encoding.h" #include "parquet/util/bit-stream-utils.inline.h" @@ -82,14 +83,9 @@ class Encoder { virtual ~Encoder() {} - // TODO(wesm): use an output stream - // Subclasses should override the ones they support - // - // @returns: the number of bytes written to dst - virtual size_t Encode(const T* src, int num_values, uint8_t* dst) { + virtual void Encode(const T* src, int num_values, OutputStream* dst) { throw ParquetException("Encoder does not implement this type."); - return 0; } const parquet::Encoding::type encoding() const { return encoding_; } diff --git a/cpp/src/parquet/encodings/plain-encoding-test.cc b/cpp/src/parquet/encodings/plain-encoding-test.cc index ca425dd3bf242..16862b86d7d91 100644 --- a/cpp/src/parquet/encodings/plain-encoding-test.cc +++ b/cpp/src/parquet/encodings/plain-encoding-test.cc @@ -43,15 +43,18 @@ TEST(BooleanTest, TestEncodeDecode) { PlainEncoder encoder(nullptr); PlainDecoder decoder(nullptr); - std::vector encode_buffer(nbytes); + InMemoryOutputStream dst; + encoder.Encode(draws, nvalues, &dst); - size_t encoded_bytes = encoder.Encode(draws, nvalues, &encode_buffer[0]); - ASSERT_EQ(nbytes, encoded_bytes); + std::vector encode_buffer; + dst.Transfer(&encode_buffer); + + ASSERT_EQ(nbytes, encode_buffer.size()); std::vector decode_buffer(nbytes); const uint8_t* decode_data = &decode_buffer[0]; - decoder.SetData(nvalues, &encode_buffer[0], encoded_bytes); + decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size()); size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues); ASSERT_EQ(nvalues, values_decoded); diff --git a/cpp/src/parquet/encodings/plain-encoding.h b/cpp/src/parquet/encodings/plain-encoding.h index 03f5940c3c74c..a450eb489f96b 100644 --- a/cpp/src/parquet/encodings/plain-encoding.h +++ b/cpp/src/parquet/encodings/plain-encoding.h @@ -147,7 +147,7 @@ class PlainEncoder : public Encoder { explicit PlainEncoder(const ColumnDescriptor* descr) : Encoder(descr, parquet::Encoding::PLAIN) {} - virtual size_t Encode(const T* src, int num_values, uint8_t* dst); + virtual void Encode(const T* src, int num_values, OutputStream* dst); }; template <> @@ -156,43 +156,46 @@ class PlainEncoder : public Encoder { explicit PlainEncoder(const ColumnDescriptor* descr) : Encoder(descr, parquet::Encoding::PLAIN) {} - virtual size_t Encode(const bool* src, int num_values, uint8_t* dst) { + virtual void Encode(const bool* src, int num_values, OutputStream* dst) { throw ParquetException("this API for encoding bools not implemented"); - return 0; } - size_t Encode(const std::vector& src, int num_values, - uint8_t* dst) { + void Encode(const std::vector& src, int num_values, OutputStream* dst) { size_t bytes_required = BitUtil::RoundUp(num_values, 8) / 8; - BitWriter bit_writer(dst, bytes_required); + + // TODO(wesm) + // Use a temporary buffer for now and copy, because the BitWriter is not + // aware of OutputStream. Later we can add some kind of Request/Flush API + // to OutputStream + std::vector tmp_buffer(bytes_required); + + BitWriter bit_writer(&tmp_buffer[0], bytes_required); for (size_t i = 0; i < num_values; ++i) { bit_writer.PutValue(src[i], 1); } bit_writer.Flush(); - return bit_writer.bytes_written(); + + // Write the result to the output stream + dst->Write(bit_writer.buffer(), bit_writer.bytes_written()); } }; template -inline size_t PlainEncoder::Encode(const T* buffer, int num_values, - uint8_t* dst) { - size_t nbytes = num_values * sizeof(T); - memcpy(dst, buffer, nbytes); - return nbytes; +inline void PlainEncoder::Encode(const T* buffer, int num_values, + OutputStream* dst) { + dst->Write(reinterpret_cast(buffer), num_values * sizeof(T)); } template <> -inline size_t PlainEncoder::Encode(const ByteArray* src, - int num_values, uint8_t* dst) { +inline void PlainEncoder::Encode(const ByteArray* src, + int num_values, OutputStream* dst) { ParquetException::NYI("byte array encoding"); - return 0; } template <> -inline size_t PlainEncoder::Encode( - const FixedLenByteArray* src, int num_values, uint8_t* dst) { +inline void PlainEncoder::Encode( + const FixedLenByteArray* src, int num_values, OutputStream* dst) { ParquetException::NYI("FLBA encoding"); - return 0; } } // namespace parquet_cpp diff --git a/cpp/src/parquet/parquet.h b/cpp/src/parquet/parquet.h index 84a32f320a2c3..7030d0ed18d47 100644 --- a/cpp/src/parquet/parquet.h +++ b/cpp/src/parquet/parquet.h @@ -29,6 +29,8 @@ #include "parquet/exception.h" #include "parquet/reader.h" #include "parquet/column/reader.h" -#include "parquet/util/input_stream.h" + +#include "parquet/util/input.h" +#include "parquet/util/output.h" #endif diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc index ffc882c002d7a..8da8b9992a685 100644 --- a/cpp/src/parquet/reader-test.cc +++ b/cpp/src/parquet/reader-test.cc @@ -25,6 +25,7 @@ #include "parquet/reader.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" +#include "parquet/util/input.h" using std::string; @@ -47,7 +48,7 @@ class TestAllTypesPlain : public ::testing::Test { void TearDown() {} protected: - LocalFile file_; + LocalFileSource file_; ParquetFileReader reader_; }; diff --git a/cpp/src/parquet/reader.cc b/cpp/src/parquet/reader.cc index 2f30ebfe28c58..3fcce907ba5c2 100644 --- a/cpp/src/parquet/reader.cc +++ b/cpp/src/parquet/reader.cc @@ -31,55 +31,12 @@ #include "parquet/exception.h" #include "parquet/schema/converter.h" #include "parquet/thrift/util.h" -#include "parquet/util/input_stream.h" using std::string; using std::vector; namespace parquet_cpp { -// ---------------------------------------------------------------------- -// LocalFile methods - -LocalFile::~LocalFile() { - CloseFile(); -} - -void LocalFile::Open(const std::string& path) { - path_ = path; - file_ = fopen(path_.c_str(), "r"); - is_open_ = true; -} - -void LocalFile::Close() { - // Pure virtual - CloseFile(); -} - -void LocalFile::CloseFile() { - if (is_open_) { - fclose(file_); - is_open_ = false; - } -} - -size_t LocalFile::Size() { - fseek(file_, 0L, SEEK_END); - return Tell(); -} - -void LocalFile::Seek(size_t pos) { - fseek(file_, pos, SEEK_SET); -} - -size_t LocalFile::Tell() { - return ftell(file_); -} - -size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) { - return fread(buffer, 1, nbytes, file_); -} - // ---------------------------------------------------------------------- // RowGroupReader @@ -102,7 +59,7 @@ std::shared_ptr RowGroupReader::Column(size_t i) { std::unique_ptr input( new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); - FileLike* source = this->parent_->buffer_; + RandomAccessSource* source = this->parent_->buffer_; source->Seek(col_start); @@ -141,7 +98,7 @@ ParquetFileReader::ParquetFileReader() : ParquetFileReader::~ParquetFileReader() {} -void ParquetFileReader::Open(FileLike* buffer) { +void ParquetFileReader::Open(RandomAccessSource* buffer) { buffer_ = buffer; } diff --git a/cpp/src/parquet/reader.h b/cpp/src/parquet/reader.h index ea23182d7c97d..3a9dc5d6e03e8 100644 --- a/cpp/src/parquet/reader.h +++ b/cpp/src/parquet/reader.h @@ -27,53 +27,12 @@ #include "parquet/thrift/parquet_types.h" #include "parquet/types.h" - #include "parquet/schema/descriptor.h" +#include "parquet/util/input.h" namespace parquet_cpp { class ColumnReader; - -class FileLike { - public: - virtual ~FileLike() {} - - virtual void Close() = 0; - virtual size_t Size() = 0; - virtual size_t Tell() = 0; - virtual void Seek(size_t pos) = 0; - - // Returns actual number of bytes read - virtual size_t Read(size_t nbytes, uint8_t* out) = 0; -}; - - -class LocalFile : public FileLike { - public: - LocalFile() : file_(nullptr), is_open_(false) {} - virtual ~LocalFile(); - - void Open(const std::string& path); - - virtual void Close(); - virtual size_t Size(); - virtual size_t Tell(); - virtual void Seek(size_t pos); - - // Returns actual number of bytes read - virtual size_t Read(size_t nbytes, uint8_t* out); - - bool is_open() const { return is_open_;} - const std::string& path() const { return path_;} - - private: - void CloseFile(); - - std::string path_; - FILE* file_; - bool is_open_; -}; - class ParquetFileReader; class RowGroupReader { @@ -112,7 +71,7 @@ class ParquetFileReader { // This class does _not_ take ownership of the file. You must manage its // lifetime separately - void Open(FileLike* buffer); + void Open(RandomAccessSource* buffer); void Close(); @@ -150,7 +109,7 @@ class ParquetFileReader { // Row group index -> RowGroupReader std::unordered_map > row_group_readers_; - FileLike* buffer_; + RandomAccessSource* buffer_; }; diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt index 046a7c9effbb6..504069f62ed90 100644 --- a/cpp/src/parquet/util/CMakeLists.txt +++ b/cpp/src/parquet/util/CMakeLists.txt @@ -27,11 +27,13 @@ install(FILES macros.h rle-encoding.h stopwatch.h - input_stream.h + input.h + output.h DESTINATION include/parquet/util) add_library(parquet_util STATIC - input_stream.cc + input.cc + output.cc cpu-info.cc ) @@ -54,4 +56,5 @@ if(PARQUET_BUILD_TESTS) endif() ADD_PARQUET_TEST(bit-util-test) +ADD_PARQUET_TEST(output-test) ADD_PARQUET_TEST(rle-test) diff --git a/cpp/src/parquet/util/input_stream.cc b/cpp/src/parquet/util/input.cc similarity index 53% rename from cpp/src/parquet/util/input_stream.cc rename to cpp/src/parquet/util/input.cc index 281a3425086be..0e4b833b94278 100644 --- a/cpp/src/parquet/util/input_stream.cc +++ b/cpp/src/parquet/util/input.cc @@ -15,28 +15,77 @@ // specific language governing permissions and limitations // under the License. -#include "parquet/util/input_stream.h" +#include "parquet/util/input.h" #include +#include #include "parquet/exception.h" namespace parquet_cpp { +// ---------------------------------------------------------------------- +// LocalFileSource + +LocalFileSource::~LocalFileSource() { + CloseFile(); +} + +void LocalFileSource::Open(const std::string& path) { + path_ = path; + file_ = fopen(path_.c_str(), "r"); + is_open_ = true; +} + +void LocalFileSource::Close() { + // Pure virtual + CloseFile(); +} + +void LocalFileSource::CloseFile() { + if (is_open_) { + fclose(file_); + is_open_ = false; + } +} + +size_t LocalFileSource::Size() { + fseek(file_, 0L, SEEK_END); + return Tell(); +} + +void LocalFileSource::Seek(size_t pos) { + fseek(file_, pos, SEEK_SET); +} + +size_t LocalFileSource::Tell() { + return ftell(file_); +} + +size_t LocalFileSource::Read(size_t nbytes, uint8_t* buffer) { + return fread(buffer, 1, nbytes, file_); +} + +// ---------------------------------------------------------------------- +// InMemoryInputStream + InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : buffer_(buffer), len_(len), offset_(0) {} -const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) { +const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) { *num_bytes = std::min(static_cast(num_to_peek), len_ - offset_); return buffer_ + offset_; } -const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) { +const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { const uint8_t* result = Peek(num_to_read, num_bytes); offset_ += *num_bytes; return result; } +// ---------------------------------------------------------------------- +// ScopedInMemoryInputStream:: like InMemoryInputStream but owns its memory + ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) { buffer_.resize(len); stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size())); @@ -50,13 +99,11 @@ int64_t ScopedInMemoryInputStream::size() { return buffer_.size(); } -const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek, - int* num_bytes) { +const uint8_t* ScopedInMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) { return stream_->Peek(num_to_peek, num_bytes); } -const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read, - int* num_bytes) { +const uint8_t* ScopedInMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { return stream_->Read(num_to_read, num_bytes); } diff --git a/cpp/src/parquet/util/input_stream.h b/cpp/src/parquet/util/input.h similarity index 58% rename from cpp/src/parquet/util/input_stream.h rename to cpp/src/parquet/util/input.h index ece2488aba267..4fd9cd7894d7d 100644 --- a/cpp/src/parquet/util/input_stream.h +++ b/cpp/src/parquet/util/input.h @@ -15,15 +15,63 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_INPUT_STREAM_H -#define PARQUET_INPUT_STREAM_H +#ifndef PARQUET_UTIL_INPUT_H +#define PARQUET_UTIL_INPUT_H #include #include +#include #include namespace parquet_cpp { +// ---------------------------------------------------------------------- +// Random access input (e.g. file-like) + +// Random +class RandomAccessSource { + public: + virtual ~RandomAccessSource() {} + + virtual void Close() = 0; + virtual size_t Size() = 0; + virtual size_t Tell() = 0; + virtual void Seek(size_t pos) = 0; + + // Returns actual number of bytes read + virtual size_t Read(size_t nbytes, uint8_t* out) = 0; +}; + + +class LocalFileSource : public RandomAccessSource { + public: + LocalFileSource() : file_(nullptr), is_open_(false) {} + virtual ~LocalFileSource(); + + void Open(const std::string& path); + + virtual void Close(); + virtual size_t Size(); + virtual size_t Tell(); + virtual void Seek(size_t pos); + + // Returns actual number of bytes read + virtual size_t Read(size_t nbytes, uint8_t* out); + + bool is_open() const { return is_open_;} + const std::string& path() const { return path_;} + + private: + void CloseFile(); + + std::string path_; + FILE* file_; + bool is_open_; +}; + +// ---------------------------------------------------------------------- +// Streaming input interfaces + // Interface for the column reader to get the bytes. The interface is a stream // interface, meaning the bytes in order and once a byte is read, it does not // need to be read again. @@ -35,11 +83,11 @@ class InputStream { // Since the position is not advanced, calls to this function are idempotent. // The buffer returned to the caller is still owned by the input stream and must // stay valid until the next call to Peek() or Read(). - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0; + virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0; // Identical to Peek(), except the current position in the stream is advanced by // *num_bytes. - virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0; + virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0; virtual ~InputStream() {} @@ -51,8 +99,8 @@ class InputStream { class InMemoryInputStream : public InputStream { public: InMemoryInputStream(const uint8_t* buffer, int64_t len); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); + virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); + virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); private: const uint8_t* buffer_; @@ -67,8 +115,8 @@ class ScopedInMemoryInputStream : public InputStream { explicit ScopedInMemoryInputStream(int64_t len); uint8_t* data(); int64_t size(); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); + virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); + virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); private: std::vector buffer_; @@ -77,4 +125,4 @@ class ScopedInMemoryInputStream : public InputStream { } // namespace parquet_cpp -#endif // PARQUET_INPUT_STREAM_H +#endif // PARQUET_UTIL_INPUT_H diff --git a/cpp/src/parquet/util/output-test.cc b/cpp/src/parquet/util/output-test.cc new file mode 100644 index 0000000000000..84f5b57efbb5b --- /dev/null +++ b/cpp/src/parquet/util/output-test.cc @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" + +namespace parquet_cpp { + +TEST(TestInMemoryOutputStream, Basics) { + std::unique_ptr stream(new InMemoryOutputStream(8)); + + std::vector data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; + + stream->Write(&data[0], 4); + ASSERT_EQ(4, stream->Tell()); + stream->Write(&data[4], data.size() - 4); + + std::vector out; + stream->Transfer(&out); + + test::assert_vector_equal(data, out); + + ASSERT_EQ(0, stream->Tell()); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/util/output.cc b/cpp/src/parquet/util/output.cc new file mode 100644 index 0000000000000..9748a69eaf4e6 --- /dev/null +++ b/cpp/src/parquet/util/output.cc @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/util/output.h" + +#include +#include +#include + +#include "parquet/exception.h" + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// In-memory output stream + +static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024; + +InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) : + size_(0), + capacity_(initial_capacity) { + if (initial_capacity == 0) { + initial_capacity = IN_MEMORY_DEFAULT_CAPACITY; + } + buffer_.resize(initial_capacity); +} + +InMemoryOutputStream::InMemoryOutputStream() : + InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY) {} + +uint8_t* InMemoryOutputStream::Head() { + return &buffer_[size_]; +} + +void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) { + if (size_ + length > capacity_) { + int64_t new_capacity = capacity_ * 2; + while (new_capacity < size_ + length) { + new_capacity *= 2; + } + buffer_.resize(new_capacity); + capacity_ = new_capacity; + } + memcpy(Head(), data, length); + size_ += length; +} + +int64_t InMemoryOutputStream::Tell() { + return size_; +} + +void InMemoryOutputStream::Transfer(std::vector* out) { + buffer_.resize(size_); + buffer_.swap(*out); + size_ = 0; + capacity_ = buffer_.size(); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/util/output.h b/cpp/src/parquet/util/output.h new file mode 100644 index 0000000000000..e83b2619cf98f --- /dev/null +++ b/cpp/src/parquet/util/output.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_UTIL_OUTPUT_H +#define PARQUET_UTIL_OUTPUT_H + +#include +#include +#include + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// Output stream classes + +// Abstract output stream +class OutputStream { + public: + // Close the output stream + virtual void Close() = 0; + + // Return the current position in the output stream relative to the start + virtual int64_t Tell() = 0; + + // Copy bytes into the output stream + virtual void Write(const uint8_t* data, int64_t length) = 0; +}; + + +// An output stream that is an in-memory +class InMemoryOutputStream : public OutputStream { + public: + InMemoryOutputStream(); + explicit InMemoryOutputStream(int64_t initial_capacity); + + // Close is currently a no-op with the in-memory stream + virtual void Close() {} + + virtual int64_t Tell(); + + virtual void Write(const uint8_t* data, int64_t length); + + // Hand off the in-memory data to a (preferably-empty) std::vector owner + void Transfer(std::vector* out); + + private: + // Mutable pointer to the current write position in the stream + uint8_t* Head(); + + std::vector buffer_; + int64_t size_; + int64_t capacity_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_OUTPUT_H diff --git a/cpp/src/parquet/util/test-common.h b/cpp/src/parquet/util/test-common.h index 3cf82f5961683..84519d6f718ac 100644 --- a/cpp/src/parquet/util/test-common.h +++ b/cpp/src/parquet/util/test-common.h @@ -28,6 +28,16 @@ namespace parquet_cpp { namespace test { +template +static inline void assert_vector_equal(const vector& left, + const vector& right) { + ASSERT_EQ(left.size(), right.size()); + + for (size_t i = 0; i < left.size(); ++i) { + ASSERT_EQ(left[i], right[i]) << i; + } +} + template static inline bool vector_equal(const vector& left, const vector& right) { if (left.size() != right.size()) { @@ -47,6 +57,19 @@ static inline bool vector_equal(const vector& left, const vector& right) { return true; } +template +static vector slice(const vector& values, size_t start, size_t end) { + if (end < start) { + return vector(0); + } + + vector out(end - start); + for (size_t i = start; i < end; ++i) { + out[i - start] = values[i]; + } + return out; +} + static inline vector flip_coins_seed(size_t n, double p, uint32_t seed) { std::mt19937 gen(seed); std::bernoulli_distribution d(p);