Skip to content

Commit

Permalink
[#23460] DocDB: Read vector index data
Browse files Browse the repository at this point in the history
Summary:
This diff implements reading vector index data stored in DocDB.
Jira: DB-12380

Test Plan: VectorIndexDocDBTest.Read

Reviewers: mbautin, aleksandr.ponomarenko

Reviewed By: mbautin

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D37155
  • Loading branch information
spolitov committed Aug 12, 2024
1 parent 69d4052 commit 706e97d
Show file tree
Hide file tree
Showing 14 changed files with 453 additions and 64 deletions.
2 changes: 2 additions & 0 deletions src/yb/docdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ set(DOCDB_SRCS
transaction_dump.cc
transaction_status_cache.cc
local_waiting_txn_registry.cc
vector_index.cc
vector_index_read.cc
vector_index_update.cc
wait_queue.cc
)
Expand Down
5 changes: 5 additions & 0 deletions src/yb/docdb/intent_aware_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,11 @@ bool IntentAwareIterator::IsRegularEntryOrderedBeforeResolvedIntent() const {
regular_entry_.key, resolved_intent_sub_doc_key_encoded_.AsSlice());
}

// Steps the iterator forward with Next() and then returns whatever Fetch() produces for the new
// position. Convenience for callers that always want the entry right after advancing.
Result<const FetchedEntry&> IntentAwareIterator::FetchNext() {
Next();
return Fetch();
}

