diff --git a/cpp/src/arrow/util/hashing-test.cc b/cpp/src/arrow/util/hashing-test.cc index cc80283532241..f8b761826c25d 100644 --- a/cpp/src/arrow/util/hashing-test.cc +++ b/cpp/src/arrow/util/hashing-test.cc @@ -370,6 +370,14 @@ TEST(BinaryMemoTable, Basics) { table.CopyValues(4 /* start offset */, reinterpret_cast(&values[0])); ASSERT_EQ(values, expected_values); } + { + std::vector expected({B, C, D, E, F}); + std::vector actual; + table.VisitValues(1 /* start offset */, [&](const util::string_view& v) { + actual.emplace_back(v.data(), v.length()); + }); + ASSERT_EQ(actual, expected); + } } TEST(BinaryMemoTable, Stress) { diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index 24325e81eb4fd..ee368fb4e314c 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -34,6 +34,7 @@ #include "arrow/array.h" #include "arrow/buffer.h" +#include "arrow/builder.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" @@ -605,6 +606,17 @@ class BinaryMemoTable { CopyValues(0, out_size, out_data); } + // Visit the stored values in insertion order. + // The visitor function should have the signature `void(util::string_view)` + // or `void(const util::string_view&)`. + template + void VisitValues(int32_t start, VisitFunc&& visit) const { + for (uint32_t i = start; i < offsets_.size() - 1; ++i) { + visit( + util::string_view(values_.data() + offsets_[i], offsets_[i + 1] - offsets_[i])); + } + } + protected: struct Payload { int32_t memo_index; diff --git a/cpp/src/parquet/encoding-benchmark.cc b/cpp/src/parquet/encoding-benchmark.cc index e7309dbead5b8..364cdba15a252 100644 --- a/cpp/src/parquet/encoding-benchmark.cc +++ b/cpp/src/parquet/encoding-benchmark.cc @@ -110,8 +110,7 @@ static void DecodeDict(std::vector& values, DictEncoder encoder(descr.get(), &pool, allocator); for (int i = 0; i < num_values; ++i) { - // No SSE - encoder.template Put(values[i]); + encoder.Put(values[i]); } std::shared_ptr dict_buffer = diff --git a/cpp/src/parquet/encoding-internal.h b/cpp/src/parquet/encoding-internal.h index 93d49930049fe..d4042cb19a330 100644 --- a/cpp/src/parquet/encoding-internal.h +++ b/cpp/src/parquet/encoding-internal.h @@ -26,8 +26,7 @@ #include "arrow/util/bit-stream-utils.h" #include "arrow/util/bit-util.h" -#include "arrow/util/cpu-info.h" -#include "arrow/util/hash-util.h" +#include "arrow/util/hashing.h" #include "arrow/util/macros.h" #include "arrow/util/rle-encoding.h" @@ -435,17 +434,24 @@ inline void DictionaryDecoder::SetDict(Decoder* dictionary) // ---------------------------------------------------------------------- // Dictionary encoder -// Initially imported from Apache Impala on 2016-02-22, and has been modified -// since for parquet-cpp +template +struct DictEncoderTraits { + using c_type = typename DType::c_type; + using MemoTableType = ::arrow::internal::ScalarMemoTable; +}; -// Initially 1024 elements -static constexpr int INITIAL_HASH_TABLE_SIZE = 1 << 10; +template <> +struct DictEncoderTraits { + using MemoTableType = ::arrow::internal::BinaryMemoTable; +}; -typedef int32_t hash_slot_t; -static constexpr hash_slot_t HASH_SLOT_EMPTY = std::numeric_limits::max(); +template <> +struct DictEncoderTraits { + using MemoTableType = ::arrow::internal::BinaryMemoTable; +}; -// The maximum load factor for the hash table before resizing. -static constexpr double MAX_HASH_LOAD = 0.7; +// Initially 1024 elements +static constexpr int32_t INITIAL_HASH_TABLE_SIZE = 1 << 10; /// See the dictionary encoding section of https://github.com/Parquet/parquet-format. /// The encoding supports streaming encoding. Values are encoded as they are added while @@ -454,29 +460,23 @@ static constexpr double MAX_HASH_LOAD = 0.7; /// the encoder, including new dictionary entries. template class DictEncoder : public Encoder { + using MemoTableType = typename DictEncoderTraits::MemoTableType; + public: typedef typename DType::c_type T; + // XXX pool is unused explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr, ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool()) : Encoder(desc, Encoding::PLAIN_DICTIONARY, allocator), allocator_(allocator), pool_(pool), - hash_table_size_(INITIAL_HASH_TABLE_SIZE), - mod_bitmask_(hash_table_size_ - 1), - hash_slots_(0, allocator), dict_encoded_size_(0), - type_length_(desc->type_length()) { - hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY); - cpu_info_ = ::arrow::internal::CpuInfo::GetInstance(); - } + type_length_(desc->type_length()), + memo_table_(INITIAL_HASH_TABLE_SIZE) {} ~DictEncoder() override { DCHECK(buffered_indices_.empty()); } - // TODO(wesm): think about how to address the construction semantics in - // encodings/dictionary-encoding.h - void set_mem_pool(ChunkedAllocator* pool) { pool_ = pool; } - void set_type_length(int type_length) { type_length_ = type_length; } /// Returns a conservative estimate of the number of bytes needed to encode the buffered @@ -506,62 +506,31 @@ class DictEncoder : public Encoder { /// to size buffer. int WriteIndices(uint8_t* buffer, int buffer_len); - int hash_table_size() { return hash_table_size_; } int dict_encoded_size() { return dict_encoded_size_; } - /// Clears all the indices (but leaves the dictionary). - void ClearIndices() { buffered_indices_.clear(); } /// Encode value. Note that this does not actually write any data, just /// buffers the value's index to be written later. - template - void Put(const T& value); - - template - int Hash(const T& value); + inline void Put(const T& value); + void Put(const T* values, int num_values) override; std::shared_ptr FlushValues() override { std::shared_ptr buffer = AllocateBuffer(this->allocator_, EstimatedDataEncodedSize()); int result_size = WriteIndices(buffer->mutable_data(), static_cast(EstimatedDataEncodedSize())); - ClearIndices(); PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false)); return buffer; } - void Put(const T* values, int num_values) override { - if (cpu_info_->CanUseSSE4_2()) { - for (int i = 0; i < num_values; i++) { - Put(values[i]); - } - } else { - for (int i = 0; i < num_values; i++) { - Put(values[i]); - } - } - } - - template - void DoubleTableSize(); - void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, int64_t valid_bits_offset) override { ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset, num_values); - if (cpu_info_->CanUseSSE4_2()) { - for (int32_t i = 0; i < num_values; i++) { - if (valid_bits_reader.IsSet()) { - Put(src[i]); - } - valid_bits_reader.Next(); - } - } else { - for (int32_t i = 0; i < num_values; i++) { - if (valid_bits_reader.IsSet()) { - Put(src[i]); - } - valid_bits_reader.Next(); + for (int32_t i = 0; i < num_values; i++) { + if (valid_bits_reader.IsSet()) { + Put(src[i]); } + valid_bits_reader.Next(); } } @@ -572,216 +541,103 @@ class DictEncoder : public Encoder { ChunkedAllocator* mem_pool() { return pool_; } /// The number of entries in the dictionary. - int num_entries() const { return static_cast(uniques_.size()); } + int num_entries() const { return memo_table_.size(); } private: + /// Clears all the indices (but leaves the dictionary). + void ClearIndices() { buffered_indices_.clear(); } + ::arrow::MemoryPool* allocator_; // For ByteArray / FixedLenByteArray data. Not owned ChunkedAllocator* pool_; - ::arrow::internal::CpuInfo* cpu_info_; - - /// Size of the table. Must be a power of 2. - int hash_table_size_; - - // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j % - // hash_table_size_, but uses far fewer CPU cycles - int mod_bitmask_; - - // We use a fixed-size hash table with linear probing - // - // These values correspond to the uniques_ array - Vector hash_slots_; - /// Indices that have not yet be written out by WriteIndices(). std::vector buffered_indices_; /// The number of bytes needed to encode the dictionary. int dict_encoded_size_; - // The unique observed values - std::vector uniques_; - - bool SlotDifferent(const T& v, hash_slot_t slot); - /// Size of each encoded dictionary value. -1 for variable-length types. int type_length_; - /// Adds value to the hash table and updates dict_encoded_size_ - void AddDictKey(const T& value); + MemoTableType memo_table_; }; template -template -int DictEncoder::Hash(const typename DType::c_type& value) { - return ::arrow::HashUtil::Hash(&value, sizeof(value), 0); -} - -template <> -template -int DictEncoder::Hash(const ByteArray& value) { - if (value.len > 0) { - DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL"; - } - return ::arrow::HashUtil::Hash(value.ptr, value.len, 0); -} - -template <> -template -int DictEncoder::Hash(const FixedLenByteArray& value) { - if (type_length_ > 0) { - DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL"; +void DictEncoder::Put(const T* src, int num_values) { + for (int32_t i = 0; i < num_values; i++) { + Put(src[i]); } - return ::arrow::HashUtil::Hash(value.ptr, type_length_, 0); } template -inline bool DictEncoder::SlotDifferent(const typename DType::c_type& v, - hash_slot_t slot) { - return v != uniques_[slot]; +inline void DictEncoder::Put(const T& v) { + // Put() implementation for primitive types + auto on_found = [](int32_t memo_index) {}; + auto on_not_found = [this](int32_t memo_index) { + dict_encoded_size_ += static_cast(sizeof(T)); + }; + + auto memo_index = memo_table_.GetOrInsert(v, on_found, on_not_found); + buffered_indices_.push_back(memo_index); } template <> -inline bool DictEncoder::SlotDifferent(const FixedLenByteArray& v, - hash_slot_t slot) { - return 0 != memcmp(v.ptr, uniques_[slot].ptr, type_length_); -} - -template -template -inline void DictEncoder::Put(const typename DType::c_type& v) { - int j = Hash(v) & mod_bitmask_; - hash_slot_t index = hash_slots_[j]; - - // Find an empty slot - while (HASH_SLOT_EMPTY != index && SlotDifferent(v, index)) { - // Linear probing - ++j; - if (j == hash_table_size_) j = 0; - index = hash_slots_[j]; - } - - if (index == HASH_SLOT_EMPTY) { - // Not in the hash table, so we insert it now - index = static_cast(uniques_.size()); - hash_slots_[j] = index; - AddDictKey(v); - - if (ARROW_PREDICT_FALSE(static_cast(uniques_.size()) > - hash_table_size_ * MAX_HASH_LOAD)) { - DoubleTableSize(); - } - } - - buffered_indices_.push_back(index); -} - -template -template -inline void DictEncoder::DoubleTableSize() { - int new_size = hash_table_size_ * 2; - Vector new_hash_slots(0, allocator_); - new_hash_slots.Assign(new_size, HASH_SLOT_EMPTY); - hash_slot_t index, slot; - int j; - for (int i = 0; i < hash_table_size_; ++i) { - index = hash_slots_[i]; - - if (index == HASH_SLOT_EMPTY) { - continue; - } - - // Compute the hash value mod the new table size to start looking for an - // empty slot - const typename DType::c_type& v = uniques_[index]; - - // Find an empty slot in the new hash table - j = Hash(v) & (new_size - 1); - slot = new_hash_slots[j]; - while (HASH_SLOT_EMPTY != slot && SlotDifferent(v, slot)) { - ++j; - if (j == new_size) j = 0; - slot = new_hash_slots[j]; - } - - // Copy the old slot index to the new hash table - new_hash_slots[j] = index; - } - - hash_table_size_ = new_size; - mod_bitmask_ = new_size - 1; - - hash_slots_.Swap(new_hash_slots); -} - -template -inline void DictEncoder::AddDictKey(const typename DType::c_type& v) { - uniques_.push_back(v); - dict_encoded_size_ += static_cast(sizeof(typename DType::c_type)); +inline void DictEncoder::Put(const ByteArray& v) { + static const uint8_t empty[] = {0}; + + auto on_found = [](int32_t memo_index) {}; + auto on_not_found = [&](int32_t memo_index) { + dict_encoded_size_ += static_cast(v.len + sizeof(uint32_t)); + }; + + DCHECK(v.ptr != nullptr || v.len == 0); + const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; + auto memo_index = + memo_table_.GetOrInsert(ptr, static_cast(v.len), on_found, on_not_found); + buffered_indices_.push_back(memo_index); } template <> -inline void DictEncoder::AddDictKey(const ByteArray& v) { - uint8_t* heap = pool_->Allocate(v.len); - if (ARROW_PREDICT_FALSE(v.len > 0 && heap == nullptr)) { - throw ParquetException("out of memory"); - } - memcpy(heap, v.ptr, v.len); - uniques_.push_back(ByteArray(v.len, heap)); - dict_encoded_size_ += static_cast(v.len + sizeof(uint32_t)); -} +inline void DictEncoder::Put(const FixedLenByteArray& v) { + static const uint8_t empty[] = {0}; -template <> -inline void DictEncoder::AddDictKey(const FixedLenByteArray& v) { - uint8_t* heap = pool_->Allocate(type_length_); - if (ARROW_PREDICT_FALSE(type_length_ > 0 && heap == nullptr)) { - throw ParquetException("out of memory"); - } - memcpy(heap, v.ptr, type_length_); + auto on_found = [](int32_t memo_index) {}; + auto on_not_found = [this](int32_t memo_index) { dict_encoded_size_ += type_length_; }; - uniques_.push_back(FixedLenByteArray(heap)); - dict_encoded_size_ += type_length_; + DCHECK(v.ptr != nullptr || type_length_ == 0); + const void* ptr = (v.ptr != nullptr) ? v.ptr : empty; + auto memo_index = memo_table_.GetOrInsert(ptr, type_length_, on_found, on_not_found); + buffered_indices_.push_back(memo_index); } template inline void DictEncoder::WriteDict(uint8_t* buffer) { // For primitive types, only a memcpy - memcpy(buffer, uniques_.data(), sizeof(typename DType::c_type) * uniques_.size()); -} - -template <> -inline void DictEncoder::WriteDict(uint8_t* buffer) { - // For primitive types, only a memcpy - // memcpy(buffer, uniques_.data(), sizeof(typename DType::c_type) * uniques_.size()); - for (size_t i = 0; i < uniques_.size(); i++) { - buffer[i] = uniques_[i]; - } + DCHECK_EQ(dict_encoded_size_, sizeof(T) * memo_table_.size()); + memo_table_.CopyValues(0 /* start_pos */, reinterpret_cast(buffer)); } // ByteArray and FLBA already have the dictionary encoded in their data heaps template <> inline void DictEncoder::WriteDict(uint8_t* buffer) { - for (const ByteArray& v : uniques_) { - memcpy(buffer, reinterpret_cast(&v.len), sizeof(uint32_t)); + memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { + uint32_t len = static_cast(v.length()); + memcpy(buffer, &len, sizeof(uint32_t)); buffer += sizeof(uint32_t); - if (v.len > 0) { - DCHECK(nullptr != v.ptr) << "Value ptr cannot be NULL"; - } - memcpy(buffer, v.ptr, v.len); - buffer += v.len; - } + memcpy(buffer, v.data(), v.length()); + buffer += v.length(); + }); } template <> inline void DictEncoder::WriteDict(uint8_t* buffer) { - for (const FixedLenByteArray& v : uniques_) { - if (type_length_ > 0) { - DCHECK(nullptr != v.ptr) << "Value ptr cannot be NULL"; - } - memcpy(buffer, v.ptr, type_length_); + memo_table_.VisitValues(0, [&](const ::arrow::util::string_view& v) { + DCHECK_EQ(v.length(), static_cast(type_length_)); + memcpy(buffer, v.data(), type_length_); buffer += type_length_; - } + }); } template