diff --git a/pkg/sql/vecindex/BUILD.bazel b/pkg/sql/vecindex/BUILD.bazel index b11f0a052005..3760e61bf04a 100644 --- a/pkg/sql/vecindex/BUILD.bazel +++ b/pkg/sql/vecindex/BUILD.bazel @@ -48,6 +48,7 @@ go_test( "//pkg/util/log", "//pkg/util/num32", "//pkg/util/stop", + "//pkg/util/timeutil", "//pkg/util/vector", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/vecindex/fixup_processor.go b/pkg/sql/vecindex/fixup_processor.go index 98c927f1c04e..391b22bc64f0 100644 --- a/pkg/sql/vecindex/fixup_processor.go +++ b/pkg/sql/vecindex/fixup_processor.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/vecindex/internal" "github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecstore" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/num32" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/vector" "github.com/cockroachdb/errors" @@ -334,18 +335,51 @@ func (fp *fixupProcessor) splitPartition( tempLeftOffsets, tempRightOffsets := kmeans.Compute(&vectors, tempOffsets) leftSplit, rightSplit := fp.splitPartitionData( - ctx, partition, &vectors, tempLeftOffsets, tempRightOffsets) + ctx, partition, vectors, tempLeftOffsets, tempRightOffsets) if parentPartition != nil { // De-link the splitting partition from its parent partition. childKey := vecstore.ChildKey{PartitionKey: partitionKey} - _, err = fp.index.removeFromPartition(ctx, txn, parentPartitionKey, childKey) + count, err := fp.index.removeFromPartition(ctx, txn, parentPartitionKey, childKey) if err != nil { return errors.Wrapf(err, "removing splitting partition %d from its parent %d", partitionKey, parentPartitionKey) } - // TODO(andyk): Move vectors to/from split partition. + if count != 0 { + // Move any vectors to sibling partitions that have closer centroids. + // Lazily get parent vectors only if they're actually needed. + var parentVectors vector.Set + getParentVectors := func() (vector.Set, error) { + if parentVectors.Dims != 0 { + return parentVectors, nil + } + var err error + parentVectors, err = fp.getFullVectorsForPartition( + ctx, txn, parentPartitionKey, parentPartition) + return parentVectors, err + } + + err = fp.moveVectorsToSiblings( + ctx, txn, parentPartitionKey, parentPartition, getParentVectors, partitionKey, &leftSplit) + if err != nil { + return err + } + err = fp.moveVectorsToSiblings( + ctx, txn, parentPartitionKey, parentPartition, getParentVectors, partitionKey, &rightSplit) + if err != nil { + return err + } + + // Move any vectors at the same level that are closer to the new split + // centroids than they are to their own centroids. + if err = fp.linkNearbyVectors(ctx, txn, partitionKey, leftSplit.Partition); err != nil { + return err + } + if err = fp.linkNearbyVectors(ctx, txn, partitionKey, rightSplit.Partition); err != nil { + return err + } + } } // Insert the two new partitions into the index. This only adds their data @@ -392,23 +426,19 @@ func (fp *fixupProcessor) splitPartition( // Link the two new partitions into the K-means tree by inserting them // into the parent level. This can trigger a further split, this time of // the parent level. - fp.searchCtx = searchContext{ - Ctx: ctx, - Workspace: fp.workspace, - Txn: txn, - Level: parentPartition.Level() + 1, - } + searchCtx := fp.reuseSearchContext(ctx, txn) + searchCtx.Level = parentPartition.Level() + 1 - fp.searchCtx.Randomized = leftSplit.Partition.Centroid() + searchCtx.Randomized = leftSplit.Partition.Centroid() childKey := vecstore.ChildKey{PartitionKey: leftPartitionKey} - err = fp.index.insertHelper(&fp.searchCtx, childKey, true /* allowRetry */) + err = fp.index.insertHelper(searchCtx, childKey, true /* allowRetry */) if err != nil { return errors.Wrapf(err, "inserting left partition for split of partition %d", partitionKey) } - fp.searchCtx.Randomized = rightSplit.Partition.Centroid() + searchCtx.Randomized = rightSplit.Partition.Centroid() childKey = vecstore.ChildKey{PartitionKey: rightPartitionKey} - err = fp.index.insertHelper(&fp.searchCtx, childKey, true /* allowRetry */) + err = fp.index.insertHelper(searchCtx, childKey, true /* allowRetry */) if err != nil { return errors.Wrapf(err, "inserting right partition for split of partition %d", partitionKey) } @@ -432,7 +462,7 @@ func (fp *fixupProcessor) splitPartition( func (fp *fixupProcessor) splitPartitionData( ctx context.Context, splitPartition *vecstore.Partition, - vectors *vector.Set, + vectors vector.Set, leftOffsets, rightOffsets []uint64, ) (leftSplit, rightSplit splitData) { // Copy centroid distances and child keys so they can be split. @@ -461,7 +491,8 @@ func (fp *fixupProcessor) splitPartitionData( right := int(rightOffsets[ri]) if right >= len(leftOffsets) { - panic("expected equal number of left and right offsets that need to be swapped") + panic(errors.AssertionFailedf( + "expected equal number of left and right offsets that need to be swapped")) } // Swap vectors. @@ -480,22 +511,172 @@ func (fp *fixupProcessor) splitPartitionData( ri++ } - leftVectorSet := *vectors + leftVectorSet := vectors rightVectorSet := leftVectorSet.SplitAt(len(leftOffsets)) leftCentroidDistances := centroidDistances[:len(leftOffsets):len(leftOffsets)] leftChildKeys := childKeys[:len(leftOffsets):len(leftOffsets)] - leftSplit.Init(ctx, fp.index.quantizer, &leftVectorSet, + leftSplit.Init(ctx, fp.index.quantizer, leftVectorSet, leftCentroidDistances, leftChildKeys, splitPartition.Level()) rightCentroidDistances := centroidDistances[len(leftOffsets):] rightChildKeys := childKeys[len(leftOffsets):] - rightSplit.Init(ctx, fp.index.quantizer, &rightVectorSet, + rightSplit.Init(ctx, fp.index.quantizer, rightVectorSet, rightCentroidDistances, rightChildKeys, splitPartition.Level()) return leftSplit, rightSplit } +// moveVectorsToSiblings checks each vector in the new split partition to see if +// it's now closer to a sibling partition's centroid than it is to its own +// centroid. If that's true, then move the vector to the sibling partition. Pass +// function to lazily fetch parent vectors, as it's expensive and is only needed +// if vectors actually need to be moved. +func (fp *fixupProcessor) moveVectorsToSiblings( + ctx context.Context, + txn vecstore.Txn, + parentPartitionKey vecstore.PartitionKey, + parentPartition *vecstore.Partition, + getParentVectors func() (vector.Set, error), + oldPartitionKey vecstore.PartitionKey, + split *splitData, +) error { + for i := 0; i < split.Vectors.Count; i++ { + if split.Vectors.Count == 1 && split.Partition.Level() != vecstore.LeafLevel { + // Don't allow so many vectors to be moved that a non-leaf partition + // ends up empty. This would violate a key constraint that the K-means + // tree is always fully balanced. + break + } + + vector := split.Vectors.At(i) + + // If distance to new centroid is <= distance to old centroid, then skip. + newCentroidDistance := split.Partition.QuantizedSet().GetCentroidDistances()[i] + if newCentroidDistance <= split.OldCentroidDistances[i] { + continue + } + + // Get the full vectors for the parent partition's children. + parentVectors, err := getParentVectors() + if err != nil { + return err + } + + // Check whether the vector is closer to a sibling centroid than its own + // new centroid. + minDistanceOffset := -1 + for parent := 0; parent < parentVectors.Count; parent++ { + squaredDistance := num32.L2Distance(parentVectors.At(parent), vector) + if squaredDistance < newCentroidDistance { + newCentroidDistance = squaredDistance + minDistanceOffset = parent + } + } + if minDistanceOffset == -1 { + continue + } + + siblingPartitionKey := parentPartition.ChildKeys()[minDistanceOffset].PartitionKey + log.VEventf(ctx, 3, "moving vector from splitting partition %d to sibling partition %d", + oldPartitionKey, siblingPartitionKey) + + // Found a sibling child partition that's closer, so insert the vector + // there instead. + childKey := split.Partition.ChildKeys()[i] + _, err = fp.index.addToPartition(ctx, txn, parentPartitionKey, siblingPartitionKey, vector, childKey) + if err != nil { + return errors.Wrapf(err, "moving vector to partition %d", siblingPartitionKey) + } + + // Remove the vector's data from the new partition. The remove operation + // backfills data at the current index with data from the last index. + // Therefore, don't increment the iteration index, since the next item + // is in the same location as the last. + split.ReplaceWithLast(i) + i-- + } + + return nil +} + +// linkNearbyVectors searches for vectors at the same level that are close to +// the given split partition's centroid. If they are closer than they are to +// their own centroid, then move them to the split partition. +func (fp *fixupProcessor) linkNearbyVectors( + ctx context.Context, + txn vecstore.Txn, + oldPartitionKey vecstore.PartitionKey, + partition *vecstore.Partition, +) error { + // TODO(andyk): Add way to filter search set in order to skip vectors deeper + // down in the search rather than afterwards. + searchCtx := fp.reuseSearchContext(ctx, txn) + searchCtx.Options = SearchOptions{ReturnVectors: true} + searchCtx.Level = partition.Level() + searchCtx.Randomized = partition.Centroid() + + // Don't link more vectors than the number of remaining slots in the split + // partition, to avoid triggering another split. + maxResults := fp.index.options.MaxPartitionSize - partition.Count() + if maxResults < 1 { + return nil + } + searchSet := vecstore.SearchSet{MaxResults: maxResults} + err := fp.index.searchHelper(searchCtx, &searchSet, true /* allowRetry */) + if err != nil { + return err + } + + tempVector := fp.workspace.AllocVector(fp.index.quantizer.GetRandomDims()) + defer fp.workspace.FreeVector(tempVector) + + // Filter the results. + results := searchSet.PopUnsortedResults() + for i := range results { + result := &results[i] + + // Skip vectors that are closer to their own centroid than they are to + // the split partition's centroid. + if result.QuerySquaredDistance >= result.CentroidDistance*result.CentroidDistance { + continue + } + + log.VEventf(ctx, 3, "linking vector from partition %d to splitting partition %d", + result.ChildKey.PartitionKey, oldPartitionKey) + + // Leaf vectors from the primary index need to be randomized. + vector := result.Vector + if partition.Level() == vecstore.LeafLevel { + fp.index.quantizer.RandomizeVector(ctx, vector, tempVector, false /* invert */) + vector = tempVector + } + + // Remove the vector from the other partition. + count, err := fp.index.removeFromPartition(ctx, txn, result.ParentPartitionKey, result.ChildKey) + if err != nil { + return err + } + if count == 0 && partition.Level() > vecstore.LeafLevel { + // Removing the vector will result in an empty non-leaf partition, which + // is not allowed, as the K-means tree would not be fully balanced. Add + // the vector back to the partition. This is a very rare case and that + // partition is likely to be merged away regardless. + _, err = fp.index.store.AddToPartition( + ctx, txn, result.ParentPartitionKey, vector, result.ChildKey) + if err != nil { + return err + } + continue + } + + // Add the vector to the split partition. + partition.Add(ctx, vector, result.ChildKey) + } + + return nil +} + // getFullVectorsForPartition fetches the full-size vectors (potentially // randomized by the quantizer) that are quantized by the given partition. func (fp *fixupProcessor) getFullVectorsForPartition( @@ -543,3 +724,17 @@ func (fp *fixupProcessor) getFullVectorsForPartition( return vectors, nil } + +// reuseSearchContext initializes the reusable search context, including reusing +// its temp slices. +func (fp *fixupProcessor) reuseSearchContext(ctx context.Context, txn vecstore.Txn) *searchContext { + fp.searchCtx = searchContext{ + Ctx: ctx, + Workspace: fp.workspace, + Txn: txn, + tempKeys: fp.searchCtx.tempKeys, + tempCounts: fp.searchCtx.tempCounts, + tempVectorsWithKeys: fp.searchCtx.tempVectorsWithKeys, + } + return &fp.searchCtx +} diff --git a/pkg/sql/vecindex/fixup_processor_test.go b/pkg/sql/vecindex/fixup_processor_test.go index 8bf115ef0483..d357ff795e3c 100644 --- a/pkg/sql/vecindex/fixup_processor_test.go +++ b/pkg/sql/vecindex/fixup_processor_test.go @@ -130,7 +130,7 @@ func TestSplitPartitionData(t *testing.T) { tempVectors := vector.MakeSet(2) tempVectors.AddSet(&vectors) leftSplit, rightSplit := index.fixups.splitPartitionData( - ctx, splitPartition, &tempVectors, tc.leftOffsets, tc.rightOffsets) + ctx, splitPartition, tempVectors, tc.leftOffsets, tc.rightOffsets) validate(&leftSplit, tc.expectedLeft) validate(&rightSplit, tc.expectedRight) diff --git a/pkg/sql/vecindex/quantize/quantizer.go b/pkg/sql/vecindex/quantize/quantizer.go index 5f30cbc5c781..fe8f435e7b15 100644 --- a/pkg/sql/vecindex/quantize/quantizer.go +++ b/pkg/sql/vecindex/quantize/quantizer.go @@ -34,12 +34,11 @@ type Quantizer interface { GetRandomDims() int // RandomizeVector optionally performs a random orthogonal transformation - // (ROT) on the input vector and writes it to the output vector. If - // invert=false, the input vector is "original" and the caller is - // responsible for allocating the "randomized" output vector, with length - // equal to GetRandomDims(). If invert=true, the input vector is - // "randomized" and the caller is responsible for allocating the "original" - // output vector. + // (ROT) on the input vector and writes it to the output vector. The caller + // is responsible for allocating the output vector with length equal to + // GetRandomDims(). If invert is true, then a previous ROT is reversed in + // order to recover the original vector. The caller is responsible for + // allocating the output vector with length equal to GetOriginalDims(). // // Randomizing vectors distributes skew more evenly across dimensions and // across vectors in a set. Distance and angle between any two vectors @@ -49,7 +48,7 @@ type Quantizer interface { // // NOTE: This step may be a no-op for some quantization algorithms, which // may simply copy the original slice to the randomized slice, unchanged. - RandomizeVector(ctx context.Context, original vector.T, randomized vector.T, invert bool) + RandomizeVector(ctx context.Context, input vector.T, output vector.T, invert bool) // Quantize quantizes a set of input vectors and returns their compressed // form as a quantized vector set. Input vectors should already have been diff --git a/pkg/sql/vecindex/quantize/rabitq.go b/pkg/sql/vecindex/quantize/rabitq.go index 06b5f92d8346..f50b67b26bcb 100644 --- a/pkg/sql/vecindex/quantize/rabitq.go +++ b/pkg/sql/vecindex/quantize/rabitq.go @@ -115,12 +115,12 @@ func (q *raBitQuantizer) GetRandomDims() int { // RandomizeVector implements the Quantizer interface. func (q *raBitQuantizer) RandomizeVector( - ctx context.Context, original vector.T, randomized vector.T, invert bool, + ctx context.Context, input vector.T, output vector.T, invert bool, ) { if !invert { - num32.MulMatrixByVector(&q.rot, original, randomized, num32.NoTranspose) + num32.MulMatrixByVector(&q.rot, input, output, num32.NoTranspose) } else { - num32.MulMatrixByVector(&q.rot, randomized, original, num32.Transpose) + num32.MulMatrixByVector(&q.rot, input, output, num32.Transpose) } } diff --git a/pkg/sql/vecindex/quantize/rabitq_test.go b/pkg/sql/vecindex/quantize/rabitq_test.go index b8d7da704071..98d767c867f3 100644 --- a/pkg/sql/vecindex/quantize/rabitq_test.go +++ b/pkg/sql/vecindex/quantize/rabitq_test.go @@ -183,7 +183,7 @@ func TestRaBitRandomizeVector(t *testing.T) { // Ensure that inverting RandomizeVector recovers original vector. randomizedInv := make([]float32, dims) - quantizer.RandomizeVector(ctx, randomizedInv, randomized.At(i), true /* invert */) + quantizer.RandomizeVector(ctx, randomized.At(i), randomizedInv, true /* invert */) for j, val := range original.At(i) { require.InDelta(t, val, randomizedInv[j], 0.00001) } diff --git a/pkg/sql/vecindex/quantize/unquantizer.go b/pkg/sql/vecindex/quantize/unquantizer.go index 745c538936cf..e1d21bcea551 100644 --- a/pkg/sql/vecindex/quantize/unquantizer.go +++ b/pkg/sql/vecindex/quantize/unquantizer.go @@ -41,17 +41,17 @@ func (q *unQuantizer) GetRandomDims() int { // RandomizeVector implements the Quantizer interface. func (q *unQuantizer) RandomizeVector( - ctx context.Context, original vector.T, randomized vector.T, invert bool, + ctx context.Context, input vector.T, output vector.T, invert bool, ) { - if len(original) != q.dims { + if len(input) != q.dims { panic(errors.AssertionFailedf( - "original dimensions %d do not match quantizer dims %d", len(original), q.dims)) + "input dimensions %d do not match quantizer dims %d", len(input), q.dims)) } - if len(randomized) != q.dims { + if len(output) != q.dims { panic(errors.AssertionFailedf( - "randomized dimensions %d do not match quantizer dims %d", len(original), q.dims)) + "output dimensions %d do not match quantizer dims %d", len(output), q.dims)) } - copy(randomized, original) + copy(output, input) } // Quantize implements the Quantizer interface. diff --git a/pkg/sql/vecindex/quantize/unquantizer_test.go b/pkg/sql/vecindex/quantize/unquantizer_test.go index f13cd3688f73..1abe2817d5b7 100644 --- a/pkg/sql/vecindex/quantize/unquantizer_test.go +++ b/pkg/sql/vecindex/quantize/unquantizer_test.go @@ -52,6 +52,13 @@ func TestUnQuantizerSimple(t *testing.T) { require.Equal(t, []float32{29, 5, 61, 25, 61}, roundFloats(distances, 2)) require.Equal(t, []float32{0, 0, 0, 0, 0}, roundFloats(errorBounds, 2)) + // Call RandomizeVector. + output := vector.T{3, 4} + quantizer.RandomizeVector(ctx, vector.T{1, 2}, output, false /* invert */) + require.Equal(t, vector.T{1, 2}, output) + quantizer.RandomizeVector(ctx, vector.T{5, 6}, output, true /* invert */) + require.Equal(t, vector.T{5, 6}, output) + // Remove quantized vectors. quantizedSet.ReplaceWithLast(1) quantizedSet.ReplaceWithLast(3) diff --git a/pkg/sql/vecindex/split_data.go b/pkg/sql/vecindex/split_data.go index a1ee6dba518e..949f36040dcc 100644 --- a/pkg/sql/vecindex/split_data.go +++ b/pkg/sql/vecindex/split_data.go @@ -32,14 +32,14 @@ type splitData struct { func (s *splitData) Init( ctx context.Context, quantizer quantize.Quantizer, - vectors *vector.Set, + vectors vector.Set, oldCentroidDistances []float32, childKeys []vecstore.ChildKey, level vecstore.Level, ) { - s.Vectors = *vectors + s.Vectors = vectors s.OldCentroidDistances = oldCentroidDistances - quantizedSet := quantizer.Quantize(ctx, vectors) + quantizedSet := quantizer.Quantize(ctx, &s.Vectors) s.Partition = vecstore.NewPartition(quantizer, quantizedSet, childKeys, level) } diff --git a/pkg/sql/vecindex/testdata/delete.ddt b/pkg/sql/vecindex/testdata/delete.ddt index 9fbe9521cdee..3b51d2a65ce2 100644 --- a/pkg/sql/vecindex/testdata/delete.ddt +++ b/pkg/sql/vecindex/testdata/delete.ddt @@ -1,5 +1,5 @@ # ---------- -# Construct new index with one vector in the root. +# Delete remaining vector in the root. # ---------- new-index min-partition-size=1 max-partition-size=3 beam-size=2 vec1: (1, 2) @@ -8,14 +8,13 @@ vec1: (1, 2) │ └───• vec1 (1, 2) -# Delete remaining vector in the root. delete vec1 ---- • 1 (0, 0) # ---------- -# Construct new index with only duplicate vectors. +# Delete vectors with duplicate values. # ---------- new-index min-partition-size=1 max-partition-size=3 beam-size=2 vec1: (1, 2) @@ -57,7 +56,7 @@ vec5 └───• vec6 (1, 2) # ---------- -# Construct new index with multiple levels. +# Delete vector from index with multiple levels. # ---------- new-index min-partition-size=1 max-partition-size=3 beam-size=1 vec1: (1, 2) @@ -135,7 +134,7 @@ vec6 └───• vec8 (-2, 8) # ---------- -# Construct new index with multiple levels. +# Delete vectors from primary index, but not from secondary index. # ---------- new-index min-partition-size=1 max-partition-size=3 beam-size=2 vec1: (1, 2) diff --git a/pkg/sql/vecindex/testdata/insert.ddt b/pkg/sql/vecindex/testdata/insert.ddt index a58c198709f7..6ba02b61faf1 100644 --- a/pkg/sql/vecindex/testdata/insert.ddt +++ b/pkg/sql/vecindex/testdata/insert.ddt @@ -1,5 +1,5 @@ # ---------- -# Construct empty index. +# Simple insert tests. # ---------- new-index min-partition-size=1 max-partition-size=4 beam-size=2 ---- @@ -47,21 +47,24 @@ vec7: (0, 0) vec8: (0, 4) vec9: (-2, 8) ---- -• 1 (3.6667, 3.4) +• 1 (7.25, 4.75) │ -├───• 2 (7, 4.8) +├───• 2 (11, 6) │ │ │ ├───• vec6 (14, 1) -│ ├───• vec2 (5, 6) -│ ├───• vec3 (4, 3) -│ ├───• vec4 (4, 3) │ └───• vec5 (8, 11) │ -└───• 3 (0.3333, 2) +├───• 4 (0.3333, 2) +│ │ +│ ├───• vec8 (0, 4) +│ ├───• vec7 (0, 0) +│ └───• vec1 (1, 2) +│ +└───• 5 (2.75, 5) │ - ├───• vec1 (1, 2) - ├───• vec7 (0, 0) - ├───• vec8 (0, 4) + ├───• vec2 (5, 6) + ├───• vec4 (4, 3) + ├───• vec3 (4, 3) └───• vec9 (-2, 8) # Overwrite vector with a new value that won't be found in the index, causing @@ -73,37 +76,37 @@ vec9: (-2, 8) insert vec2: (-5, -5) ---- -• 1 (3.6667, 3.4) +• 1 (7.25, 4.75) │ -├───• 2 (7, 4.8) +├───• 2 (11, 6) │ │ │ ├───• vec6 (14, 1) -│ ├───• vec2 (-5, -5) -│ ├───• vec3 (4, 3) -│ ├───• vec4 (4, 3) │ └───• vec5 (8, 11) │ -├───• 4 (-1, 6) +├───• 4 (0.3333, 2) │ │ -│ ├───• vec9 (-2, 8) -│ └───• vec8 (0, 4) +│ ├───• vec8 (0, 4) +│ ├───• vec7 (0, 0) +│ ├───• vec1 (1, 2) +│ └───• vec2 (-5, -5) │ -└───• 5 (-1.3333, -1) +└───• 5 (2.75, 5) │ - ├───• vec7 (0, 0) - ├───• vec1 (1, 2) - └───• vec2 (-5, -5) + ├───• vec2 (-5, -5) + ├───• vec4 (4, 3) + ├───• vec3 (4, 3) + └───• vec9 (-2, 8) search max-results=10 beam-size=8 (-5, -5) ---- -vec2: 0 (centroid=2.3324) -vec7: 50 (centroid=1.6667) -vec1: 85 (centroid=3.8006) -vec8: 106 (centroid=2.2361) -vec3: 145 (centroid=3.4986) -vec4: 145 (centroid=3.4986) -vec9: 178 (centroid=2.2361) -vec6: 397 (centroid=7.9649) -vec5: 425 (centroid=6.2801) +vec2: 0 (centroid=2.4622) +vec7: 50 (centroid=2.0276) +vec1: 85 (centroid=0.6667) +vec8: 106 (centroid=2.0276) +vec3: 145 (centroid=2.3585) +vec4: 145 (centroid=2.3585) +vec9: 178 (centroid=5.6181) +vec6: 397 (centroid=5.831) +vec5: 425 (centroid=5.831) 10 leaf vectors, 13 vectors, 9 full vectors, 4 partitions diff --git a/pkg/sql/vecindex/testdata/search-features.ddt b/pkg/sql/vecindex/testdata/search-features.ddt index 8c6f2c557d16..5e2b7202eb55 100644 --- a/pkg/sql/vecindex/testdata/search-features.ddt +++ b/pkg/sql/vecindex/testdata/search-features.ddt @@ -1,96 +1,96 @@ # Load 500 512-dimension features and search them. Use small partition size to # ensure a deeper tree. -new-index dims=512 min-partition-size=4 max-partition-size=16 quality-samples=4 beam-size=2 load-features=1000 hide-tree +new-index dims=512 min-partition-size=4 max-partition-size=16 quality-samples=8 beam-size=4 load-features=1000 hide-tree ---- Created index with 1000 vectors with 512 dimensions. -# Start with 1 result and default beam size of 2. +# Start with 1 result and default beam size of 4. search max-results=1 use-feature=5000 ---- -vec356: 0.5976 (centroid=0.5046) -18 leaf vectors, 34 vectors, 3 full vectors, 4 partitions +vec356: 0.5976 (centroid=0.5024) +43 leaf vectors, 74 vectors, 3 full vectors, 7 partitions # Search for additional results. search max-results=6 use-feature=5000 ---- -vec356: 0.5976 (centroid=0.5046) -vec95: 0.7008 (centroid=0.5551) -vec11: 0.777 (centroid=0.6306) -vec848: 0.7958 (centroid=0.5294) -vec246: 0.8141 (centroid=0.5237) -vec650: 0.8432 (centroid=0.6338) -18 leaf vectors, 34 vectors, 10 full vectors, 4 partitions +vec356: 0.5976 (centroid=0.5024) +vec302: 0.6601 (centroid=0.4991) +vec329: 0.6871 (centroid=0.5033) +vec386: 0.7301 (centroid=0.5117) +vec240: 0.7723 (centroid=0.4702) +vec347: 0.7745 (centroid=0.5095) +43 leaf vectors, 74 vectors, 16 full vectors, 7 partitions # Use a larger beam size. search max-results=6 use-feature=5000 beam-size=8 ---- -vec771: 0.5624 (centroid=0.631) -vec356: 0.5976 (centroid=0.5046) -vec640: 0.6525 (centroid=0.6245) -vec329: 0.6871 (centroid=0.5083) -vec95: 0.7008 (centroid=0.5551) -vec386: 0.7301 (centroid=0.5489) -70 leaf vectors, 115 vectors, 17 full vectors, 13 partitions +vec771: 0.5624 (centroid=0.6671) +vec356: 0.5976 (centroid=0.5024) +vec302: 0.6601 (centroid=0.4991) +vec329: 0.6871 (centroid=0.5033) +vec95: 0.7008 (centroid=0.5941) +vec386: 0.7301 (centroid=0.5117) +96 leaf vectors, 143 vectors, 23 full vectors, 13 partitions # Turn off re-ranking, which results in increased inaccuracy. search max-results=6 use-feature=5000 beam-size=8 skip-rerank ---- -vec771: 0.5937 ±0.0437 (centroid=0.631) -vec356: 0.6205 ±0.0328 (centroid=0.5046) -vec640: 0.6564 ±0.0433 (centroid=0.6245) -vec329: 0.6787 ±0.0311 (centroid=0.5083) -vec95: 0.7056 ±0.0388 (centroid=0.5551) -vec386: 0.7212 ±0.0336 (centroid=0.5489) -70 leaf vectors, 115 vectors, 0 full vectors, 13 partitions +vec771: 0.6053 ±0.0461 (centroid=0.6671) +vec356: 0.6163 ±0.0323 (centroid=0.5024) +vec302: 0.6365 ±0.0321 (centroid=0.4991) +vec329: 0.6609 ±0.0333 (centroid=0.5033) +vec11: 0.7085 ±0.0389 (centroid=0.5695) +vec95: 0.7165 ±0.0394 (centroid=0.5941) +96 leaf vectors, 143 vectors, 0 full vectors, 13 partitions # Return top 25 results with large beam size. -search max-results=25 use-feature=5000 beam-size=64 +search max-results=25 use-feature=5000 beam-size=32 ---- -vec771: 0.5624 (centroid=0.631) -vec356: 0.5976 (centroid=0.5046) -vec640: 0.6525 (centroid=0.6245) -vec302: 0.6601 (centroid=0.5159) -vec329: 0.6871 (centroid=0.5083) -vec95: 0.7008 (centroid=0.5551) -vec249: 0.7268 (centroid=0.4459) -vec386: 0.7301 (centroid=0.5489) -vec309: 0.7311 (centroid=0.5569) -vec633: 0.7513 (centroid=0.4747) -vec117: 0.7576 (centroid=0.5211) -vec556: 0.7595 (centroid=0.459) -vec25: 0.761 (centroid=0.4394) -vec776: 0.7633 (centroid=0.4892) -vec872: 0.7707 (centroid=0.5141) -vec859: 0.7708 (centroid=0.5757) -vec240: 0.7723 (centroid=0.5266) -vec347: 0.7745 (centroid=0.5297) -vec11: 0.777 (centroid=0.6306) -vec340: 0.7858 (centroid=0.5312) -vec239: 0.7878 (centroid=0.5127) -vec704: 0.7916 (centroid=0.5169) -vec423: 0.7956 (centroid=0.4941) -vec220: 0.7957 (centroid=0.4916) -vec848: 0.7958 (centroid=0.5294) -683 leaf vectors, 787 vectors, 100 full vectors, 74 partitions +vec771: 0.5624 (centroid=0.6671) +vec356: 0.5976 (centroid=0.5024) +vec640: 0.6525 (centroid=0.5124) +vec302: 0.6601 (centroid=0.4991) +vec329: 0.6871 (centroid=0.5033) +vec95: 0.7008 (centroid=0.5941) +vec386: 0.7301 (centroid=0.5117) +vec309: 0.7311 (centroid=0.601) +vec633: 0.7513 (centroid=0.4651) +vec117: 0.7576 (centroid=0.5399) +vec556: 0.7595 (centroid=0.5536) +vec25: 0.761 (centroid=0.4783) +vec872: 0.7707 (centroid=0.5177) +vec240: 0.7723 (centroid=0.4702) +vec347: 0.7745 (centroid=0.5095) +vec11: 0.777 (centroid=0.5695) +vec340: 0.7858 (centroid=0.4752) +vec704: 0.7916 (centroid=0.6659) +vec423: 0.7956 (centroid=0.4682) +vec848: 0.7958 (centroid=0.5798) +vec720: 0.8012 (centroid=0.4557) +vec387: 0.8038 (centroid=0.5598) +vec637: 0.8039 (centroid=0.5473) +vec410: 0.8062 (centroid=0.5447) +vec979: 0.8066 (centroid=0.621) +342 leaf vectors, 441 vectors, 84 full vectors, 42 partitions # Test recall at different beam sizes. recall topk=10 beam-size=4 samples=50 ---- -50.00% recall@10 -44.26 leaf vectors, 75.42 vectors, 20.38 full vectors, 7.00 partitions +55.60% recall@10 +47.44 leaf vectors, 76.34 vectors, 20.88 full vectors, 7.00 partitions recall topk=10 beam-size=8 samples=50 ---- -70.40% recall@10 -85.90 leaf vectors, 136.26 vectors, 24.44 full vectors, 13.00 partitions +75.60% recall@10 +93.90 leaf vectors, 142.62 vectors, 24.54 full vectors, 13.00 partitions recall topk=10 beam-size=16 samples=50 ---- -85.20% recall@10 -169.94 leaf vectors, 263.62 vectors, 27.90 full vectors, 25.00 partitions +91.20% recall@10 +186.54 leaf vectors, 275.74 vectors, 27.58 full vectors, 25.00 partitions recall topk=10 beam-size=32 samples=50 ---- -97.00% recall@10 -336.46 leaf vectors, 440.46 vectors, 31.52 full vectors, 42.00 partitions +98.60% recall@10 +371.72 leaf vectors, 470.72 vectors, 32.00 full vectors, 42.00 partitions diff --git a/pkg/sql/vecindex/testdata/search.ddt b/pkg/sql/vecindex/testdata/search.ddt index 11950b7d1e09..f7f7a3bdfdee 100644 --- a/pkg/sql/vecindex/testdata/search.ddt +++ b/pkg/sql/vecindex/testdata/search.ddt @@ -1,5 +1,5 @@ # ---------- -# Construct new index with only root-level vectors. +# Search tree with only root-level vectors. # ---------- new-index min-partition-size=1 max-partition-size=4 beam-size=2 vec1: (1, 2) @@ -28,7 +28,7 @@ vec1: 13 (centroid=2.2361) 3 leaf vectors, 3 vectors, 3 full vectors, 1 partitions # ---------- -# Construct new index with multiple levels. +# Search tree with multiple levels. # ---------- new-index min-partition-size=1 max-partition-size=4 beam-size=2 vec1: (1, 2) @@ -45,51 +45,51 @@ vec11: (1, 1) vec12: (5, 4) vec13: (6, 2) ---- -• 1 (1.6, 4.3) +• 1 (1.5, 1.875) │ -├───• 5 (1, -1) +├───• 2 (1, -2) │ │ -│ ├───• vec6 (1, -6) -│ ├───• vec1 (1, 2) -│ └───• vec11 (1, 1) +│ ├───• vec11 (1, 1) +│ └───• vec6 (1, -6) │ -├───• 4 (-1.5, 5) +├───• 5 (0.3333, 9) │ │ -│ ├───• vec4 (-4, 5) -│ ├───• vec10 (0, 3) │ ├───• vec8 (-2, 8) -│ └───• vec7 (0, 4) +│ ├───• vec9 (2, 8) +│ └───• vec5 (1, 11) │ -├───• 6 (1.5, 9.5) +├───• 6 (5.5, 3.25) │ │ -│ ├───• vec5 (1, 11) -│ └───• vec9 (2, 8) +│ ├───• vec3 (4, 3) +│ ├───• vec13 (6, 2) +│ ├───• vec12 (5, 4) +│ └───• vec2 (7, 4) │ -└───• 7 (5.3333, 3.6667) +└───• 7 (-1.3333, 4) │ - ├───• vec3 (4, 3) - ├───• vec2 (7, 4) - ├───• vec12 (5, 4) - └───• vec13 (6, 2) + ├───• vec7 (0, 4) + ├───• vec10 (0, 3) + ├───• vec4 (-4, 5) + └───• vec1 (1, 2) # Search for closest vectors with beam-size=1. search max-results=2 beam-size=1 (1, 6) ---- -vec7: 5 (centroid=1.8028) -vec10: 10 (centroid=2.5) +vec7: 5 (centroid=1.3333) +vec10: 10 (centroid=1.6667) 4 leaf vectors, 8 vectors, 4 full vectors, 2 partitions # Search for closest vectors with beam-size=2. search max-results=2 beam-size=2 (1, 6) ---- -vec7: 5 (centroid=1.8028) -vec9: 5 (centroid=1.5811) -6 leaf vectors, 10 vectors, 6 full vectors, 3 partitions +vec7: 5 (centroid=1.3333) +vec9: 5 (centroid=1.9437) +7 leaf vectors, 11 vectors, 7 full vectors, 3 partitions # ---------- -# Construct new index with only duplicate vectors. +# Search tree with only duplicate vectors. # ---------- new-index min-partition-size=1 max-partition-size=4 beam-size=2 vec1: (4, 9) @@ -123,8 +123,8 @@ vec3: 2 (centroid=0) 6 leaf vectors, 8 vectors, 6 full vectors, 3 partitions # ---------- -# Construct new index with duplicate keys. This can happen when a vector is -# updated in the primary index, but it cannot be found in the secondary index. +# Search tree with duplicate keys. This can happen when a vector is updated in +# the primary index, but it cannot be found in the secondary index. # ---------- new-index min-partition-size=1 max-partition-size=3 beam-size=2 vec1: (1, 2) diff --git a/pkg/sql/vecindex/testdata/split.ddt b/pkg/sql/vecindex/testdata/split.ddt index 3075c9b3b8af..38fda793be84 100644 --- a/pkg/sql/vecindex/testdata/split.ddt +++ b/pkg/sql/vecindex/testdata/split.ddt @@ -1,5 +1,6 @@ -# Simple partition split cases. - +# ---------- +# Test simple partition splits. +# ---------- new-index min-partition-size=1 max-partition-size=4 beam-size=2 ---- • 1 (0, 0) @@ -51,56 +52,222 @@ vec7: (5, 8) # Trigger another split that adds a level to the tree. insert vec8: (-2, -3) -vec9: (4, 2) +vec9: (4, 1) vec10: (3, 5) vec11: (3, 2) vec12: (4, 4) +vec13: (3, 4) +vec14: (3, 3) ---- -• 1 (4.0694, 3.4028) +• 1 (5.2917, 4.375) │ -├───• 10 (6.5556, 5.8889) +├───• 10 (2.5, 2.1667) │ │ -│ ├───• 5 (6.5, 9.5) +│ ├───• 9 (3.5, 4) │ │ │ -│ │ ├───• vec4 (8, 11) -│ │ └───• vec7 (5, 8) +│ │ ├───• vec12 (4, 4) +│ │ ├───• vec3 (4, 3) +│ │ ├───• vec10 (3, 5) +│ │ └───• vec13 (3, 4) │ │ -│ ├───• 4 (9.6667, 3.6667) +│ ├───• 8 (3, 2.5) │ │ │ -│ │ ├───• vec2 (7, 4) -│ │ ├───• vec6 (8, 6) -│ │ └───• vec5 (14, 1) +│ │ ├───• vec14 (3, 3) +│ │ ├───• vec11 (3, 2) +│ │ └───• vec9 (4, 1) │ │ -│ └───• 8 (3.5, 4.5) +│ └───• 7 (1, 0) │ │ -│ ├───• vec12 (4, 4) -│ └───• vec10 (3, 5) +│ ├───• vec1 (1, 2) +│ └───• vec8 (-2, -3) │ -└───• 11 (1.5833, 0.9167) +└───• 11 (8.0833, 6.5833) │ - ├───• 7 (-0.5, -0.5) + ├───• 4 (9.6667, 3.6667) │ │ - │ ├───• vec8 (-2, -3) - │ └───• vec1 (1, 2) + │ ├───• vec2 (7, 4) + │ ├───• vec6 (8, 6) + │ └───• vec5 (14, 1) │ - └───• 9 (3.6667, 2.3333) + └───• 5 (6.5, 9.5) │ - ├───• vec3 (4, 3) - ├───• vec9 (4, 2) - └───• vec11 (3, 2) + ├───• vec4 (8, 11) + └───• vec7 (5, 8) # Search for closest vectors with beam-size=1. -search max-results=2 beam-size=1 -(1, 3) +search max-results=3 beam-size=1 +(4, 7) +---- +vec7: 2 (centroid=2.1213) +vec4: 32 (centroid=2.1213) +2 leaf vectors, 6 vectors, 2 full vectors, 3 partitions + +# Search for closest vectors with beam-size=3. +search max-results=3 beam-size=3 +(4, 7) +---- +vec7: 2 (centroid=2.1213) +vec10: 5 (centroid=1.118) +vec12: 9 (centroid=0.5) +9 leaf vectors, 16 vectors, 5 full vectors, 6 partitions + +# ---------- +# Test linking nearby vectors from other partitions. +# ---------- +new-index min-partition-size=1 max-partition-size=4 beam-size=2 +vec1: (-2, -2) +vec2: (0, 0) +vec3: (2, 2) +vec4: (4, 4) +vec5: (5, 5) +vec6: (6, 6) +vec7: (5, 5) +---- +• 1 (2.5, 2.5) +│ +├───• 2 (5, 5) +│ │ +│ ├───• vec6 (6, 6) +│ ├───• vec5 (5, 5) +│ ├───• vec4 (4, 4) +│ └───• vec7 (5, 5) +│ +└───• 3 (0, 0) + │ + ├───• vec3 (2, 2) + ├───• vec2 (0, 0) + └───• vec1 (-2, -2) + +# Add vectors to partition 2 until it splits and then pulls in vec3 from +# partition 3. +insert +vec8: (4, 3) +vec9: (3, 4) +---- +• 1 (2.5, 2.5) +│ +├───• 3 (0, 0) +│ │ +│ ├───• vec1 (-2, -2) +│ └───• vec2 (0, 0) +│ +├───• 4 (3.6667, 3.6667) +│ │ +│ ├───• vec9 (3, 4) +│ ├───• vec8 (4, 3) +│ ├───• vec4 (4, 4) +│ └───• vec3 (2, 2) +│ +└───• 5 (5.3333, 5.3333) + │ + ├───• vec7 (5, 5) + ├───• vec5 (5, 5) + └───• vec6 (6, 6) + +# ---------- +# Test moving vectors to other partitions during split. +# ---------- +new-index min-partition-size=1 max-partition-size=4 beam-size=2 +vec1: (0, 0) +vec2: (-1, 1) +vec3: (1, 1) +vec4: (0, -2) +vec5: (-1, -2) +vec6: (1, -2) ---- -vec11: 5 (centroid=0.7454) -vec3: 9 (centroid=0.7454) -3 leaf vectors, 7 vectors, 3 full vectors, 3 partitions +• 1 (0, -0.6667) +│ +├───• 2 (0, -2) +│ │ +│ ├───• vec6 (1, -2) +│ ├───• vec5 (-1, -2) +│ └───• vec4 (0, -2) +│ +└───• 3 (0, 0.6667) + │ + ├───• vec3 (1, 1) + ├───• vec2 (-1, 1) + └───• vec1 (0, 0) -# Search for closest vectors with beam-size=2. -search max-results=2 beam-size=2 -(1, 3) +# Add vectors to partition 3 until it splits, leaving vec1 further away from +# the new centroids than to the centroid of partition 2. Expect it to move to +# partition 2. +insert +vec7: (-4, 4) +vec8: (4, 4) ---- -vec1: 1 (centroid=2.9155) -vec11: 5 (centroid=0.7454) -5 leaf vectors, 9 vectors, 4 full vectors, 4 partitions +• 1 (0, -0.6667) +│ +├───• 2 (0, -2) +│ │ +│ ├───• vec6 (1, -2) +│ ├───• vec5 (-1, -2) +│ ├───• vec4 (0, -2) +│ └───• vec1 (0, 0) +│ +├───• 4 (-2.5, 2.5) +│ │ +│ ├───• vec7 (-4, 4) +│ └───• vec2 (-1, 1) +│ +└───• 5 (1.6667, 1.6667) + │ + ├───• vec8 (4, 4) + └───• vec3 (1, 1) + +# ---------- +# Test edge cases that occur with tiny max partition sizes. +# ---------- +new-index min-partition-size=0 max-partition-size=1 beam-size=2 +vec1: (-5, -5) +vec2: (5, 5) +vec3: (5, -4) +---- +• 1 (0, -2.25) +│ +├───• 6 (5, 0.5) +│ │ +│ ├───• 5 (5, -4) +│ │ │ +│ │ └───• vec3 (5, -4) +│ │ +│ └───• 4 (5, 5) +│ │ +│ └───• vec2 (5, 5) +│ +└───• 7 (-5, -5) + │ + └───• 3 (-5, -5) + │ + └───• vec1 (-5, -5) + +insert +vec4: (4, 4) +---- +• 1 (-0.125, -2.375) +│ +├───• 12 (4.75, 0.25) +│ │ +│ ├───• 11 (5, -4) +│ │ │ +│ │ └───• 5 (5, -4) +│ │ │ +│ │ └───• vec3 (5, -4) +│ │ +│ └───• 10 (4.5, 4.5) +│ │ +│ ├───• 9 (5, 5) +│ │ │ +│ │ └───• vec2 (5, 5) +│ │ +│ └───• 8 (4, 4) +│ │ +│ └───• vec4 (4, 4) +│ +└───• 13 (-5, -5) + │ + └───• 7 (-5, -5) + │ + └───• 3 (-5, -5) + │ + └───• vec1 (-5, -5) diff --git a/pkg/sql/vecindex/vecstore/in_memory_store.go b/pkg/sql/vecindex/vecstore/in_memory_store.go index 38ee2a3ee5cd..fbb93ea6b9d9 100644 --- a/pkg/sql/vecindex/vecstore/in_memory_store.go +++ b/pkg/sql/vecindex/vecstore/in_memory_store.go @@ -41,6 +41,12 @@ type inMemoryTxn struct { lock lockType // updated is true if any in-memory state has been updated. updated bool + // unbalancedKey, if non-zero, records a non-leaf partition that had all of + // its vectors removed during the transaction. If, by the end of the + // transaction, the partition is still empty, the store will panic, since + // this violates the constraint that the K-means tree is always fully + // balanced. + unbalancedKey PartitionKey } // InMemoryStore implements the Store interface over in-memory partitions and @@ -84,6 +90,21 @@ func (s *InMemoryStore) BeginTransaction(ctx context.Context) (Txn, error) { // CommitTransaction implements the Store interface. func (s *InMemoryStore) CommitTransaction(ctx context.Context, txn Txn) error { inMemTxn := txn.(*inMemoryTxn) + + // Panic if the K-means tree contains an empty non-leaf partition after the + // transaction ends, as this violates the constraint that the K-means tree + // is always full balanced. + if inMemTxn.unbalancedKey != 0 { + s.mu.Lock() + defer s.mu.Unlock() + + partition, ok := s.mu.index[inMemTxn.unbalancedKey] + if ok && partition.Count() == 0 { + panic(errors.AssertionFailedf( + "K-means tree is unbalanced, with empty non-leaf partition %d", inMemTxn.unbalancedKey)) + } + } + switch inMemTxn.lock { case dataLock: s.txnLock.RUnlock() @@ -210,8 +231,16 @@ func (s *InMemoryStore) RemoveFromPartition( if !ok { return 0, ErrPartitionNotFound } + if partition.ReplaceWithLastByKey(childKey) { - return partition.Count(), nil + count := partition.Count() + if count == 0 && partition.Level() > LeafLevel { + // A non-leaf partition has zero vectors. If this is still true at the + // end of the transaction, the K-means tree will be unbalanced, which + // violates a key constraint. + txn.(*inMemoryTxn).unbalancedKey = partitionKey + } + return count, nil } return -1, nil } diff --git a/pkg/sql/vecindex/vector_index.go b/pkg/sql/vecindex/vector_index.go index 7f1dbb7c1eb9..e8fe0cb530a1 100644 --- a/pkg/sql/vecindex/vector_index.go +++ b/pkg/sql/vecindex/vector_index.go @@ -73,7 +73,8 @@ type SearchOptions struct { // reduce accuracy. It is currently only used for testing. SkipRerank bool // ReturnVectors specifies whether to return the original full-size vectors - // in search results. + // in search results. If this is a leaf-level search then the returned + // vectors have not been randomized. ReturnVectors bool } @@ -273,7 +274,7 @@ func (vi *VectorIndex) Delete( if err != nil { return err } - results := searchSet.PopResults() + results := searchSet.PopUnsortedResults() if len(results) == 0 { // Retry search with significantly higher beam size. if baseBeamSize == vi.options.BaseBeamSize { @@ -331,7 +332,7 @@ func (vi *VectorIndex) insertHelper( if err != nil { return err } - results := searchSet.PopResults() + results := searchSet.PopUnsortedResults() parentPartitionKey := results[0].ParentPartitionKey partitionKey := results[0].ChildKey.PartitionKey _, err = vi.addToPartition(parentSearchCtx.Ctx, parentSearchCtx.Txn, parentPartitionKey, @@ -428,7 +429,7 @@ func (vi *VectorIndex) searchHelper( for { results := subSearchSet.PopUnsortedResults() - if len(results) == 0 { + if len(results) == 0 && searchLevel > vecstore.LeafLevel { // This should never happen, as it means that interior partition(s) // have no children. The vector deletion logic should prevent that. panic(errors.AssertionFailedf( @@ -698,13 +699,17 @@ func (vi *VectorIndex) Format( var buf bytes.Buffer // Format each number to 4 decimal places, removing unnecessary trailing - // zeros. + // zeros. Don't print negative zero, since this causes diffs when running on + // Linux vs. Mac. formatFloat := func(value float32) string { s := strconv.FormatFloat(float64(value), 'f', 4, 32) if strings.Contains(s, ".") { s = strings.TrimRight(s, "0") s = strings.TrimRight(s, ".") } + if s == "-0" { + return "0" + } return s } @@ -754,7 +759,7 @@ func (vi *VectorIndex) Format( // the original vector. random := partition.Centroid() original := make(vector.T, len(random)) - vi.quantizer.RandomizeVector(ctx, original, random, true /* invert */) + vi.quantizer.RandomizeVector(ctx, random, original, true /* invert */) buf.WriteString(parentPrefix) buf.WriteString("• ") buf.WriteString(strconv.FormatInt(int64(partitionKey), 10)) diff --git a/pkg/sql/vecindex/vector_index_test.go b/pkg/sql/vecindex/vector_index_test.go index 21e6b32149ce..d9206cd3f0ea 100644 --- a/pkg/sql/vecindex/vector_index_test.go +++ b/pkg/sql/vecindex/vector_index_test.go @@ -10,9 +10,11 @@ import ( "cmp" "context" "fmt" + "math/rand" "sort" "strconv" "strings" + "sync" "testing" "github.com/cockroachdb/cockroach/pkg/sql/vecindex/internal" @@ -23,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/num32" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/vector" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" @@ -250,23 +253,52 @@ func (s *testState) Insert(d *datadriven.TestData) string { } } - // Insert vectors into the store. - for i := 0; i < vectors.Count; i++ { - // Insert within the scope of a transaction. - txn := beginTransaction(s.Ctx, s.T, s.InMemStore) - s.InMemStore.InsertVector(txn, childKeys[i].PrimaryKey, vectors.At(i)) - require.NoError(s.T, s.Index.Insert(s.Ctx, txn, vectors.At(i), childKeys[i].PrimaryKey)) - commitTransaction(s.Ctx, s.T, s.InMemStore, txn) + // Insert vectors into the store in randomly-sized blocks. + var rng *rand.Rand + if s.Index.cancel == nil { + // Block size needs to be deterministic. + rng = rand.New(rand.NewSource(42)) + } else { + rng = rand.New(rand.NewSource(timeutil.Now().UnixNano())) + } + var wait sync.WaitGroup + i := 0 + for i < vectors.Count { + step := rng.Intn(s.Options.MaxPartitionSize*2) + 1 + + // Insert block of vectors within the scope of a transaction. + insertBlock := func(start, end int) { + txn := beginTransaction(s.Ctx, s.T, s.InMemStore) + for j := start; j < end; j++ { + s.InMemStore.InsertVector(txn, childKeys[j].PrimaryKey, vectors.At(j)) + require.NoError(s.T, s.Index.Insert(s.Ctx, txn, vectors.At(j), childKeys[j].PrimaryKey)) + } + commitTransaction(s.Ctx, s.T, s.InMemStore, txn) + } - if (i+1)%s.Options.MaxPartitionSize == 0 { - // Periodically, run synchronous fixups so that test results are - // deterministic. - require.NoError(s.T, s.runAllFixups(true /* skipBackground */)) + // If background fixups are not enabled, do inserts in series, since the + // test needs to be deterministic. + end := min(i+step, vectors.Count) + if s.Index.cancel == nil { + insertBlock(i, end) + + // Run synchronous fixups so that test results are deterministic. + require.NoError(s.T, s.runAllFixups()) + } else { + // Run inserts in parallel. + wait.Add(1) + go func(i int) { + insertBlock(i, end) + wait.Done() + }(i) } + + i += step } + wait.Wait() // Handle any remaining fixups. - require.NoError(s.T, s.runAllFixups(false /* skipBackground */)) + require.NoError(s.T, s.runAllFixups()) if hideTree { return fmt.Sprintf("Created index with %d vectors with %d dimensions.\n", @@ -468,13 +500,11 @@ func (s *testState) ValidateTree(d *datadriven.TestData) string { } // runAllFixups forces all pending fixups to be processed. -func (s *testState) runAllFixups(skipBackground bool) error { +func (s *testState) runAllFixups() error { if s.Index.cancel != nil { // Background fixup goroutine is running, so wait until it has processed // all fixups. - if !skipBackground { - s.Index.fixups.Wait() - } + s.Index.fixups.Wait() return nil } // Synchronously run fixups.