Result<const FetchedEntry&> IntentAwareIterator::Fetch() {
#ifndef NDEBUG
need_fetch_ = false;
Expand Down
3 changes: 3 additions & 0 deletions src/yb/docdb/intent_aware_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class IntentAwareIterator final : public IntentAwareIteratorIf {
// contain the DocHybridTime but is returned separately and optionally.
Result<const FetchedEntry&> Fetch() override;

// Utility function that executes Next and then retrieves the result via Fetch in one call,
// i.e. it is equivalent to calling Next() followed by Fetch().
Result<const FetchedEntry&> FetchNext();

const ReadHybridTime& read_time() const override { return read_time_; }
Result<HybridTime> RestartReadHt() const override;

Expand Down
32 changes: 32 additions & 0 deletions src/yb/docdb/vector_index.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/docdb/vector_index.h"

namespace yb::docdb {

namespace detail {

// Appends a vector index level to `key` as a subkey: first the kUInt32 key entry type marker,
// then the level value itself via AppendUInt32.
void AppendSubkey(dockv::KeyBytes& key, VectorIndexLevel level) {
key.AppendKeyEntryType(dockv::KeyEntryType::kUInt32);
key.AppendUInt32(level);
}

// Appends a vertex id to `key` as a subkey: first the kVertexId key entry type marker, then the
// 64-bit id via AppendUInt64.
void AppendSubkey(dockv::KeyBytes& key, VertexId id) {
key.AppendKeyEntryType(dockv::KeyEntryType::kVertexId);
key.AppendUInt64(id);
}

} // namespace detail

} // namespace yb::docdb
47 changes: 47 additions & 0 deletions src/yb/docdb/vector_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,59 @@

#pragma once

#include "yb/docdb/docdb_fwd.h"

#include "yb/dockv/key_bytes.h"
#include "yb/dockv/primitive_value.h"

namespace yb::docdb {

// Identifier of a vertex (an indexed vector) in the vector index.
using VertexId = uint64_t;
// Level of the vector index that a neighbor list belongs to.
using VectorIndexLevel = uint8_t;
// Ordered, duplicate-free collection of the vertex ids a vertex is connected to.
using VectorNodeNeighbors = std::set<VertexId>;
// Reserved id value that is treated as "no vertex".
constexpr VertexId kInvalidVertexId = 0;

// Bundle of type aliases derived from the coordinate type of the indexed vectors.
template <class CoordinateType>
struct VectorIndexTypes {
// An indexed vector is stored as a plain sequence of coordinates.
using IndexedVector = std::vector<CoordinateType>;
};

// Abstract source of vector index data: the stored vector of a vertex and the set of its
// neighbors at a given level. Both lookups receive the read operation parameters explicitly.
template <class CoordinateType>
class VectorIndexFetcher {
public:
using Types = VectorIndexTypes<CoordinateType>;
using IndexedVector = typename Types::IndexedVector;

// Returns the vector stored for vertex `id`, or an error status.
virtual Result<IndexedVector> GetVector(
const ReadOperationData& read_operation_data, VertexId id) = 0;
// Returns the neighbors of vertex `id` at `level`, or an error status.
virtual Result<VectorNodeNeighbors> GetNeighbors(
const ReadOperationData& read_operation_data, VertexId id, VectorIndexLevel level) = 0;
// Virtual destructor so implementations can be destroyed through this interface.
virtual ~VectorIndexFetcher() = default;
};

namespace detail {

void AppendSubkey(dockv::KeyBytes& key, VectorIndexLevel level);
void AppendSubkey(dockv::KeyBytes& key, VertexId id);

// Overload for an empty subkey list: nothing is appended to `key`.
inline void AppendSubkeys(dockv::KeyBytes& key) {}

// Appends every given subkey to `key`, in argument order, by dispatching each one to the
// matching AppendSubkey overload.
template <class T, class... Subkeys>
void AppendSubkeys(dockv::KeyBytes& key, const T& t, Subkeys&&... subkeys) {
  AppendSubkey(key, t);
  // The comma fold evaluates left to right, so the remaining subkeys keep their order.
  (AppendSubkey(key, subkeys), ...);
}

} // namespace detail

// Builds the key for vertex `id`: the vertex id key entry, a group end marker, and then the
// optional subkeys (for example a level, or a level followed by a neighbor vertex id).
template <class... Subkeys>
dockv::KeyBytes MakeVectorIndexKey(VertexId id, Subkeys&&... subkeys) {
  dockv::KeyBytes result;
  dockv::KeyEntryValue::VectorVertexId(id).AppendToKey(&result);
  result.AppendGroupEnd();
  detail::AppendSubkeys(result, std::forward<Subkeys>(subkeys)...);
  return result;
}

} // namespace yb::docdb
87 changes: 83 additions & 4 deletions src/yb/docdb/vector_index_docdb-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,36 @@
//

#include "yb/docdb/docdb_test_base.h"
#include "yb/docdb/vector_index_read.h"
#include "yb/docdb/vector_index_update.h"

#include "yb/util/range.h"

namespace yb::docdb {

using VITypes = VectorIndexTypes<float>;

// Test fixture for reading and writing vector index data in DocDB.
class VectorIndexDocDBTest : public DocDBTestBase {
protected:
// These tests operate on raw vector index keys, so a default-constructed schema is used.
Schema CreateSchema() override {
return Schema();
}

// Writes the shared set of vectors, neighbor lists and edge updates used by the tests.
void WriteSimple();
};

TEST_F(VectorIndexDocDBTest, Update) {
const HybridTime hybrid_time = HybridTime::FromMicros(1000);
// Produces a deterministic two-coordinate test vector for vertex `id`: (e * id, pi * id), each
// computed in double precision and then narrowed to float.
VITypes::IndexedVector GenVector(VertexId id) {
  const auto first = static_cast<float>(M_E * id);
  const auto second = static_cast<float>(M_PI * id);
  return {first, second};
}

void VectorIndexDocDBTest::WriteSimple() {
const HybridTime kHybridTime = HybridTime::FromMicros(1000);
constexpr int kNumNodes = 3;
const auto kNodes = Range(1, kNumNodes + 1);
rocksdb::WriteBatch write_batch;
FloatVectorIndexUpdate update(hybrid_time, write_batch);
FloatVectorIndexUpdate update(kHybridTime, write_batch);
for (int i : kNodes) {
update.AddVector(i, {static_cast<float>(M_E * i), static_cast<float>(M_PI * i)});
update.AddVector(i, GenVector(i));
}
for (int i : kNodes) {
update.SetNeighbors(i, /* level= */ 0, Range(i + 1, kNumNodes + 1).ToContainer());
Expand All @@ -43,8 +53,25 @@ TEST_F(VectorIndexDocDBTest, Update) {
update.DeleteDirectedEdge(2, 3, 20);
update.DeleteVector(3);

update.AddVector(4, GenVector(4));
update.SetNeighbors(4, /* level= */ 0, Range(1, 2).ToContainer());
update.SetNeighbors(4, /* level= */ 0, Range(1, 3).ToContainer());

update.AddVector(5, GenVector(5));
update.SetNeighbors(5, /* level= */ 0, Range(1, 3).ToContainer());
update.DeleteDirectedEdge(5, 2, 0);
update.AddDirectedEdge(5, 10, 0);
update.SetNeighbors(5, /* level= */ 0, Range(1, 4).ToContainer());

ASSERT_OK(rocksdb()->Write(write_options(), &write_batch));
}

TEST_F(VectorIndexDocDBTest, Update) {
WriteSimple();

// Please note that in a real system we would never get such a set of data.
// But that is controlled by higher levels of the system.
// So the storage layer just performs its job and stores/loads the requested information.
AssertDocDbDebugDumpStrEq(R"#(
// The vector 1 itself.
SubDocKey(DocKey([], [1]), [HT{ physical: 1000 }]) -> [2.71828174591064, 3.14159274101257]
Expand All @@ -63,7 +90,59 @@ TEST_F(VectorIndexDocDBTest, Update) {
SubDocKey(DocKey([], [3]), [HT{ physical: 1000 w: 2 }]) -> [8.15484523773193, 9.42477798461914]
SubDocKey(DocKey([], [3]), [0; HT{ physical: 1000 w: 5 }]) -> []
SubDocKey(DocKey([], [3]), [30, 1; HT{ physical: 1000 w: 8 }]) -> null
// Vector 4
SubDocKey(DocKey([], [4]), [HT{ physical: 1000 w: 11 }]) -> [10.8731269836426, 12.5663709640503]
// Overwrite vector 4 neighbors in level 0.
SubDocKey(DocKey([], [4]), [0; HT{ physical: 1000 w: 13 }]) -> [1, 2]
SubDocKey(DocKey([], [4]), [0; HT{ physical: 1000 w: 12 }]) -> [1]
// Vector 5
SubDocKey(DocKey([], [5]), [HT{ physical: 1000 w: 14 }]) -> [13.5914087295532, 15.7079629898071]
// Overwrite vector 4 neighbors in level 0. Including overwrite to updates.
SubDocKey(DocKey([], [5]), [0; HT{ physical: 1000 w: 18 }]) -> [1, 2, 3]
SubDocKey(DocKey([], [5]), [0; HT{ physical: 1000 w: 15 }]) -> [1, 2]
SubDocKey(DocKey([], [5]), [0, 2; HT{ physical: 1000 w: 16 }]) -> DEL
SubDocKey(DocKey([], [5]), [0, 10; HT{ physical: 1000 w: 17 }]) -> null
)#");
}

// Verifies that vectors and neighbor sets written by WriteSimple can be read back through
// FloatVectorIndexStorage.
TEST_F(VectorIndexDocDBTest, Read) {
WriteSimple();

// Read strictly after the hybrid time (1000us) at which WriteSimple wrote its batch.
const HybridTime kReadHybridTime = HybridTime::FromMicros(1001);

ReadOperationData read_operation_data = {
.deadline = CoarseTimePoint::max(),
.read_time = ReadHybridTime::SingleTime(kReadHybridTime),
};

FloatVectorIndexStorage storage(doc_db());

// Vertex 1: vector present, full neighbor list at level 0, a single edge at level 10.
auto vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 1));
ASSERT_EQ(vector, GenVector(1));
auto neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 1, 0));
ASSERT_EQ(neighbors, VectorNodeNeighbors({2, 3}));
neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 1, 10));
ASSERT_EQ(neighbors, VectorNodeNeighbors({2}));

// Vertex 2: the edge 2 -> 3 at level 20 was deleted by WriteSimple, so that level is empty.
vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 2));
ASSERT_EQ(vector, GenVector(2));
neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 2, 0));
ASSERT_EQ(neighbors, VectorNodeNeighbors({3}));
neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 2, 20));
ASSERT_EQ(neighbors, VectorNodeNeighbors({}));

