From a42549e3bd939bf3ce7bb1a7c3df66c5c671ea9d Mon Sep 17 00:00:00 2001 From: Sergei Politov Date: Mon, 5 Aug 2024 08:04:04 +0300 Subject: [PATCH] [#23377] DocDB: Implement the way to apply vector index updates to DocDB Summary: In order to manage vector indexes we should have a way to store and load them. Please check implemented format in corresponding GitHub issue. Jira: DB-12299 Test Plan: vector_index_update-test Reviewers: mbautin, timur, aleksandr.ponomarenko Reviewed By: mbautin, aleksandr.ponomarenko Subscribers: ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D36972 --- src/yb/common/doc_hybrid_time.h | 4 + src/yb/docdb/CMakeLists.txt | 2 + src/yb/docdb/docdb_debug.cc | 11 +- src/yb/docdb/vector_index.h | 24 ++++ src/yb/docdb/vector_index_docdb-test.cc | 69 +++++++++++ src/yb/docdb/vector_index_update.cc | 116 ++++++++++++++++++ src/yb/docdb/vector_index_update.h | 62 ++++++++++ src/yb/dockv/key_entry_value.h | 1 + src/yb/dockv/primitive_value.cc | 152 +++++++++++++++++++----- src/yb/dockv/primitive_value.h | 26 ++++ src/yb/dockv/value_type.h | 3 + src/yb/util/endian_util.h | 52 ++++++++ src/yb/util/range.h | 21 ++++ src/yb/util/slice.cc | 4 +- src/yb/util/slice.h | 18 ++- 15 files changed, 520 insertions(+), 45 deletions(-) create mode 100644 src/yb/docdb/vector_index.h create mode 100644 src/yb/docdb/vector_index_docdb-test.cc create mode 100644 src/yb/docdb/vector_index_update.cc create mode 100644 src/yb/docdb/vector_index_update.h create mode 100644 src/yb/util/endian_util.h diff --git a/src/yb/common/doc_hybrid_time.h b/src/yb/common/doc_hybrid_time.h index 9e5b4201114a..0d06eaf4b939 100644 --- a/src/yb/common/doc_hybrid_time.h +++ b/src/yb/common/doc_hybrid_time.h @@ -129,6 +129,10 @@ class DocHybridTime { HybridTime hybrid_time() const { return hybrid_time_; } IntraTxnWriteId write_id() const { return write_id_; } + void IncrementWriteId() { + ++write_id_; + } + // Returns pointer to byte after last used byte. char* EncodedInDocDbFormat(char* dest) const; diff --git a/src/yb/docdb/CMakeLists.txt b/src/yb/docdb/CMakeLists.txt index 7c9b96f4f3e0..b2746bdaa0cf 100644 --- a/src/yb/docdb/CMakeLists.txt +++ b/src/yb/docdb/CMakeLists.txt @@ -78,6 +78,7 @@ set(DOCDB_SRCS transaction_dump.cc transaction_status_cache.cc local_waiting_txn_registry.cc + vector_index_update.cc wait_queue.cc ) @@ -143,6 +144,7 @@ ADD_YB_TEST(shared_lock_manager-test) ADD_YB_TEST(consensus_frontier-test) ADD_YB_TEST(compaction_file_filter-test) ADD_YB_TEST(usearch_vector_index-test) +ADD_YB_TEST(vector_index_docdb-test) if(YB_BUILD_FUZZ_TARGETS) # A library with common code shared between DocDB fuzz tests. diff --git a/src/yb/docdb/docdb_debug.cc b/src/yb/docdb/docdb_debug.cc index 1ff0ffbf850c..4a5551449b9f 100644 --- a/src/yb/docdb/docdb_debug.cc +++ b/src/yb/docdb/docdb_debug.cc @@ -53,15 +53,8 @@ template void ProcessDumpEntry( Slice key, Slice value, SchemaPackingProvider* schema_packing_provider /*null ok*/, StorageDbType db_type, IncludeBinary include_binary, DumpStringFunc func) { - auto [key_str, value_str] = DumpEntryToString(key, value, schema_packing_provider, db_type); - if (!key_str.ok()) { - func(key_str.status().ToString()); - } - if (!value_str.ok()) { - func(value_str.status().CloneAndAppend(". Key: " + *key_str).ToString()); - } else { - func(Format("$0 -> $1", *key_str, *value_str)); - } + auto [key_res, value_res] = DumpEntryToString(key, value, schema_packing_provider, db_type); + func(Format("$0 -> $1", key_res, value_res)); if (include_binary) { func(Format("$0 -> $1\n", FormatSliceAsStr(key), FormatSliceAsStr(value))); } diff --git a/src/yb/docdb/vector_index.h b/src/yb/docdb/vector_index.h new file mode 100644 index 000000000000..d826ffe8b026 --- /dev/null +++ b/src/yb/docdb/vector_index.h @@ -0,0 +1,24 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#pragma once + +#include "yb/dockv/primitive_value.h" + +namespace yb::docdb { + +using VertexId = uint64_t; +using VectorIndexLevel = uint8_t; +using VectorNodeNeighbors = std::set; + +} // namespace yb::docdb diff --git a/src/yb/docdb/vector_index_docdb-test.cc b/src/yb/docdb/vector_index_docdb-test.cc new file mode 100644 index 000000000000..adb7a4719dad --- /dev/null +++ b/src/yb/docdb/vector_index_docdb-test.cc @@ -0,0 +1,69 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/docdb/docdb_test_base.h" +#include "yb/docdb/vector_index_update.h" + +#include "yb/util/range.h" + +namespace yb::docdb { + +class VectorIndexDocDBTest : public DocDBTestBase { + Schema CreateSchema() override { + return Schema(); + } +}; + +TEST_F(VectorIndexDocDBTest, Update) { + const HybridTime hybrid_time = HybridTime::FromMicros(1000); + constexpr int kNumNodes = 3; + const auto kNodes = Range(1, kNumNodes + 1); + rocksdb::WriteBatch write_batch; + FloatVectorIndexUpdate update(hybrid_time, write_batch); + for (int i : kNodes) { + update.AddVector(i, {static_cast(M_E * i), static_cast(M_PI * i)}); + } + for (int i : kNodes) { + update.SetNeighbors(i, /* level= */ 0, Range(i + 1, kNumNodes + 1).ToContainer()); + } + for (int i : kNodes) { + update.AddDirectedEdge(i, (i % kNumNodes) + 1, i * 10); + } + + update.DeleteDirectedEdge(2, 3, 20); + update.DeleteVector(3); + + ASSERT_OK(rocksdb()->Write(write_options(), &write_batch)); + + AssertDocDbDebugDumpStrEq(R"#( + // The vector 1 itself. + SubDocKey(DocKey([], [1]), [HT{ physical: 1000 }]) -> [2.71828174591064, 3.14159274101257] + // The neighbors of the vector 1 in level 0. + SubDocKey(DocKey([], [1]), [0; HT{ physical: 1000 w: 3 }]) -> [2, 3] + // The added edge from vector 1 to vector 2 in level 10. + SubDocKey(DocKey([], [1]), [10, 2; HT{ physical: 1000 w: 6 }]) -> null + // The same for remaining vectors. + SubDocKey(DocKey([], [2]), [HT{ physical: 1000 w: 1 }]) -> [5.43656349182129, 6.28318548202515] + SubDocKey(DocKey([], [2]), [0; HT{ physical: 1000 w: 4 }]) -> [3] + // Delete the edge from vector 2 to vector 3 in level 20. + SubDocKey(DocKey([], [2]), [20, 3; HT{ physical: 1000 w: 9 }]) -> DEL + SubDocKey(DocKey([], [2]), [20, 3; HT{ physical: 1000 w: 7 }]) -> null + // Delete the vector 3. + SubDocKey(DocKey([], [3]), [HT{ physical: 1000 w: 10 }]) -> DEL + SubDocKey(DocKey([], [3]), [HT{ physical: 1000 w: 2 }]) -> [8.15484523773193, 9.42477798461914] + SubDocKey(DocKey([], [3]), [0; HT{ physical: 1000 w: 5 }]) -> [] + SubDocKey(DocKey([], [3]), [30, 1; HT{ physical: 1000 w: 8 }]) -> null + )#"); +} + +} // namespace yb::docdb diff --git a/src/yb/docdb/vector_index_update.cc b/src/yb/docdb/vector_index_update.cc new file mode 100644 index 000000000000..397a96cd0891 --- /dev/null +++ b/src/yb/docdb/vector_index_update.cc @@ -0,0 +1,116 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/docdb/vector_index_update.h" + +#include "yb/dockv/doc_key.h" + +#include "yb/util/decimal.h" + +namespace yb::docdb { + +template +void VectorIndexUpdate::AddVector(VertexId id, IndexedVector vector) { + write_batch_.Put(MakeKey(id).AsSlice(), dockv::PrimitiveValue::Encoded(vector).AsSlice()); + nodes_[id].vector = std::move(vector); +} + +template +void VectorIndexUpdate::DeleteVector(yb::docdb::VertexId id) { + write_batch_.Put(MakeKey(id).AsSlice(), dockv::PrimitiveValue::TombstoneSlice()); + nodes_[id].tombstone = true; +} + +template +void VectorIndexUpdate::SetNeighbors( + VertexId id, VectorIndexLevel level, VectorNodeNeighbors new_neighbors) { + write_batch_.Put( + MakeKey(id, level), + dockv::PrimitiveValue::Encoded( + dockv::UInt64Vector{new_neighbors.begin(), new_neighbors.end()}).AsSlice()); + + GetLevel(id, level).neighbors = std::move(new_neighbors); +} + +template +void VectorIndexUpdate::AddDirectedEdge( + VertexId a, VertexId b, VectorIndexLevel level) { + write_batch_.Put(MakeKey(a, level, b), dockv::PrimitiveValue::NullSlice()); + + auto& vector_info = GetLevel(a, level); + vector_info.neighbors.insert(b); + vector_info.deleted_neighbors.erase(b); +} + +template +void VectorIndexUpdate::DeleteDirectedEdge( + VertexId a, VertexId b, VectorIndexLevel level) { + write_batch_.Put(MakeKey(a, level, b), dockv::PrimitiveValue::TombstoneSlice()); + + auto& vector_info = GetLevel(a, level); + vector_info.neighbors.erase(b); + vector_info.deleted_neighbors.insert(b); +} + +template +auto VectorIndexUpdate::GetLevel(VertexId id, VectorIndexLevel level) -> + VectorIndexUpdate::IndexedVectorLevelInfo& { + auto& node = nodes_[id]; + if (level >= node.levels.size()) { + node.levels.resize(level + 1); + } + return node.levels[level]; +} + +namespace { + +void AppendSubkeys(dockv::KeyBytes& key) { +} + +void AppendSubkey(dockv::KeyBytes& key, VectorIndexLevel level) { + key.AppendKeyEntryType(dockv::KeyEntryType::kUInt32); + key.AppendUInt32(level); +} + +void AppendSubkey(dockv::KeyBytes& key, VertexId id) { + key.AppendKeyEntryType(dockv::KeyEntryType::kUInt64); + key.AppendUInt64(id); +} + +template +void AppendSubkeys(dockv::KeyBytes& key, const T& t, Subkeys&&... subkeys) { + AppendSubkey(key, t); + AppendSubkeys(key, std::forward(subkeys)...); +} + +} // namespace + +template +template +dockv::KeyBytes VectorIndexUpdate::MakeKey(VertexId id, Subkeys&&... subkeys) { + dockv::KeyBytes key; + auto key_entry_value = dockv::KeyEntryValue::VectorVertexId(id); + key_entry_value.AppendToKey(&key); + key.AppendGroupEnd(); + AppendSubkeys(key, std::forward(subkeys)...); + key.AppendKeyEntryType(dockv::KeyEntryType::kHybridTime); + key.AppendHybridTime(doc_ht_); + + doc_ht_.IncrementWriteId(); + + return key; +} + +template class VectorIndexUpdate; + +} // namespace yb::docdb diff --git a/src/yb/docdb/vector_index_update.h b/src/yb/docdb/vector_index_update.h new file mode 100644 index 000000000000..602058820aef --- /dev/null +++ b/src/yb/docdb/vector_index_update.h @@ -0,0 +1,62 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#pragma once + +#include +#include + +#include "yb/docdb/vector_index.h" + +#include "yb/rocksdb/write_batch.h" + +namespace yb::docdb { + +template +class VectorIndexUpdate { + public: + using IndexedVector = std::vector; + + explicit VectorIndexUpdate(HybridTime ht, rocksdb::WriteBatch& write_batch) + : doc_ht_(ht), write_batch_(write_batch) {} + + void AddVector(VertexId id, IndexedVector v); + void DeleteVector(VertexId id); + void SetNeighbors(VertexId id, VectorIndexLevel level, VectorNodeNeighbors new_neighbors); + void AddDirectedEdge(VertexId a, VertexId b, VectorIndexLevel level); + void DeleteDirectedEdge(VertexId a, VertexId b, VectorIndexLevel level); + + private: + struct IndexedVectorLevelInfo { + VectorNodeNeighbors neighbors; + VectorNodeNeighbors deleted_neighbors; + }; + + IndexedVectorLevelInfo& GetLevel(VertexId id, VectorIndexLevel level); + template + dockv::KeyBytes MakeKey(VertexId id, Subkeys&&... subkeys); + + struct IndexedVectorInfo { + bool tombstone = false; + IndexedVector vector; + std::vector levels; + }; + + DocHybridTime doc_ht_; + std::unordered_map nodes_; + rocksdb::WriteBatch& write_batch_; +}; + +using FloatVectorIndexUpdate = VectorIndexUpdate; + +} // namespace yb::docdb diff --git a/src/yb/dockv/key_entry_value.h b/src/yb/dockv/key_entry_value.h index b019130e0753..1ee4ab85f728 100644 --- a/src/yb/dockv/key_entry_value.h +++ b/src/yb/dockv/key_entry_value.h @@ -149,6 +149,7 @@ class KeyEntryValue { static KeyEntryValue UInt32(uint32_t v, SortOrder sort_order = SortOrder::kAscending); static KeyEntryValue Int64(int64_t v, SortOrder sort_order = SortOrder::kAscending); static KeyEntryValue UInt64(uint64_t v, SortOrder sort_order = SortOrder::kAscending); + static KeyEntryValue VectorVertexId(uint64_t v); static KeyEntryValue MakeTimestamp( const Timestamp& timestamp, SortOrder sort_order = SortOrder::kAscending); static KeyEntryValue MakeInetAddress( diff --git a/src/yb/dockv/primitive_value.cc b/src/yb/dockv/primitive_value.cc index afe5cfcb2854..65bdab2645be 100644 --- a/src/yb/dockv/primitive_value.cc +++ b/src/yb/dockv/primitive_value.cc @@ -38,6 +38,7 @@ #include "yb/util/bytes_formatter.h" #include "yb/util/compare_util.h" #include "yb/util/decimal.h" +#include "yb/util/endian_util.h" #include "yb/util/fast_varint.h" #include "yb/util/net/inetaddress.h" #include "yb/util/result.h" @@ -60,17 +61,19 @@ using yb::util::DecodeDoubleFromKey; // default clause so that we can ensure that we're handling all possible primitive value types // at compile time. #define IGNORE_NON_PRIMITIVE_VALUE_TYPES_IN_SWITCH \ - case ValueEntryType::kArray: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kInvalid: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kJsonb: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kObject: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kPackedRowV1: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kPackedRowV2: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kRedisList: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kRedisSet: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kRedisSortedSet: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kRedisTS: FALLTHROUGH_INTENDED; \ - case ValueEntryType::kRowLock: FALLTHROUGH_INTENDED; \ + case ValueEntryType::kArray: [[fallthrough]]; \ + case ValueEntryType::kInvalid: [[fallthrough]]; \ + case ValueEntryType::kJsonb: [[fallthrough]]; \ + case ValueEntryType::kObject: [[fallthrough]]; \ + case ValueEntryType::kPackedRowV1: [[fallthrough]]; \ + case ValueEntryType::kPackedRowV2: [[fallthrough]]; \ + case ValueEntryType::kRedisList: [[fallthrough]]; \ + case ValueEntryType::kRedisSet: [[fallthrough]]; \ + case ValueEntryType::kRedisSortedSet: [[fallthrough]]; \ + case ValueEntryType::kRedisTS: [[fallthrough]]; \ + case ValueEntryType::kRowLock: [[fallthrough]]; \ + case ValueEntryType::kFloatVector: [[fallthrough]]; \ + case ValueEntryType::kUInt64Vector: [[fallthrough]]; \ case ValueEntryType::kTombstone: \ break @@ -100,6 +103,8 @@ namespace yb::dockv { namespace { +using VectorEndian = LittleEndian; + bool IsTrue(ValueEntryType type) { return type == ValueEntryType::kTrue; } @@ -213,11 +218,21 @@ inline bool IsCollationEncodedString(Slice val) { return !val.empty() && val[0] == '\0'; } +template +Status CheckNumberOfBytes(size_t found, size_t expected, const Name& name) { + if (found == expected) { + return Status::OK(); + } + return STATUS_FORMAT(Corruption, "Invalid number of bytes to decode $0: $1, need $2", + name, found, expected); +} + } // anonymous namespace const PrimitiveValue PrimitiveValue::kInvalid = PrimitiveValue(ValueEntryType::kInvalid); const PrimitiveValue PrimitiveValue::kTombstone = PrimitiveValue(ValueEntryType::kTombstone); const PrimitiveValue PrimitiveValue::kObject = PrimitiveValue(ValueEntryType::kObject); +const PrimitiveValue PrimitiveValue::kNull = PrimitiveValue(ValueEntryType::kNullLow); const KeyEntryValue KeyEntryValue::kLivenessColumn = KeyEntryValue::SystemColumnId( SystemColumnIds::kLivenessColumn); @@ -311,6 +326,10 @@ std::string PrimitiveValue::ValueToString() const { return Substitute("SubTransactionId($0)", uint32_val_); case ValueEntryType::kWriteId: return Format("WriteId($0)", int32_val_); + case ValueEntryType::kFloatVector: + return AsString(*float_vector_); + case ValueEntryType::kUInt64Vector: + return AsString(*uint64_vector_); case ValueEntryType::kMaxByte: return "0xff"; } @@ -391,6 +410,7 @@ void KeyEntryValue::AppendToKey(KeyBytes* key_bytes) const { return; case KeyEntryType::kUInt64: + case KeyEntryType::kVertexId: key_bytes->AppendUInt64(uint64_val_); return; @@ -953,6 +973,7 @@ Status KeyEntryValue::DecodeKey(Slice* slice, KeyEntryValue* out) { case KeyEntryType::kUInt64Descending: FALLTHROUGH_INTENDED; case KeyEntryType::kUInt64: + case KeyEntryType::kVertexId: if (slice->size() < sizeof(uint64_t)) { return STATUS_SUBSTITUTE(Corruption, "Not enough bytes to decode a 64-bit integer: $0", @@ -1215,10 +1236,7 @@ Status PrimitiveValue::DecodeFromValue(const Slice& rocksdb_slice) { return Status::OK(); case ValueEntryType::kGinNull: - if (slice.size() != sizeof(uint8_t)) { - return STATUS_FORMAT(Corruption, "Invalid number of bytes for a $0: $1", - value_type, slice.size()); - } + RETURN_NOT_OK(CheckNumberOfBytes(slice.size(), sizeof(uint8_t), value_type)); type_ = value_type; gin_null_val_ = slice.data()[0]; return Status::OK(); @@ -1226,10 +1244,7 @@ Status PrimitiveValue::DecodeFromValue(const Slice& rocksdb_slice) { case ValueEntryType::kInt32: FALLTHROUGH_INTENDED; case ValueEntryType::kWriteId: FALLTHROUGH_INTENDED; case ValueEntryType::kFloat: - if (slice.size() != sizeof(int32_t)) { - return STATUS_FORMAT(Corruption, "Invalid number of bytes for a $0: $1", - value_type, slice.size()); - } + RETURN_NOT_OK(CheckNumberOfBytes(slice.size(), sizeof(int32_t), value_type)); type_ = value_type; int32_val_ = BigEndian::Load32(slice.data()); return Status::OK(); @@ -1337,6 +1352,16 @@ Status PrimitiveValue::DecodeFromValue(const Slice& rocksdb_slice) { return Status::OK(); } + case ValueEntryType::kFloatVector: + return DecodeVector(slice, value_type, float_vector_, [](auto*& input) { + return bit_cast(Read(input)); + }); + + case ValueEntryType::kUInt64Vector: + return DecodeVector(slice, value_type, uint64_vector_, [](auto*& input) { + return Read(input); + }); + case ValueEntryType::kInvalid: [[fallthrough]]; case ValueEntryType::kPackedRowV1: [[fallthrough]]; case ValueEntryType::kPackedRowV2: [[fallthrough]]; @@ -1347,6 +1372,22 @@ Status PrimitiveValue::DecodeFromValue(const Slice& rocksdb_slice) { false, Corruption, "Wrong value type $0 in $1", value_type, rocksdb_slice.ToDebugHexString()); } +template +Status PrimitiveValue::DecodeVector( + Slice slice, ValueEntryType value_type, Vector*& vector, const Reader& reader) { + size_t size = VERIFY_RESULT((CheckedRead(slice))); + RETURN_NOT_OK(CheckNumberOfBytes( + slice.size(), size * sizeof(typename Vector::value_type), value_type)); + type_ = value_type; + vector = new Vector(size); + + auto* input = slice.data(); + for (size_t i = 0; i != size; ++i) { + (*vector)[i] = reader(input); + } + return Status::OK(); +} + Status PrimitiveValue::DecodeToQLValuePB( const Slice& rocksdb_slice, DataType data_type, QLValuePB* ql_value) { @@ -1514,10 +1555,7 @@ Status PrimitiveValue::DecodeToQLValuePB( case ValueEntryType::kTransactionId: FALLTHROUGH_INTENDED; case ValueEntryType::kTableId: FALLTHROUGH_INTENDED; case ValueEntryType::kUuid: { - if (slice.size() != kUuidSize) { - return STATUS_FORMAT(Corruption, "Invalid number of bytes to decode Uuid: $0, need $1", - slice.size(), kUuidSize); - } + RETURN_NOT_OK(CheckNumberOfBytes(slice.size(), kUuidSize, "Uuid")); Uuid uuid = VERIFY_RESULT(Uuid::FromComparable(slice)); if (data_type == DataType::UUID) { QLValue::set_uuid_value(uuid, ql_value); @@ -1551,13 +1589,15 @@ Status PrimitiveValue::DecodeToQLValuePB( break; } - case ValueEntryType::kObject: FALLTHROUGH_INTENDED; - case ValueEntryType::kArray: FALLTHROUGH_INTENDED; - case ValueEntryType::kRedisList: FALLTHROUGH_INTENDED; - case ValueEntryType::kRedisSet: FALLTHROUGH_INTENDED; - case ValueEntryType::kRedisTS: FALLTHROUGH_INTENDED; - case ValueEntryType::kRedisSortedSet: FALLTHROUGH_INTENDED; - case ValueEntryType::kGinNull: + case ValueEntryType::kObject: [[fallthrough]]; + case ValueEntryType::kArray: [[fallthrough]]; + case ValueEntryType::kRedisList: [[fallthrough]]; + case ValueEntryType::kRedisSet: [[fallthrough]]; + case ValueEntryType::kRedisTS: [[fallthrough]]; + case ValueEntryType::kRedisSortedSet: [[fallthrough]]; + case ValueEntryType::kGinNull: [[fallthrough]]; + case ValueEntryType::kFloatVector: [[fallthrough]]; + case ValueEntryType::kUInt64Vector: break; case ValueEntryType::kInvalid: [[fallthrough]]; @@ -1862,6 +1902,10 @@ PrimitiveValue::~PrimitiveValue() { delete inetaddress_val_; } else if (type_ == ValueEntryType::kFrozen) { delete frozen_val_; + } else if (type_ == ValueEntryType::kFloatVector) { + delete float_vector_; + } else if (type_ == ValueEntryType::kUInt64Vector) { + delete uint64_vector_; } // HybridTime does not need its destructor to be called, because it is a simple wrapper over an // unsigned 64-bit integer. @@ -2453,6 +2497,13 @@ KeyEntryValue KeyEntryValue::GinNull(uint8_t v) { return result; } +KeyEntryValue KeyEntryValue::VectorVertexId(uint64_t v) { + KeyEntryValue result; + result.type_ = KeyEntryType::kVertexId; + result.uint64_val_ = v; + return result; +} + KeyEntryValue::~KeyEntryValue() { Destroy(); } @@ -2676,7 +2727,8 @@ std::string KeyEntryValue::ToString(AutoDecodeKeys auto_decode_keys) const { case KeyEntryType::kUInt32: case KeyEntryType::kUInt32Descending: return std::to_string(uint32_val_); - case KeyEntryType::kUInt64: FALLTHROUGH_INTENDED; + case KeyEntryType::kUInt64: [[fallthrough]]; + case KeyEntryType::kVertexId: [[fallthrough]]; case KeyEntryType::kUInt64Descending: return std::to_string(uint64_val_); case KeyEntryType::kInt64Descending: FALLTHROUGH_INTENDED; @@ -2791,6 +2843,7 @@ int KeyEntryValue::CompareTo(const KeyEntryValue& other) const { case KeyEntryType::kUInt64Descending: return CompareUsingLessThan(other.uint64_val_, uint64_val_); case KeyEntryType::kUInt64: + case KeyEntryType::kVertexId: return CompareUsingLessThan(uint64_val_, other.uint64_val_); case KeyEntryType::kInt64: FALLTHROUGH_INTENDED; case KeyEntryType::kArrayIndex: @@ -3011,6 +3064,7 @@ bool operator==(const KeyEntryValue& lhs, const KeyEntryValue& rhs) { case KeyEntryType::kUInt64Descending: FALLTHROUGH_INTENDED; case KeyEntryType::kUInt64: + case KeyEntryType::kVertexId: return lhs.uint64_val_ == rhs.uint64_val_; case KeyEntryType::kInt64Descending: FALLTHROUGH_INTENDED; @@ -3065,4 +3119,40 @@ bool operator==(const KeyEntryValue& lhs, const KeyEntryValue& rhs) { FATAL_INVALID_ENUM_VALUE(KeyEntryType, lhs.type_); } +template +void PrimitiveValue::AppendEncodedVector( + ValueEntryType value_type, const Vector& v, ValueBuffer& buffer, const Writer& writer) { + auto* out = buffer.GrowByAtLeast( + 1 + sizeof(uint32_t) + v.size() * sizeof(typename Vector::value_type)); + *(out++) = static_cast(value_type); + Write(out, narrow_cast(v.size())); + for (auto entry : v) { + writer(out, entry); + } +} + +void PrimitiveValue::AppendEncodedTo(const FloatVector& v, ValueBuffer& buffer) { + AppendEncodedVector( + dockv::ValueEntryType::kFloatVector, v, buffer, [](auto*& out, float entry) { + Write(out, bit_cast(util::CanonicalizeFloat(entry))); + }); +} + +void PrimitiveValue::AppendEncodedTo(const UInt64Vector& v, ValueBuffer& buffer) { + AppendEncodedVector( + dockv::ValueEntryType::kUInt64Vector, v, buffer, [](auto*& out, uint64_t entry) { + Write(out, entry); + }); +} + +Slice PrimitiveValue::NullSlice() { + static const char kBuffer = ValueEntryTypeAsChar::kNullLow; + return Slice(&kBuffer, 1); +} + +Slice PrimitiveValue::TombstoneSlice() { + static const char kBuffer = ValueEntryTypeAsChar::kTombstone; + return Slice(&kBuffer, 1); +} + } // namespace yb::dockv diff --git a/src/yb/dockv/primitive_value.h b/src/yb/dockv/primitive_value.h index d1a8006b54d3..a4a800e5fead 100644 --- a/src/yb/dockv/primitive_value.h +++ b/src/yb/dockv/primitive_value.h @@ -47,11 +47,15 @@ YB_DEFINE_ENUM(ListExtendOrder, (APPEND)(PREPEND_BLOCK)(PREPEND)) // A necessary use of a forward declaration to avoid circular inclusion. class SubDocument; +using FloatVector = std::vector; +using UInt64Vector = std::vector; + class PrimitiveValue { public: static const PrimitiveValue kInvalid; static const PrimitiveValue kTombstone; static const PrimitiveValue kObject; + static const PrimitiveValue kNull; using Type = ValueEntryType; @@ -241,6 +245,19 @@ class PrimitiveValue { write_time_ = write_time; } + static void AppendEncodedTo(const FloatVector& v, ValueBuffer& out); + static void AppendEncodedTo(const UInt64Vector& v, ValueBuffer& out); + + template + static ValueBuffer Encoded(const T& t) { + ValueBuffer value; + AppendEncodedTo(t, value); + return value; + } + + static Slice NullSlice(); + static Slice TombstoneSlice(); + protected: static constexpr int64_t kUninitializedWriteTime = std::numeric_limits::min(); @@ -276,12 +293,21 @@ class PrimitiveValue { // This is used in SubDocument to hold a pointer to a map or a vector. void* complex_data_structure_; uint8_t gin_null_val_; + FloatVector* float_vector_; + UInt64Vector* uint64_vector_; }; private: template static PrimitiveValue DoFromQLValuePB(const PB& value); + template + Status DecodeVector( + Slice slice, ValueEntryType value_type, Vector*& vector, const Reader& reader); + + template + static void AppendEncodedVector( + ValueEntryType value_type, const Vector& v, ValueBuffer& out, const Writer& writer); // This is used in both the move constructor and the move assignment operator. Assumes this object // has not been constructed, or that the destructor has just been called. diff --git a/src/yb/dockv/value_type.h b/src/yb/dockv/value_type.h index 27817d04ae2f..e100f8ab9c4c 100644 --- a/src/yb/dockv/value_type.h +++ b/src/yb/dockv/value_type.h @@ -95,6 +95,7 @@ namespace yb::dockv { ((kString, 'S')) /* ASCII code 83 */ \ ((kTrue, 'T')) /* ASCII code 84 */ \ ((kUInt64, 'U')) /* ASCII code 85 */ \ + ((kVertexId, 'V')) /* ASCII code 86 */ \ ((kExternalIntents, 'Z')) /* ASCII code 90 */ \ ((kArrayIndex, '[')) /* ASCII code 91 */ \ ((kCollString, '\\')) /* ASCII code 92 */ \ @@ -175,6 +176,8 @@ namespace yb::dockv { ((kString, 'S')) /* ASCII code 83 */ \ ((kTrue, 'T')) /* ASCII code 84 */ \ ((kUInt64, 'U')) /* ASCII code 85 */ \ + ((kFloatVector, 'V')) /* ASCII code 86 */ \ + ((kUInt64Vector, 'W')) /* ASCII code 87 */ \ ((kTombstone, 'X')) /* ASCII code 88 */ \ ((kArrayIndex, '[')) /* ASCII code 91 */ \ ((kCollString, '\\')) /* ASCII code 92 */ \ diff --git a/src/yb/util/endian_util.h b/src/yb/util/endian_util.h new file mode 100644 index 000000000000..b4868de1a77b --- /dev/null +++ b/src/yb/util/endian_util.h @@ -0,0 +1,52 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#pragma once + +#include "yb/gutil/endian.h" + +#include "yb/util/slice.h" +#include "yb/util/status.h" + +namespace yb { + +// Write value encoded with specified endian and update output address. +template +std::enable_if_t Write(SingleByteType*& p, T v) { + Store(p, v); + p += sizeof(T); +} + +// Read value encoded with specified endian and update input address. +template +std::enable_if_t Read(SingleByteType*& p) { + auto ptr = p; + p += sizeof(T); + return Load(ptr); +} + +// Read value encoded with specified endian from slice and remove read prefix. +// Return failure if value cannot be read from the slice. +template +Result CheckedRead(Slice& slice) { + if (slice.size() < sizeof(T)) { + return STATUS_FORMAT( + Corruption, "Not enough bytes to read: $0, need $1", slice.size(), sizeof(T)); + } + + auto ptr = slice.data(); + slice.RemovePrefix(sizeof(T)); + return Load(ptr); +} + +} // namespace yb diff --git a/src/yb/util/range.h b/src/yb/util/range.h index 4f3ed8c44c67..bfd94e830101 100644 --- a/src/yb/util/range.h +++ b/src/yb/util/range.h @@ -108,6 +108,23 @@ class RangeIterator : public std::iterator Int step_; }; +template +class RangeObject; + +template +class RangeObjectToContainerHelper { + public: + explicit RangeObjectToContainerHelper(const RangeObject& range) : range_(range) {} + + template + operator Out() const { + return Out(range_.begin(), range_.end()); + } + + private: + const RangeObject& range_; +}; + template class RangeObject { public: @@ -142,6 +159,10 @@ class RangeObject { return {new_start, new_stop, new_step}; } + RangeObjectToContainerHelper ToContainer() const { + return RangeObjectToContainerHelper(*this); + } + private: [[nodiscard]] static Int NormalizedStop(Int start, Int stop, Int step) { auto diff = stop - start; diff --git a/src/yb/util/slice.cc b/src/yb/util/slice.cc index 5994717832ab..725cc1e2e974 100644 --- a/src/yb/util/slice.cc +++ b/src/yb/util/slice.cc @@ -145,7 +145,7 @@ uint8_t Slice::operator[](size_t n) const { return begin_[n]; } -void Slice::remove_prefix(size_t n) { +void Slice::RemovePrefix(size_t n) { DCHECK_LE(n, size()); begin_ += n; } @@ -164,7 +164,7 @@ Slice Slice::WithoutPrefix(size_t n) const { return Slice(begin_ + n, end_); } -void Slice::remove_suffix(size_t n) { +void Slice::RemoveSuffix(size_t n) { DCHECK_LE(n, size()); end_ -= n; } diff --git a/src/yb/util/slice.h b/src/yb/util/slice.h index d3c845115603..e8166b6d8fc0 100644 --- a/src/yb/util/slice.h +++ b/src/yb/util/slice.h @@ -131,13 +131,21 @@ class Slice { uint8_t operator[](size_t n) const; // Change this slice to refer to an empty array - void clear() { + void Clear() { begin_ = to_uchar_ptr(""); end_ = begin_; } + [[deprecated]] void clear() { + Clear(); + } + // Drop the first "n" bytes from this slice. - void remove_prefix(size_t n); + void RemovePrefix(size_t n); + + [[deprecated]] void remove_prefix(size_t n) { + RemovePrefix(n); + } Slice Prefix(size_t n) const; @@ -147,7 +155,11 @@ class Slice { Slice WithoutPrefix(size_t n) const; // Drop the last "n" bytes from this slice. - void remove_suffix(size_t n); + void RemoveSuffix(size_t n); + + [[deprecated]] void remove_suffix(size_t n) { + RemoveSuffix(n); + } Slice Suffix(size_t n) const;