diff --git a/src/yb/docdb/CMakeLists.txt b/src/yb/docdb/CMakeLists.txt index b2746bdaa0cf..a323279b5484 100644 --- a/src/yb/docdb/CMakeLists.txt +++ b/src/yb/docdb/CMakeLists.txt @@ -78,6 +78,8 @@ set(DOCDB_SRCS transaction_dump.cc transaction_status_cache.cc local_waiting_txn_registry.cc + vector_index.cc + vector_index_read.cc vector_index_update.cc wait_queue.cc ) diff --git a/src/yb/docdb/intent_aware_iterator.cc b/src/yb/docdb/intent_aware_iterator.cc index 7be1f8c0a16c..5bf2cfc917a0 100644 --- a/src/yb/docdb/intent_aware_iterator.cc +++ b/src/yb/docdb/intent_aware_iterator.cc @@ -636,6 +636,11 @@ bool IntentAwareIterator::IsRegularEntryOrderedBeforeResolvedIntent() const { regular_entry_.key, resolved_intent_sub_doc_key_encoded_.AsSlice()); } +Result IntentAwareIterator::FetchNext() { + Next(); + return Fetch(); +} + Result IntentAwareIterator::Fetch() { #ifndef NDEBUG need_fetch_ = false; diff --git a/src/yb/docdb/intent_aware_iterator.h b/src/yb/docdb/intent_aware_iterator.h index c6a2f130583f..e3d692d7cea8 100644 --- a/src/yb/docdb/intent_aware_iterator.h +++ b/src/yb/docdb/intent_aware_iterator.h @@ -106,6 +106,9 @@ class IntentAwareIterator final : public IntentAwareIteratorIf { // contain the DocHybridTime but is returned separately and optionally. Result Fetch() override; + // Utility function to execute Next and retrieve result via Fetch in one call. + Result FetchNext(); + const ReadHybridTime& read_time() const override { return read_time_; } Result RestartReadHt() const override; diff --git a/src/yb/docdb/vector_index.cc b/src/yb/docdb/vector_index.cc new file mode 100644 index 000000000000..a5c7a3301891 --- /dev/null +++ b/src/yb/docdb/vector_index.cc @@ -0,0 +1,32 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/docdb/vector_index.h" + +namespace yb::docdb { + +namespace detail { + +void AppendSubkey(dockv::KeyBytes& key, VectorIndexLevel level) { + key.AppendKeyEntryType(dockv::KeyEntryType::kUInt32); + key.AppendUInt32(level); +} + +void AppendSubkey(dockv::KeyBytes& key, VertexId id) { + key.AppendKeyEntryType(dockv::KeyEntryType::kVertexId); + key.AppendUInt64(id); +} + +} // namespace detail + +} // namespace yb::docdb diff --git a/src/yb/docdb/vector_index.h b/src/yb/docdb/vector_index.h index d826ffe8b026..5ae766864097 100644 --- a/src/yb/docdb/vector_index.h +++ b/src/yb/docdb/vector_index.h @@ -13,6 +13,9 @@ #pragma once +#include "yb/docdb/docdb_fwd.h" + +#include "yb/dockv/key_bytes.h" #include "yb/dockv/primitive_value.h" namespace yb::docdb { @@ -20,5 +23,49 @@ namespace yb::docdb { using VertexId = uint64_t; using VectorIndexLevel = uint8_t; using VectorNodeNeighbors = std::set; +constexpr VertexId kInvalidVertexId = 0; + +template +struct VectorIndexTypes { + using IndexedVector = std::vector; +}; + +template +class VectorIndexFetcher { + public: + using Types = VectorIndexTypes; + using IndexedVector = typename Types::IndexedVector; + + virtual Result GetVector( + const ReadOperationData& read_operation_data, VertexId id) = 0; + virtual Result GetNeighbors( + const ReadOperationData& read_operation_data, VertexId id, VectorIndexLevel level) = 0; + virtual ~VectorIndexFetcher() = default; +}; + +namespace detail { + +void AppendSubkey(dockv::KeyBytes& key, VectorIndexLevel level); +void AppendSubkey(dockv::KeyBytes& key, VertexId id); + +inline void AppendSubkeys(dockv::KeyBytes& key) {} + +template +void AppendSubkeys(dockv::KeyBytes& key, const T& t, Subkeys&&... subkeys) { + AppendSubkey(key, t); + AppendSubkeys(key, std::forward(subkeys)...); +} + +} // namespace detail + +template +dockv::KeyBytes MakeVectorIndexKey(VertexId id, Subkeys&&... subkeys) { + dockv::KeyBytes key; + auto key_entry_value = dockv::KeyEntryValue::VectorVertexId(id); + key_entry_value.AppendToKey(&key); + key.AppendGroupEnd(); + detail::AppendSubkeys(key, std::forward(subkeys)...); + return key; +} } // namespace yb::docdb diff --git a/src/yb/docdb/vector_index_docdb-test.cc b/src/yb/docdb/vector_index_docdb-test.cc index adb7a4719dad..55148c9a7d40 100644 --- a/src/yb/docdb/vector_index_docdb-test.cc +++ b/src/yb/docdb/vector_index_docdb-test.cc @@ -12,26 +12,36 @@ // #include "yb/docdb/docdb_test_base.h" +#include "yb/docdb/vector_index_read.h" #include "yb/docdb/vector_index_update.h" #include "yb/util/range.h" namespace yb::docdb { +using VITypes = VectorIndexTypes; + class VectorIndexDocDBTest : public DocDBTestBase { + protected: Schema CreateSchema() override { return Schema(); } + + void WriteSimple(); }; -TEST_F(VectorIndexDocDBTest, Update) { - const HybridTime hybrid_time = HybridTime::FromMicros(1000); +VITypes::IndexedVector GenVector(VertexId id) { + return {static_cast(M_E * id), static_cast(M_PI * id)}; +} + +void VectorIndexDocDBTest::WriteSimple() { + const HybridTime kHybridTime = HybridTime::FromMicros(1000); constexpr int kNumNodes = 3; const auto kNodes = Range(1, kNumNodes + 1); rocksdb::WriteBatch write_batch; - FloatVectorIndexUpdate update(hybrid_time, write_batch); + FloatVectorIndexUpdate update(kHybridTime, write_batch); for (int i : kNodes) { - update.AddVector(i, {static_cast(M_E * i), static_cast(M_PI * i)}); + update.AddVector(i, GenVector(i)); } for (int i : kNodes) { update.SetNeighbors(i, /* level= */ 0, Range(i + 1, kNumNodes + 1).ToContainer()); @@ -43,8 +53,25 @@ TEST_F(VectorIndexDocDBTest, Update) { update.DeleteDirectedEdge(2, 3, 20); update.DeleteVector(3); + update.AddVector(4, GenVector(4)); + update.SetNeighbors(4, /* level= */ 0, Range(1, 2).ToContainer()); + update.SetNeighbors(4, /* level= */ 0, Range(1, 3).ToContainer()); + + update.AddVector(5, GenVector(5)); + update.SetNeighbors(5, /* level= */ 0, Range(1, 3).ToContainer()); + update.DeleteDirectedEdge(5, 2, 0); + update.AddDirectedEdge(5, 10, 0); + update.SetNeighbors(5, /* level= */ 0, Range(1, 4).ToContainer()); + ASSERT_OK(rocksdb()->Write(write_options(), &write_batch)); +} + +TEST_F(VectorIndexDocDBTest, Update) { + WriteSimple(); + // Please note that in real system we would never get such set of data. + // But it is controlled by higher levels of the system. + // So storage layer just perform its job and store/load requested information. AssertDocDbDebugDumpStrEq(R"#( // The vector 1 itself. SubDocKey(DocKey([], [1]), [HT{ physical: 1000 }]) -> [2.71828174591064, 3.14159274101257] @@ -63,7 +90,59 @@ TEST_F(VectorIndexDocDBTest, Update) { SubDocKey(DocKey([], [3]), [HT{ physical: 1000 w: 2 }]) -> [8.15484523773193, 9.42477798461914] SubDocKey(DocKey([], [3]), [0; HT{ physical: 1000 w: 5 }]) -> [] SubDocKey(DocKey([], [3]), [30, 1; HT{ physical: 1000 w: 8 }]) -> null + // Vector 4 + SubDocKey(DocKey([], [4]), [HT{ physical: 1000 w: 11 }]) -> [10.8731269836426, 12.5663709640503] + // Overwrite vector 4 neighbors in level 0. + SubDocKey(DocKey([], [4]), [0; HT{ physical: 1000 w: 13 }]) -> [1, 2] + SubDocKey(DocKey([], [4]), [0; HT{ physical: 1000 w: 12 }]) -> [1] + // Vector 5 + SubDocKey(DocKey([], [5]), [HT{ physical: 1000 w: 14 }]) -> [13.5914087295532, 15.7079629898071] + // Overwrite vector 4 neighbors in level 0. Including overwrite to updates. + SubDocKey(DocKey([], [5]), [0; HT{ physical: 1000 w: 18 }]) -> [1, 2, 3] + SubDocKey(DocKey([], [5]), [0; HT{ physical: 1000 w: 15 }]) -> [1, 2] + SubDocKey(DocKey([], [5]), [0, 2; HT{ physical: 1000 w: 16 }]) -> DEL + SubDocKey(DocKey([], [5]), [0, 10; HT{ physical: 1000 w: 17 }]) -> null )#"); } +TEST_F(VectorIndexDocDBTest, Read) { + WriteSimple(); + + const HybridTime kReadHybridTime = HybridTime::FromMicros(1001); + + ReadOperationData read_operation_data = { + .deadline = CoarseTimePoint::max(), + .read_time = ReadHybridTime::SingleTime(kReadHybridTime), + }; + + FloatVectorIndexStorage storage(doc_db()); + + auto vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 1)); + ASSERT_EQ(vector, GenVector(1)); + auto neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 1, 0)); + ASSERT_EQ(neighbors, VectorNodeNeighbors({2, 3})); + neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 1, 10)); + ASSERT_EQ(neighbors, VectorNodeNeighbors({2})); + + vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 2)); + ASSERT_EQ(vector, GenVector(2)); + neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 2, 0)); + ASSERT_EQ(neighbors, VectorNodeNeighbors({3})); + neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 2, 20)); + ASSERT_EQ(neighbors, VectorNodeNeighbors({})); + + vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 3)); + ASSERT_TRUE(vector.empty()); // Vector not found + + vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 4)); + ASSERT_EQ(vector, GenVector(4)); + neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 4, 0)); + ASSERT_EQ(neighbors, VectorNodeNeighbors({1, 2})); + + vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 5)); + ASSERT_EQ(vector, GenVector(5)); + neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 5, 0)); + ASSERT_EQ(neighbors, VectorNodeNeighbors({1, 2, 3})); +} + } // namespace yb::docdb diff --git a/src/yb/docdb/vector_index_read.cc b/src/yb/docdb/vector_index_read.cc new file mode 100644 index 000000000000..3ed801787de0 --- /dev/null +++ b/src/yb/docdb/vector_index_read.cc @@ -0,0 +1,138 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/docdb/vector_index_read.h" + +#include "yb/docdb/docdb_rocksdb_util.h" +#include "yb/docdb/intent_aware_iterator.h" + +#include "yb/util/endian_util.h" + +namespace yb::docdb { + +namespace { + +template +class VectorIndexRead { + public: + using Types = VectorIndexTypes; + using IndexedVector = typename Types::IndexedVector; + + VectorIndexRead( + const DocDB& doc_db, const ReadOperationData& read_operation_data) + : iter_(CreateIntentAwareIterator( + doc_db, + // TODO(!!!) add bloom filter usage. + BloomFilterMode::DONT_USE_BLOOM_FILTER, + boost::none, + rocksdb::kDefaultQueryId, + TransactionOperationContext(), + read_operation_data, + /* file_filter= */ nullptr, + /* iterate_upper_bound= */ nullptr, + FastBackwardScan::kFalse)) { + } + + Result GetVector(VertexId id) { + auto key = MakeVectorIndexKey(id); + iter_->Seek(key); + auto kv = VERIFY_RESULT_REF(iter_->Fetch()); + if (!kv || kv.key != key.AsSlice() || + kv.value.TryConsumeByte(dockv::ValueEntryTypeAsChar::kTombstone)) { + return IndexedVector(); + } + + return dockv::PrimitiveValue::DecodeFloatVector(kv.value); + } + + Result GetNeighbors(VertexId id, VectorIndexLevel level) { + auto vertex_level_key_bytes = MakeVectorIndexKey(id, level); + auto vertex_level_key = vertex_level_key_bytes.AsSlice(); + iter_->Seek(vertex_level_key); + auto kv = VERIFY_RESULT_REF(iter_->Fetch()); + if (!kv || !kv.key.starts_with(vertex_level_key)) { + return VectorNodeNeighbors(); + } + + VectorNodeNeighbors result; + EncodedDocHybridTime full_write_time(EncodedDocHybridTime::kMin); + // The list of neighbors. + if (kv.key == vertex_level_key) { + auto vector = VERIFY_RESULT(dockv::PrimitiveValue::DecodeUInt64Vector(kv.value)); + result.insert(vector.begin(), vector.end()); + full_write_time = kv.write_time; + for (;;) { + // Could be useful to seek in case a lot of calls to next does not move iterator to the next + // key. + kv = VERIFY_RESULT_REF(iter_->FetchNext()); + if (!kv || kv.key != vertex_level_key) { + break; + } + } + } + + auto prev_vertex_id = kInvalidVertexId; + auto vertex_level_key_size = vertex_level_key.size(); + while (kv && kv.key.starts_with(vertex_level_key)) { + if (kv.write_time < full_write_time) { + kv = VERIFY_RESULT_REF(iter_->FetchNext()); + continue; + } + auto vertex_id_slice = kv.key.WithoutPrefix(vertex_level_key_size); + RETURN_NOT_OK(dockv::ConsumeKeyEntryType(vertex_id_slice, dockv::KeyEntryType::kVertexId)); + auto vertex_id = VERIFY_RESULT((CheckedReadFull(vertex_id_slice))); + if (vertex_id != prev_vertex_id) { + auto value_type = dockv::ConsumeValueEntryType(kv.value); + if (value_type == dockv::ValueEntryType::kNullLow) { + result.insert(vertex_id); + } else if (value_type == dockv::ValueEntryType::kTombstone) { + result.erase(vertex_id); + } else { + return STATUS_FORMAT( + Corruption, "Unexpected value type for directed edge: $0 -> $1 at $2: $3", + id, vertex_id, level, value_type); + } + prev_vertex_id = vertex_id; + } + kv = VERIFY_RESULT_REF(iter_->FetchNext()); + } + + return result; + } + + private: + std::unique_ptr iter_; + // TODO(!!!) DeadlineInfo& deadline_info_; +}; + +} // namespace + +template +auto VectorIndexStorage::GetVector( + const ReadOperationData& read_operation_data, VertexId id) + -> Result::IndexedVector> { + VectorIndexRead read(doc_db_, read_operation_data); + return read.GetVector(id); +} + +template +auto VectorIndexStorage::GetNeighbors( + const ReadOperationData& read_operation_data, VertexId id, VectorIndexLevel level) + -> Result { + VectorIndexRead read(doc_db_, read_operation_data); + return read.GetNeighbors(id, level); +} + +template class VectorIndexStorage; + +} // namespace yb::docdb diff --git a/src/yb/docdb/vector_index_read.h b/src/yb/docdb/vector_index_read.h new file mode 100644 index 000000000000..193823e1c00c --- /dev/null +++ b/src/yb/docdb/vector_index_read.h @@ -0,0 +1,40 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#pragma once + +#include "yb/docdb/docdb_fwd.h" +#include "yb/docdb/key_bounds.h" +#include "yb/docdb/vector_index.h" + +namespace yb::docdb { + +template +class VectorIndexStorage : public VectorIndexFetcher { + public: + using typename VectorIndexFetcher::IndexedVector; + + explicit VectorIndexStorage(const DocDB& doc_db) + : doc_db_(doc_db) {} + + Result GetVector( + const ReadOperationData& read_operation_data, VertexId id) override; + Result GetNeighbors( + const ReadOperationData& read_operation_data, VertexId id, VectorIndexLevel level) override; + private: + const DocDB doc_db_; +}; + +using FloatVectorIndexStorage = VectorIndexStorage; + +} // namespace yb::docdb diff --git a/src/yb/docdb/vector_index_update.cc b/src/yb/docdb/vector_index_update.cc index 397a96cd0891..4634b332234b 100644 --- a/src/yb/docdb/vector_index_update.cc +++ b/src/yb/docdb/vector_index_update.cc @@ -72,37 +72,10 @@ auto VectorIndexUpdate::GetLevel(VertexId id, VectorIndexLevel l return node.levels[level]; } -namespace { - -void AppendSubkeys(dockv::KeyBytes& key) { -} - -void AppendSubkey(dockv::KeyBytes& key, VectorIndexLevel level) { - key.AppendKeyEntryType(dockv::KeyEntryType::kUInt32); - key.AppendUInt32(level); -} - -void AppendSubkey(dockv::KeyBytes& key, VertexId id) { - key.AppendKeyEntryType(dockv::KeyEntryType::kUInt64); - key.AppendUInt64(id); -} - -template -void AppendSubkeys(dockv::KeyBytes& key, const T& t, Subkeys&&... subkeys) { - AppendSubkey(key, t); - AppendSubkeys(key, std::forward(subkeys)...); -} - -} // namespace - template template dockv::KeyBytes VectorIndexUpdate::MakeKey(VertexId id, Subkeys&&... subkeys) { - dockv::KeyBytes key; - auto key_entry_value = dockv::KeyEntryValue::VectorVertexId(id); - key_entry_value.AppendToKey(&key); - key.AppendGroupEnd(); - AppendSubkeys(key, std::forward(subkeys)...); + auto key = MakeVectorIndexKey(id, std::forward(subkeys)...); key.AppendKeyEntryType(dockv::KeyEntryType::kHybridTime); key.AppendHybridTime(doc_ht_); diff --git a/src/yb/docdb/vector_index_update.h b/src/yb/docdb/vector_index_update.h index 602058820aef..dcd80e5a5ce2 100644 --- a/src/yb/docdb/vector_index_update.h +++ b/src/yb/docdb/vector_index_update.h @@ -25,7 +25,8 @@ namespace yb::docdb { template class VectorIndexUpdate { public: - using IndexedVector = std::vector; + using Types = VectorIndexTypes; + using IndexedVector = typename Types::IndexedVector; explicit VectorIndexUpdate(HybridTime ht, rocksdb::WriteBatch& write_batch) : doc_ht_(ht), write_batch_(write_batch) {} diff --git a/src/yb/dockv/primitive_value.cc b/src/yb/dockv/primitive_value.cc index 65bdab2645be..7f907bf1f275 100644 --- a/src/yb/dockv/primitive_value.cc +++ b/src/yb/dockv/primitive_value.cc @@ -227,6 +227,65 @@ Status CheckNumberOfBytes(size_t found, size_t expected, const Name& name) { name, found, expected); } +struct FloatReader { + float operator()(const uint8_t*& input) const { + return bit_cast(Read(input)); + } +}; + +struct UInt64Reader { + uint64_t operator()(const uint8_t*& input) const { + return Read(input); + } +}; + +template +Result DoDecodeVectorBody( + Slice slice, ValueEntryType value_type, const Reader& reader) { + size_t size = VERIFY_RESULT((CheckedRead(slice))); + RETURN_NOT_OK(CheckNumberOfBytes( + slice.size(), size * sizeof(typename Vector::value_type), value_type)); + Vector result(size); + + auto* input = slice.data(); + for (size_t i = 0; i != size; ++i) { + result[i] = reader(input); + } + + return result; +} + +template +Status ConsumeEntryType(Slice& slice, EntryType entry_type) { + if (slice.TryConsumeByte(static_cast(entry_type))) { + return Status::OK(); + } + if (slice.empty()) { + return STATUS_FORMAT(Corruption, "Empty slice while expecting $0", entry_type); + } + return STATUS_FORMAT(Corruption, "Invalid entry type $0 while $1 expected", + static_cast(slice[0]), entry_type); +} + +template +Result DoDecodeVector( + Slice slice, ValueEntryType value_type, const Reader& reader) { + RETURN_NOT_OK(ConsumeValueEntryType(slice, value_type)); + return DoDecodeVectorBody(slice, value_type, reader); +} + +template +void AppendEncodedVector( + ValueEntryType value_type, const Vector& v, ValueBuffer& buffer, const Writer& writer) { + auto* out = buffer.GrowByAtLeast( + 1 + sizeof(uint32_t) + v.size() * sizeof(typename Vector::value_type)); + *(out++) = static_cast(value_type); + Write(out, narrow_cast(v.size())); + for (auto entry : v) { + writer(out, entry); + } +} + } // anonymous namespace const PrimitiveValue PrimitiveValue::kInvalid = PrimitiveValue(ValueEntryType::kInvalid); @@ -1353,14 +1412,10 @@ Status PrimitiveValue::DecodeFromValue(const Slice& rocksdb_slice) { } case ValueEntryType::kFloatVector: - return DecodeVector(slice, value_type, float_vector_, [](auto*& input) { - return bit_cast(Read(input)); - }); + return DecodeVector(slice, value_type, float_vector_, FloatReader()); case ValueEntryType::kUInt64Vector: - return DecodeVector(slice, value_type, uint64_vector_, [](auto*& input) { - return Read(input); - }); + return DecodeVector(slice, value_type, uint64_vector_, UInt64Reader()); case ValueEntryType::kInvalid: [[fallthrough]]; case ValueEntryType::kPackedRowV1: [[fallthrough]]; @@ -1375,16 +1430,9 @@ Status PrimitiveValue::DecodeFromValue(const Slice& rocksdb_slice) { template Status PrimitiveValue::DecodeVector( Slice slice, ValueEntryType value_type, Vector*& vector, const Reader& reader) { - size_t size = VERIFY_RESULT((CheckedRead(slice))); - RETURN_NOT_OK(CheckNumberOfBytes( - slice.size(), size * sizeof(typename Vector::value_type), value_type)); + auto temp = VERIFY_RESULT(DoDecodeVectorBody(slice, value_type, reader)); type_ = value_type; - vector = new Vector(size); - - auto* input = slice.data(); - for (size_t i = 0; i != size; ++i) { - (*vector)[i] = reader(input); - } + vector = new Vector(std::move(temp)); return Status::OK(); } @@ -3119,18 +3167,6 @@ bool operator==(const KeyEntryValue& lhs, const KeyEntryValue& rhs) { FATAL_INVALID_ENUM_VALUE(KeyEntryType, lhs.type_); } -template -void PrimitiveValue::AppendEncodedVector( - ValueEntryType value_type, const Vector& v, ValueBuffer& buffer, const Writer& writer) { - auto* out = buffer.GrowByAtLeast( - 1 + sizeof(uint32_t) + v.size() * sizeof(typename Vector::value_type)); - *(out++) = static_cast(value_type); - Write(out, narrow_cast(v.size())); - for (auto entry : v) { - writer(out, entry); - } -} - void PrimitiveValue::AppendEncodedTo(const FloatVector& v, ValueBuffer& buffer) { AppendEncodedVector( dockv::ValueEntryType::kFloatVector, v, buffer, [](auto*& out, float entry) { @@ -3145,6 +3181,16 @@ void PrimitiveValue::AppendEncodedTo(const UInt64Vector& v, ValueBuffer& buffer) }); } +Result PrimitiveValue::DecodeFloatVector(Slice input) { + return DoDecodeVector( + input, dockv::ValueEntryType::kFloatVector, FloatReader()); +} + +Result PrimitiveValue::DecodeUInt64Vector(Slice input) { + return DoDecodeVector( + input, dockv::ValueEntryType::kUInt64Vector, UInt64Reader()); +} + Slice PrimitiveValue::NullSlice() { static const char kBuffer = ValueEntryTypeAsChar::kNullLow; return Slice(&kBuffer, 1); @@ -3155,4 +3201,12 @@ Slice PrimitiveValue::TombstoneSlice() { return Slice(&kBuffer, 1); } +Status ConsumeKeyEntryType(Slice& slice, KeyEntryType key_entry_type) { + return ConsumeEntryType(slice, key_entry_type); +} + +Status ConsumeValueEntryType(Slice& slice, ValueEntryType value_entry_type) { + return ConsumeEntryType(slice, value_entry_type); +} + } // namespace yb::dockv diff --git a/src/yb/dockv/primitive_value.h b/src/yb/dockv/primitive_value.h index a4a800e5fead..4b29d9531150 100644 --- a/src/yb/dockv/primitive_value.h +++ b/src/yb/dockv/primitive_value.h @@ -248,6 +248,9 @@ class PrimitiveValue { static void AppendEncodedTo(const FloatVector& v, ValueBuffer& out); static void AppendEncodedTo(const UInt64Vector& v, ValueBuffer& out); + static Result DecodeFloatVector(Slice input); + static Result DecodeUInt64Vector(Slice input); + template static ValueBuffer Encoded(const T& t) { ValueBuffer value; @@ -305,10 +308,6 @@ class PrimitiveValue { Status DecodeVector( Slice slice, ValueEntryType value_type, Vector*& vector, const Reader& reader); - template - static void AppendEncodedVector( - ValueEntryType value_type, const Vector& v, ValueBuffer& out, const Writer& writer); - // This is used in both the move constructor and the move assignment operator. Assumes this object // has not been constructed, or that the destructor has just been called. void MoveFrom(PrimitiveValue* other); @@ -337,4 +336,7 @@ void AppendEncodedValue(const LWQLValuePB& value, ValueBuffer* out); size_t EncodedValueSize(const QLValuePB& value); size_t EncodedValueSize(const LWQLValuePB& value); +Status ConsumeKeyEntryType(Slice& slice, KeyEntryType key_entry_type); +Status ConsumeValueEntryType(Slice& slice, ValueEntryType value_entry_type); + } // namespace yb::dockv diff --git a/src/yb/dockv/value_type.h b/src/yb/dockv/value_type.h index e100f8ab9c4c..897eb7cba170 100644 --- a/src/yb/dockv/value_type.h +++ b/src/yb/dockv/value_type.h @@ -285,6 +285,10 @@ inline ValueEntryType ConsumeValueEntryType(Slice* slice) { : DecodeValueEntryType(slice->consume_byte()); } +inline ValueEntryType ConsumeValueEntryType(Slice& slice) { + return ConsumeValueEntryType(&slice); +} + inline KeyEntryType DecodeKeyEntryType(const Slice& value) { return value.empty() ? KeyEntryType::kInvalid : static_cast(value.data()[0]); } diff --git a/src/yb/util/endian_util.h b/src/yb/util/endian_util.h index b4868de1a77b..75ab3328d08d 100644 --- a/src/yb/util/endian_util.h +++ b/src/yb/util/endian_util.h @@ -49,4 +49,13 @@ Result CheckedRead(Slice& slice) { return Load(ptr); } +template +Result CheckedReadFull(Slice& slice) { + auto result = CheckedRead(slice); + if (result.ok() && !slice.empty()) { + return STATUS_FORMAT(Corruption, "Extra data: $0", slice.ToDebugHexString()); + } + return result; +} + } // namespace yb