// Vertex 3 was removed with DeleteVector; a missing vector is reported as an empty vector
// rather than as an error.
vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 3));
ASSERT_TRUE(vector.empty()); // Vector not found

// Vertex 4: the neighbor list at level 0 was written twice; the later write ({1, 2}) wins.
vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 4));
ASSERT_EQ(vector, GenVector(4));
neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 4, 0));
ASSERT_EQ(neighbors, VectorNodeNeighbors({1, 2}));

// Vertex 5: the final full neighbor list ({1, 2, 3}) supersedes the earlier list as well as
// the individual edge delete (5 -> 2) and edge add (5 -> 10) written before it.
vector = ASSERT_RESULT(storage.GetVector(read_operation_data, 5));
ASSERT_EQ(vector, GenVector(5));
neighbors = ASSERT_RESULT(storage.GetNeighbors(read_operation_data, 5, 0));
ASSERT_EQ(neighbors, VectorNodeNeighbors({1, 2, 3}));
}

} // namespace yb::docdb
138 changes: 138 additions & 0 deletions src/yb/docdb/vector_index_read.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/docdb/vector_index_read.h"

#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/intent_aware_iterator.h"

#include "yb/util/endian_util.h"

namespace yb::docdb {

namespace {

// Reads vector index data (vectors and per-level neighbor sets) from DocDB through a single
// intent-aware iterator that is created for the supplied read operation.
template <class CoordinateType>
class VectorIndexRead {
 public:
  using Types = VectorIndexTypes<CoordinateType>;
  using IndexedVector = typename Types::IndexedVector;

