From 1671ea0fda1970d1ba54091aef5ffbd0ba119079 Mon Sep 17 00:00:00 2001
From: Lei Xu
Date: Wed, 17 Jan 2024 14:47:25 -0800
Subject: [PATCH] feat: optimize indices (#1841)

Allow users to specify how many delta indices should be merged
---
 python/src/dataset.rs | 21 ++-
 rust/Cargo.toml | 3 +-
 rust/lance-index/src/lib.rs | 2 +
 rust/lance-index/src/optimize.rs | 39 +++++
 rust/lance-index/src/traits.rs | 4 +-
 rust/lance/Cargo.toml | 3 +-
 rust/lance/src/dataset/scanner.rs | 2 +-
 rust/lance/src/index.rs | 133 +++++++++++++--
 rust/lance/src/index/append.rs | 135 ++++++++++------
 rust/lance/src/index/vector/ivf.rs | 180 +++++++++++++--------
 rust/lance/src/index/vector/ivf/builder.rs | 2 +-
 rust/lance/src/index/vector/ivf/io.rs | 54 ++++---
 12 files changed, 409 insertions(+), 169 deletions(-)
 create mode 100644 rust/lance-index/src/optimize.rs

diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index b9b8441990..a86a595de2 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -23,7 +23,7 @@
 use arrow_schema::{DataType, Schema as ArrowSchema};
 use async_trait::async_trait;
 use chrono::Duration;
-use futures::StreamExt;
+use futures::{StreamExt, TryFutureExt};
 use lance::dataset::builder::DatasetBuilder;
 use lance::dataset::UpdateBuilder;
 use lance::dataset::{
@@ -37,6 +37,7 @@ use lance::index::{
 };
 use lance_arrow::as_fixed_size_list_array;
 use lance_core::{datatypes::Schema, format::Fragment, io::object_store::ObjectStoreParams};
+use lance_index::optimize::OptimizeOptions;
 use lance_index::{
     vector::{ivf::IvfBuildParams, pq::PQBuildParams},
     DatasetIndexExt, IndexParams, IndexType,
@@ -639,10 +640,22 @@ impl Dataset {
         })
     }

-    fn optimize_indices(&mut self, _kwargs: Option<&PyDict>) -> PyResult<()> {
+    #[pyo3(signature = (**kwargs))]
+    fn optimize_indices(&mut self, kwargs: Option<&PyDict>) -> PyResult<()> {
         let mut new_self = self.ds.as_ref().clone();
-        RT.block_on(None, new_self.optimize_indices())?
-            .map_err(|err| PyIOError::new_err(err.to_string()))?;
+        let mut options: OptimizeOptions = Default::default();
+        if let Some(kwargs) = kwargs {
+            if let Some(num_indices_to_merge) = kwargs.get_item("num_indices_to_merge")? {
+                options.num_indices_to_merge = num_indices_to_merge.extract()?;
+            }
+        }
+        RT.block_on(
+            None,
+            new_self
+                .optimize_indices(&options)
+                .map_err(|err| PyIOError::new_err(err.to_string())),
+        )??;
+        self.ds = Arc::new(new_self);
         Ok(())
     }

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index e25386027f..5f9c40411d 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -81,12 +81,13 @@ datafusion-physical-expr = "34.0"
 either = "1.0"
 futures = "0.3"
 http = "0.2.9"
+itertools = "0.12"
 lazy_static = "1"
 log = "0.4"
 mock_instant = { version = "0.3.1", features = ["sync"] }
 moka = "0.11"
-num_cpus = "1.0"
 num-traits = "0.2"
+num_cpus = "1.0"
 object_store = { version = "0.9.0", features = ["aws", "gcp", "azure"] }
 parquet = "49.0"
 pin-project = "1.0"
diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs
index c9592fd86e..52ad01fcbc 100644
--- a/rust/lance-index/src/lib.rs
+++ b/rust/lance-index/src/lib.rs
@@ -22,6 +22,7 @@ use async_trait::async_trait;
 use lance_core::Result;
 use roaring::RoaringBitmap;

+pub mod optimize;
 pub mod scalar;
 pub mod traits;
 pub mod vector;
@@ -58,6 +59,7 @@ pub trait Index: Send + Sync {
 }

 /// Index Type
+#[derive(Debug, PartialEq)]
 pub enum IndexType {
     // Preserve 0-100 for simple indices.
     Scalar = 0,
diff --git a/rust/lance-index/src/optimize.rs b/rust/lance-index/src/optimize.rs
new file mode 100644
index 0000000000..60c298b0f5
--- /dev/null
+++ b/rust/lance-index/src/optimize.rs
@@ -0,0 +1,39 @@
+// Copyright 2024 Lance Developers.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Options for optimizing all indices.
+#[derive(Debug)]
+pub struct OptimizeOptions {
+    /// Number of delta indices to merge for one column. Default: 1.
+    ///
+    /// If `num_indices_to_merge` is 0, a new delta index will be created.
+    /// If `num_indices_to_merge` is 1, the delta updates will be merged into the latest index.
+    /// If `num_indices_to_merge` is more than 1, the delta updates and the latest N indices
+    /// will be merged into a single index.
+    ///
+    /// It is up to the caller to decide how many indices to merge / keep. Callers can
+    /// find out how many indices exist by calling [`Dataset::index_statistics`].
+    ///
+    /// A common usage pattern is to keep a large snapshot index built from a base version
+    /// of the dataset, accumulate a few delta indices on top of it, and periodically merge
+    /// those deltas back into the snapshot.
+    pub num_indices_to_merge: usize,
+}
+
+impl Default for OptimizeOptions {
+    fn default() -> Self {
+        Self {
+            num_indices_to_merge: 1,
+        }
+    }
+}
diff --git a/rust/lance-index/src/traits.rs b/rust/lance-index/src/traits.rs
index 28d68e05bd..1511dfd27e 100644
--- a/rust/lance-index/src/traits.rs
+++ b/rust/lance-index/src/traits.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use lance_core::{format::Index, Result};

-use crate::{IndexParams, IndexType};
+use crate::{optimize::OptimizeOptions, IndexParams, IndexType};

 // Extends Lance Dataset with secondary index.
 ///
@@ -85,7 +85,7 @@ pub trait DatasetIndexExt {
     async fn load_scalar_index_for_column(&self, col: &str) -> Result<Option<Index>>;

     /// Optimize indices.
-    async fn optimize_indices(&mut self) -> Result<()>;
+    async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()>;

     /// Find index with a given index_name and return its serialized statistics.
/// diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 3db4065eb4..ae7fa57a2c 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -42,6 +42,7 @@ clap = { version = "4.1.1", features = ["derive"], optional = true } dashmap = "5" # matches arrow-rs use half.workspace = true +itertools.workspace = true http.workspace = true object_store.workspace = true aws-config.workspace = true @@ -103,10 +104,8 @@ prost-build.workspace = true [dev-dependencies] lance-test-macros = { workspace = true } pretty_assertions = { workspace = true } - clap = { version = "4.1.1", features = ["derive"] } criterion = { workspace = true } - approx.workspace = true dirs = "5.0.0" all_asserts = "2.3.1" diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 080bccb397..c93e2150b0 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2757,7 +2757,7 @@ mod test { // UPDATE - dataset.optimize_indices().await.unwrap(); + dataset.optimize_indices(&Default::default()).await.unwrap(); let updated_version = dataset.version().version; // APPEND -> DELETE diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 49ecde3d5a..2055a2764a 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -21,11 +21,13 @@ use std::sync::Arc; use arrow_schema::DataType; use async_trait::async_trait; use futures::{stream, StreamExt, TryStreamExt}; +use itertools::Itertools; use lance_core::format::Fragment; use lance_core::io::{ read_message, read_message_from_buf, read_metadata_offset, reader::read_manifest_indexes, Reader, }; +use lance_index::optimize::OptimizeOptions; use lance_index::pb::index::Implementation; use lance_index::scalar::expression::IndexInformationProvider; use lance_index::scalar::lance_format::LanceIndexStore; @@ -46,7 +48,7 @@ pub mod vector; use crate::dataset::transaction::{Operation, Transaction}; use crate::format::Index as IndexMetadata; -use crate::index::append::append_index; +use crate::index::append::merge_indices; use crate::index::vector::remap_vector_index; use crate::io::commit::commit_transaction; use crate::{dataset::Dataset, Error, Result}; @@ -296,29 +298,43 @@ impl DatasetIndexExt for Dataset { } #[instrument(skip_all)] - async fn optimize_indices(&mut self) -> Result<()> { + async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> { let dataset = Arc::new(self.clone()); - // Append index let indices = self.load_indices().await?; + let name_to_indices = indices + .iter() + .map(|idx| (idx.name.clone(), idx)) + .into_group_map(); + let mut new_indices = vec![]; let mut removed_indices = vec![]; - for idx in indices.as_slice() { - if idx.dataset_version == self.manifest.version { - continue; - } - let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else { + for deltas in name_to_indices.values() { + let Some((new_id, removed, mut new_frag_ids)) = + merge_indices(dataset.clone(), deltas.as_slice(), options).await? 
+            else {
                 continue;
             };
+            for removed_idx in removed.iter() {
+                new_frag_ids |= removed_idx.fragment_bitmap.as_ref().unwrap();
+            }
+            let last_idx = deltas.last().expect("Delta indices should not be empty");
             let new_idx = IndexMetadata {
                 uuid: new_id,
-                name: idx.name.clone(),
-                fields: idx.fields.clone(),
+                name: last_idx.name.clone(), // Keep the same name
+                fields: last_idx.fields.clone(),
                 dataset_version: self.manifest.version,
-                fragment_bitmap: new_frag_ids,
+                fragment_bitmap: Some(new_frag_ids),
             };
-            removed_indices.push(idx.clone());
+            removed_indices.extend(removed.iter().map(|&idx| idx.clone()));
+            if deltas.len() > removed.len() {
+                new_indices.extend(
+                    deltas[0..(deltas.len() - removed.len())]
+                        .iter()
+                        .map(|&idx| idx.clone()),
+                );
+            }
             new_indices.push(new_idx);
         }

@@ -533,6 +549,8 @@ impl DatasetIndexInternalExt for Dataset {

 #[cfg(test)]
 mod tests {
+    use crate::dataset::builder::DatasetBuilder;
+
     use super::*;

     use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator};
@@ -666,4 +684,95 @@
         assert_eq!(stats["num_unindexed_rows"], 512);
         assert_eq!(stats["num_indexed_rows"], 512);
     }
+
+    #[tokio::test]
+    async fn test_optimize_delta_indices() {
+        let test_dir = tempdir().unwrap();
+        let dimensions = 16;
+        let column_name = "vec";
+        let field = Field::new(
+            column_name,
+            DataType::FixedSizeList(
+                Arc::new(Field::new("item", DataType::Float32, true)),
+                dimensions,
+            ),
+            false,
+        );
+        let schema = Arc::new(Schema::new(vec![field]));
+
+        let float_arr = generate_random_array(512 * dimensions as usize);
+
+        let vectors =
+            arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();
+
+        let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
+
+        let reader = RecordBatchIterator::new(
+            vec![record_batch.clone()].into_iter().map(Ok),
+            schema.clone(),
+        );
+
+        let test_uri = test_dir.path().to_str().unwrap();
+        let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
+        let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 10);
+        dataset
+            .create_index(
+                &[column_name],
+                IndexType::Vector,
+                Some("vec_idx".into()),
+                &params,
+                true,
+            )
+            .await
+            .unwrap();
+
+        let stats: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
+        assert_eq!(stats["num_unindexed_rows"], 0);
+        assert_eq!(stats["num_indexed_rows"], 512);
+        assert_eq!(stats["num_indexed_fragments"], 1);
+        assert_eq!(stats["num_indices"], 1);
+
+        let reader =
+            RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
+        dataset.append(reader, None).await.unwrap();
+        let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
+        let stats: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
+        assert_eq!(stats["num_unindexed_rows"], 512);
+        assert_eq!(stats["num_indexed_rows"], 512);
+        assert_eq!(stats["num_indexed_fragments"], 1);
+        assert_eq!(stats["num_unindexed_fragments"], 1);
+        assert_eq!(stats["num_indices"], 1);
+
+        dataset
+            .optimize_indices(&OptimizeOptions {
+                num_indices_to_merge: 0, // Just create a delta index
+            })
+            .await
+            .unwrap();
+        let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
+
+        let stats: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
+        assert_eq!(stats["num_unindexed_rows"], 0);
+        assert_eq!(stats["num_indexed_rows"], 1024);
+        assert_eq!(stats["num_indexed_fragments"], 2);
+        assert_eq!(stats["num_unindexed_fragments"], 0);
+        assert_eq!(stats["num_indices"], 2);
+
+        dataset
+            .optimize_indices(&OptimizeOptions {
+                num_indices_to_merge: 2,
+            })
+            .await
+            .unwrap();
+        let stats: serde_json::Value =
+            serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
+        assert_eq!(stats["num_unindexed_rows"], 0);
+        assert_eq!(stats["num_indexed_rows"], 1024);
+        assert_eq!(stats["num_indexed_fragments"], 2);
+        assert_eq!(stats["num_unindexed_fragments"], 0);
+        assert_eq!(stats["num_indices"], 1);
+    }
 }
diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs
index bd415bebe8..07740eec19 100644
--- a/rust/lance/src/index/append.rs
+++ b/rust/lance/src/index/append.rs
@@ -15,56 +15,80 @@ use std::sync::Arc;

 use lance_core::{format::Index as IndexMetadata, Error, Result};
-use lance_index::scalar::lance_format::LanceIndexStore;
-use lance_index::IndexType;
-use log::info;
+use lance_index::{optimize::OptimizeOptions, scalar::lance_format::LanceIndexStore, IndexType};
 use roaring::RoaringBitmap;
 use snafu::{location, Location};
 use uuid::Uuid;

+use super::vector::ivf::optimize_vector_indices;
+use super::DatasetIndexInternalExt;
 use crate::dataset::scanner::ColumnOrdering;
 use crate::dataset::Dataset;
-use crate::index::vector::ivf::IVFIndex;
-
-use super::DatasetIndexInternalExt;

-/// Append new data to the index, without re-train.
+/// Merge in-flight unindexed data and a specified number of previous indices
+/// into a new index, to improve query performance.
+///
+/// The merge behavior is controlled by [`OptimizeOptions::num_indices_to_merge`].
 ///
-/// Returns the UUID of the new index along with a vector of newly indexed fragment ids
-pub async fn append_index(
+/// Returns
+/// -------
+/// - the UUID of the new index,
+/// - the indices that were merged,
+/// - a bitmap of the fragments covered by the newly created index.
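+///
+/// A minimal illustrative sketch of a call site, mirroring the caller added in
+/// `rust/lance/src/index.rs` above (here `dataset` is assumed to be an `Arc<Dataset>`
+/// and `deltas` a `Vec<&IndexMetadata>` holding one column's delta indices):
+///
+/// ```ignore
+/// // Illustrative only: merge the delta updates into the latest index.
+/// let options = OptimizeOptions { num_indices_to_merge: 1 };
+/// if let Some((new_uuid, merged, frag_bitmap)) =
+///     merge_indices(dataset.clone(), deltas.as_slice(), &options).await?
+/// {
+///     // `merged` holds the old indices superseded by index `new_uuid`;
+///     // `frag_bitmap` covers every fragment the new index spans.
+/// }
+/// ```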
+pub(crate) async fn merge_indices<'a>(
     dataset: Arc<Dataset>,
-    old_index: &IndexMetadata,
-) -> Result<Option<(Uuid, Option<RoaringBitmap>)>> {
-    let unindexed = dataset.unindexed_fragments(&old_index.name).await?;
-    if unindexed.is_empty() {
-        return Ok(None);
+    old_indices: &[&'a IndexMetadata],
+    options: &OptimizeOptions,
+) -> Result<Option<(Uuid, Vec<&'a IndexMetadata>, RoaringBitmap)>> {
+    if old_indices.is_empty() {
+        return Err(Error::Index {
+            message: "Append index: no previous index found".to_string(),
+            location: location!(),
+        });
     };

-    let frag_bitmap = old_index.fragment_bitmap.as_ref().map(|bitmap| {
-        let mut bitmap = bitmap.clone();
-        bitmap.extend(unindexed.iter().map(|frag| frag.id as u32));
-        bitmap
-    });
-
     let column = dataset
         .schema()
-        .field_by_id(old_index.fields[0])
+        .field_by_id(old_indices[0].fields[0])
         .ok_or(Error::Index {
             message: format!(
                 "Append index: column {} does not exist",
-                old_index.fields[0]
+                old_indices[0].fields[0]
             ),
             location: location!(),
         })?;

-    let index = dataset
-        .open_generic_index(&column.name, &old_index.uuid.to_string())
-        .await?;
+    let mut indices = Vec::with_capacity(old_indices.len());
+    for idx in old_indices {
+        let index = dataset
+            .open_generic_index(&column.name, &idx.uuid.to_string())
+            .await?;
+        indices.push(index);
+    }
+
+    if indices
+        .windows(2)
+        .any(|w| w[0].index_type() != w[1].index_type())
+    {
+        return Err(Error::Index {
+            message: format!("Append index: invalid index deltas: {:?}", old_indices),
+            location: location!(),
+        });
+    }
+    let unindexed = dataset.unindexed_fragments(&old_indices[0].name).await?;
+
+    let mut frag_bitmap = RoaringBitmap::new();
+    old_indices.iter().for_each(|idx| {
+        frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter());
+    });
+    unindexed.iter().for_each(|frag| {
+        frag_bitmap.push(frag.id as u32);
+    });

-    match index.index_type() {
+    let (new_uuid, indices_merged) = match indices[0].index_type() {
         IndexType::Scalar => {
             let index = dataset
-                .open_scalar_index(&column.name, &old_index.uuid.to_string())
+                .open_scalar_index(&column.name, &old_indices[0].uuid.to_string())
                 .await?;

             let mut scanner = dataset.scan();
@@ -84,31 +108,38 @@ pub async fn append_index(
             index.update(new_data_stream.into(), &new_store).await?;

-            Ok(Some((new_uuid, frag_bitmap)))
+            Ok((new_uuid, 1))
         }
         IndexType::Vector => {
-            let mut scanner = dataset.scan();
-            scanner.with_fragments(unindexed);
-            scanner.with_row_id();
-            scanner.project(&[&column.name])?;
-            let new_data_stream = scanner.try_into_stream().await?;
-
-            let index = dataset
-                .open_vector_index(&column.name, old_index.uuid.to_string().as_str())
-                .await?;
-
-            let Some(ivf_idx) = index.as_any().downcast_ref::<IVFIndex>() else {
-                info!("Index type: {:?} does not support append", index);
-                return Ok(None);
+            let new_data_stream = if unindexed.is_empty() {
+                None
+            } else {
+                let mut scanner = dataset.scan();
+                scanner
+                    .with_fragments(unindexed)
+                    .with_row_id()
+                    .project(&[&column.name])?;
+                Some(scanner.try_into_stream().await?)
            };

-            let new_index = ivf_idx
-                .append(dataset.as_ref(), new_data_stream, old_index, &column.name)
-                .await?;
-
-            Ok(Some((new_index, frag_bitmap)))
+            optimize_vector_indices(
+                &dataset.object_store,
+                &dataset.indices_dir(),
+                dataset.version().version,
+                new_data_stream,
+                &column.name,
+                &indices,
+                options,
+            )
+            .await
         }
-    }
+    }?;
+
+    Ok(Some((
+        new_uuid,
+        old_indices[old_indices.len() - indices_merged..].to_vec(),
+        frag_bitmap,
+    )))
 }

 #[cfg(test)]
@@ -128,6 +159,8 @@
     use lance_testing::datagen::generate_random_array;
     use tempfile::tempdir;

+    use crate::dataset::builder::DatasetBuilder;
+    use crate::index::vector::ivf::IVFIndex;
     use crate::index::vector::{pq::PQIndex, VectorIndexParams};
     use crate::index::DatasetIndexExt;
@@ -193,8 +226,10 @@
         .unwrap();
         assert_eq!(results[0].num_rows(), 10); // Flat search.

-        dataset.optimize_indices().await.unwrap();
+        dataset.optimize_indices(&Default::default()).await.unwrap();
+        let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
         let index = &dataset.load_indices().await.unwrap()[0];
+
         assert!(dataset
             .unindexed_fragments(&index.name)
             .await
@@ -247,9 +282,5 @@
             .iter()
             .sum::<usize>();
         assert_eq!(row_in_index, 2000);
-        assert_eq!(
-            dataset.index_cache_entry_count(),
-            6 + dataset.versions().await.unwrap().len()
-        );
     }
 }
diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs
index bc8492214e..3d321f4eeb 100644
--- a/rust/lance/src/index/vector/ivf.rs
+++ b/rust/lance/src/index/vector/ivf.rs
@@ -39,9 +39,10 @@ use lance_core::io::{
     local::to_local_path, ObjectWriter, Reader, RecordBatchStream, WriteExt, Writer,
 };
 use lance_core::{
-    datatypes::Field, encodings::plain::PlainEncoder, format::Index as IndexMetadata, Error, Result,
+    datatypes::Field, encodings::plain::PlainEncoder, io::object_store::ObjectStore, Error, Result,
 };
 use lance_index::{
+    optimize::OptimizeOptions,
     vector::{
         ivf::{builder::load_precomputed_partitions, shuffler::shuffle_dataset, IvfBuildParams},
         pq::{PQBuildParams, ProductQuantizer, ProductQuantizerImpl},
@@ -177,73 +178,6 @@ impl IVFIndex {
         let batch = part_index.search(&query, pre_filter).await?;
         Ok(batch)
     }
-
-    pub(crate) async fn append(
-        &self,
-        dataset: &Dataset,
-        data: impl RecordBatchStream + Unpin + 'static,
-        metadata: &IndexMetadata,
-        column: &str,
-    ) -> Result<Uuid> {
-        let new_uuid = Uuid::new_v4();
-        let object_store = dataset.object_store();
-        let index_file = dataset
-            .indices_dir()
-            .child(new_uuid.to_string())
-            .child(INDEX_FILE_NAME);
-        let mut writer = object_store.create(&index_file).await?;
-
-        let pq_index = self
-            .sub_index
-            .as_any()
-            .downcast_ref::<PQIndex>()
-            .ok_or(Error::Index {
-                message: "Only support append to IVF_PQ".to_string(),
-                location: location!(),
-            })?;
-
-        // TODO: merge two IVF implementations.
-        let ivf = lance_index::vector::ivf::new_ivf_with_pq(
-            self.ivf.centroids.values(),
-            self.ivf.dimension(),
-            self.metric_type,
-            column,
-            pq_index.pq.clone(),
-            None,
-        )?;
-
-        let shuffled = shuffle_dataset(
-            data,
-            column,
-            ivf,
-            None,
-            self.ivf.num_partitions() as u32,
-            pq_index.pq.num_sub_vectors(),
-            10000,
-            2,
-            None,
-        )
-        .await?;
-
-        let mut ivf_mut = Ivf::new(self.ivf.centroids.clone());
-        write_index_partitions(&mut writer, &mut ivf_mut, shuffled, Some(&[self])).await?;
-
-        let metadata = IvfPQIndexMetadata {
-            name: metadata.name.clone(),
-            column: column.to_string(),
-            dimension: self.ivf.dimension() as u32,
-            dataset_version: dataset.version().version,
-            metric_type: self.metric_type,
-            ivf: ivf_mut,
-            pq: pq_index.pq.clone(),
-            transforms: vec![],
-        };
-
-        let metadata = pb::Index::try_from(&metadata)?;
-        let pos = writer.write_protobuf(&metadata).await?;
-        writer.write_magics(pos).await?;
-        writer.shutdown().await?;
-
-        Ok(new_uuid)
-    }
 }

 impl std::fmt::Debug for IVFIndex {
@@ -252,6 +186,116 @@ impl std::fmt::Debug for IVFIndex {
     }
 }

+// TODO: move to `lance-index` crate.
+
+/// Optimize a column's vector indices.
+///
+/// Returns (new_uuid, num_indices_merged)
+pub(crate) async fn optimize_vector_indices(
+    object_store: &ObjectStore,
+    index_dir: &Path,
+    dataset_version: u64,
+    unindexed: Option<impl RecordBatchStream + Unpin + 'static>,
+    vector_column: &str,
+    existing_indices: &[Arc<dyn Index>],
+    options: &OptimizeOptions,
+) -> Result<(Uuid, usize)> {
+    // Sanity check the indices
+    if existing_indices.is_empty() {
+        return Err(Error::Index {
+            message: "optimizing vector index: no existing index found".to_string(),
+            location: location!(),
+        });
+    }
+
+    let new_uuid = Uuid::new_v4();
+    let index_file = index_dir.child(new_uuid.to_string()).child(INDEX_FILE_NAME);
+    let mut writer = object_store.create(&index_file).await?;
+
+    let first_idx = existing_indices[0]
+        .as_any()
+        .downcast_ref::<IVFIndex>()
+        .ok_or(Error::Index {
+            message: "optimizing vector index: first index is not IVF".to_string(),
+            location: location!(),
+        })?;
+
+    let pq_index = first_idx
+        .sub_index
+        .as_any()
+        .downcast_ref::<PQIndex>()
+        .ok_or(Error::Index {
+            message: "optimizing vector index: it is not an IVF_PQ index".to_string(),
+            location: location!(),
+        })?;
+    let metric_type = first_idx.metric_type;
+    let dim = first_idx.ivf.dimension();
+
+    // TODO: merge the `lance::vector::ivf::IVF` and `lance-index::vector::ivf::Ivf` implementations.
+    let ivf = lance_index::vector::ivf::new_ivf_with_pq(
+        first_idx.ivf.centroids.values(),
+        first_idx.ivf.dimension(),
+        metric_type,
+        vector_column,
+        pq_index.pq.clone(),
+        None,
+    )?;
+
+    // Shuffle the unindexed data into partitions.
+    let shuffled = if let Some(stream) = unindexed {
+        Some(
+            shuffle_dataset(
+                stream,
+                vector_column,
+                ivf,
+                None,
+                first_idx.ivf.num_partitions() as u32,
+                pq_index.pq.num_sub_vectors(),
+                10000,
+                2,
+                None,
+            )
+            .await?,
+        )
+    } else {
+        None
+    };
+
+    let mut ivf_mut = Ivf::new(first_idx.ivf.centroids.clone());
+
+    let start_pos = if options.num_indices_to_merge > existing_indices.len() {
+        0
+    } else {
+        existing_indices.len() - options.num_indices_to_merge
+    };
+
+    let indices_to_merge = existing_indices[start_pos..]
+        .iter()
+        .map(|idx| {
+            idx.as_any().downcast_ref::<IVFIndex>().ok_or(Error::Index {
+                message: "optimizing vector index: it is not an IVF index".to_string(),
+                location: location!(),
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+    write_index_partitions(&mut writer, &mut ivf_mut, shuffled, Some(&indices_to_merge)).await?;
+
+    let metadata = IvfPQIndexMetadata {
+        name: format!("_{}_idx", vector_column),
+        column: vector_column.to_string(),
+        dimension: dim as u32,
+        dataset_version,
+        metric_type,
+        ivf: ivf_mut,
+        pq: pq_index.pq.clone(),
+        transforms: vec![],
+    };
+
+    let metadata = pb::Index::try_from(&metadata)?;
+    let pos = writer.write_protobuf(&metadata).await?;
+    writer.write_magics(pos).await?;
+    writer.shutdown().await?;
+
+    Ok((new_uuid, existing_indices.len() - start_pos))
+}
+
 #[derive(Serialize)]
 pub struct IvfIndexPartitionStatistics {
     size: u32,
diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs
index 672762dec0..a34d90195e 100644
--- a/rust/lance/src/index/vector/ivf/builder.rs
+++ b/rust/lance/src/index/vector/ivf/builder.rs
@@ -81,7 +81,7 @@ pub(super) async fn build_partitions(
     )
     .await?;

-    write_index_partitions(writer, ivf, stream, None).await?;
+    write_index_partitions(writer, ivf, Some(stream), None).await?;

     Ok(())
 }
diff --git a/rust/lance/src/index/vector/ivf/io.rs b/rust/lance/src/index/vector/ivf/io.rs
index 75db63ace0..9e870418f6 100644
--- a/rust/lance/src/index/vector/ivf/io.rs
+++ b/rust/lance/src/index/vector/ivf/io.rs
@@ -47,38 +47,40 @@ use crate::Result;
 pub(super) async fn write_index_partitions(
     writer: &mut dyn Writer,
     ivf: &mut Ivf,
-    streams: Vec<impl Stream<Item = Result<RecordBatch>>>,
-    existing_partitions: Option<&[&IVFIndex]>,
+    streams: Option<Vec<impl Stream<Item = Result<RecordBatch>>>>,
+    existing_indices: Option<&[&IVFIndex]>,
 ) -> Result<()> {
     // build the initial heap
     // TODO: extract heap sort to a separate function.
    let mut streams_heap = BinaryHeap::new();
    let mut new_streams = vec![];

-    for stream in streams {
-        let mut stream = Box::pin(stream.peekable());
+    if let Some(streams) = streams {
+        for stream in streams {
+            let mut stream = Box::pin(stream.peekable());

-        match stream.as_mut().peek().await {
-            Some(Ok(batch)) => {
-                let part_ids: &UInt32Array = batch
-                    .column_by_name(PART_ID_COLUMN)
-                    .expect("part id column not found")
-                    .as_primitive();
-                let part_id = part_ids.values()[0];
-                streams_heap.push((Reverse(part_id), new_streams.len()));
-                new_streams.push(stream);
-            }
-            Some(Err(e)) => {
-                return Err(Error::IO {
-                    message: format!("failed to read batch: {}", e),
-                    location: location!(),
-                });
-            }
-            None => {
-                return Err(Error::IO {
-                    message: "failed to read batch: end of stream".to_string(),
-                    location: location!(),
-                });
+            match stream.as_mut().peek().await {
+                Some(Ok(batch)) => {
+                    let part_ids: &UInt32Array = batch
+                        .column_by_name(PART_ID_COLUMN)
+                        .expect("part id column not found")
+                        .as_primitive();
+                    let part_id = part_ids.values()[0];
+                    streams_heap.push((Reverse(part_id), new_streams.len()));
+                    new_streams.push(stream);
+                }
+                Some(Err(e)) => {
+                    return Err(Error::IO {
+                        message: format!("failed to read batch: {}", e),
+                        location: location!(),
+                    });
+                }
+                None => {
+                    return Err(Error::IO {
+                        message: "failed to read batch: end of stream".to_string(),
+                        location: location!(),
+                    });
+                }
             }
         }
     }
@@ -88,7 +90,7 @@
         let mut pq_array: Vec<Arc<dyn Array>> = vec![];
        let mut row_id_array: Vec<Arc<dyn Array>> = vec![];

-        if let Some(&previous_indices) = existing_partitions.as_ref() {
+        if let Some(&previous_indices) = existing_indices.as_ref() {
             for &idx in previous_indices.iter() {
                 let sub_index = idx.load_partition(part_id as usize, true).await?;
                 let pq_index =
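
For reference, a minimal sketch of the workflow this patch enables, modeled on the
`test_optimize_delta_indices` test above (the `dataset` handle is illustrative and
error handling is elided):

    use lance::index::DatasetIndexExt;
    use lance_index::optimize::OptimizeOptions;

    // Sketch only, modeled on test_optimize_delta_indices; adjust to your dataset.
    // After appending new rows, build a delta index covering only the new
    // fragments; the existing index is left untouched (two indices afterwards).
    dataset
        .optimize_indices(&OptimizeOptions {
            num_indices_to_merge: 0,
        })
        .await?;

    // Later, fold the delta and the latest index back into a single index.
    dataset
        .optimize_indices(&OptimizeOptions {
            num_indices_to_merge: 2,
        })
        .await?;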