  VectorIndexRead(const DocDB& doc_db, const ReadOperationData& read_operation_data)
      : iter_(CreateIntentAwareIterator(
            doc_db,
            // TODO(!!!) add bloom filter usage.
            BloomFilterMode::DONT_USE_BLOOM_FILTER,
            boost::none,
            rocksdb::kDefaultQueryId,
            TransactionOperationContext(),
            read_operation_data,
            /* file_filter= */ nullptr,
            /* iterate_upper_bound= */ nullptr,
            FastBackwardScan::kFalse)) {
  }

  // Returns the vector stored for vertex `id`. An empty vector is returned when there is no
  // entry with exactly this key or when the entry found is a tombstone.
  // NOTE(review): the value is always decoded with DecodeFloatVector, independent of
  // CoordinateType - confirm this is intended while only the float instantiation exists.
  Result<IndexedVector> GetVector(VertexId id) {
    auto vector_key = MakeVectorIndexKey(id);
    iter_->Seek(vector_key);
    auto entry = VERIFY_RESULT_REF(iter_->Fetch());

    // The tombstone check consumes the marker byte, so it has to stay last: it must only run
    // for a valid entry whose key matches exactly.
    if (entry && entry.key == vector_key.AsSlice() &&
        !entry.value.TryConsumeByte(dockv::ValueEntryTypeAsChar::kTombstone)) {
      return dockv::PrimitiveValue::DecodeFloatVector(entry.value);
    }
    return IndexedVector();
  }

  // Returns the neighbors of vertex `id` at `level`.
  //
  // Two kinds of entries share the (vertex, level) key prefix:
  //   - the key equal to the prefix stores the full neighbor list;
  //   - keys extending the prefix with a neighbor vertex id store single edge updates, where a
  //     null value adds the edge and a tombstone removes it.
  // The first entry encountered for a key is treated as its current version. Edge updates
  // written before the current full list are ignored.
  Result<VectorNodeNeighbors> GetNeighbors(VertexId id, VectorIndexLevel level) {
    auto prefix_bytes = MakeVectorIndexKey(id, level);
    auto prefix = prefix_bytes.AsSlice();
    iter_->Seek(prefix);
    auto entry = VERIFY_RESULT_REF(iter_->Fetch());

    VectorNodeNeighbors neighbors;
    if (!entry || !entry.key.starts_with(prefix)) {
      return neighbors;
    }

    // Write time of the full neighbor list; stays at the minimum when no full list exists, so
    // that no edge update gets filtered out.
    EncodedDocHybridTime list_write_time(EncodedDocHybridTime::kMin);
    if (entry.key == prefix) {
      auto listed_ids = VERIFY_RESULT(dockv::PrimitiveValue::DecodeUInt64Vector(entry.value));
      neighbors.insert(listed_ids.begin(), listed_ids.end());
      list_write_time = entry.write_time;
      // Step over the remaining entries of the full list key.
      // Could be useful to seek in case a lot of calls to next does not move iterator to the next
      // key.
      do {
        entry = VERIFY_RESULT_REF(iter_->FetchNext());
      } while (entry && entry.key == prefix);
    }

    const auto prefix_size = prefix.size();
    VertexId last_applied_id = kInvalidVertexId;
    while (entry && entry.key.starts_with(prefix)) {
      // Only edge updates that are not older than the full list are applied.
      const bool older_than_list = entry.write_time < list_write_time;
      if (!older_than_list) {
        auto neighbor_id_slice = entry.key.WithoutPrefix(prefix_size);
        RETURN_NOT_OK(
            dockv::ConsumeKeyEntryType(neighbor_id_slice, dockv::KeyEntryType::kVertexId));
        auto neighbor_id =
            VERIFY_RESULT((CheckedReadFull<uint64_t, BigEndian>(neighbor_id_slice)));
        // Further entries for the neighbor that was just handled are skipped.
        if (neighbor_id != last_applied_id) {
          auto value_type = dockv::ConsumeValueEntryType(entry.value);
          if (value_type == dockv::ValueEntryType::kNullLow) {
            neighbors.insert(neighbor_id);
          } else if (value_type == dockv::ValueEntryType::kTombstone) {
            neighbors.erase(neighbor_id);
          } else {
            return STATUS_FORMAT(
                Corruption, "Unexpected value type for directed edge: $0 -> $1 at $2: $3",
                id, neighbor_id, level, value_type);
          }
          last_applied_id = neighbor_id;
        }
      }
      entry = VERIFY_RESULT_REF(iter_->FetchNext());
    }

    return neighbors;
  }

 private:
  std::unique_ptr<IntentAwareIterator> iter_;
  // TODO(!!!) DeadlineInfo& deadline_info_;
};

} // namespace

// Reads the vector of vertex `id` using a reader (and thus an iterator) created just for this
// call with the given read operation parameters.
template <class CoordinateType>
auto VectorIndexStorage<CoordinateType>::GetVector(
    const ReadOperationData& read_operation_data, VertexId id)
    -> Result<VectorIndexStorage<CoordinateType>::IndexedVector> {
  return VectorIndexRead<CoordinateType>(doc_db_, read_operation_data).GetVector(id);
}

// Reads the neighbors of vertex `id` at `level` using a reader (and thus an iterator) created
// just for this call with the given read operation parameters.
template <class CoordinateType>
auto VectorIndexStorage<CoordinateType>::GetNeighbors(
    const ReadOperationData& read_operation_data, VertexId id, VectorIndexLevel level)
    -> Result<VectorNodeNeighbors> {
  return VectorIndexRead<CoordinateType>(doc_db_, read_operation_data).GetNeighbors(id, level);
}

template class VectorIndexStorage<float>;

} // namespace yb::docdb
Loading

0 comments on commit 706e97d

Please sign in to comment.