feat: optimize indices (#1841)
Allow users to specify how many delta indices to be merged
eddyxu authored Jan 17, 2024
1 parent f773723 commit 1671ea0
Showing 12 changed files with 409 additions and 169 deletions.
21 changes: 17 additions & 4 deletions python/src/dataset.rs
@@ -23,7 +23,7 @@ use arrow_schema::{DataType, Schema as ArrowSchema};
use async_trait::async_trait;
use chrono::Duration;

use futures::StreamExt;
use futures::{StreamExt, TryFutureExt};
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::UpdateBuilder;
use lance::dataset::{
@@ -37,6 +37,7 @@ use lance::index::{
};
use lance_arrow::as_fixed_size_list_array;
use lance_core::{datatypes::Schema, format::Fragment, io::object_store::ObjectStoreParams};
use lance_index::optimize::OptimizeOptions;
use lance_index::{
vector::{ivf::IvfBuildParams, pq::PQBuildParams},
DatasetIndexExt, IndexParams, IndexType,
@@ -639,10 +640,22 @@ impl Dataset {
})
}

fn optimize_indices(&mut self, _kwargs: Option<&PyDict>) -> PyResult<()> {
#[pyo3(signature = (**kwargs))]
fn optimize_indices(&mut self, kwargs: Option<&PyDict>) -> PyResult<()> {
let mut new_self = self.ds.as_ref().clone();
RT.block_on(None, new_self.optimize_indices())?
.map_err(|err| PyIOError::new_err(err.to_string()))?;
let mut options: OptimizeOptions = Default::default();
if let Some(kwargs) = kwargs {
if let Some(num_indices_to_merge) = kwargs.get_item("num_indices_to_merge")? {
options.num_indices_to_merge = num_indices_to_merge.extract()?;
}
}
RT.block_on(
None,
new_self
.optimize_indices(&options)
.map_err(|err| PyIOError::new_err(err.to_string())),
)??;

self.ds = Arc::new(new_self);
Ok(())
}
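For readability, the new kwargs handling above can be summarized as a standalone sketch (a hypothetical helper, not part of this commit), assuming the pyo3 version used here, where `PyDict::get_item` returns `PyResult<Option<_>>`:

```rust
use lance_index::optimize::OptimizeOptions;
use pyo3::prelude::*;
use pyo3::types::PyDict;

// Hypothetical helper: map optional Python kwargs to OptimizeOptions, falling
// back to the Rust default (num_indices_to_merge = 1) when the key is absent.
fn options_from_kwargs(kwargs: Option<&PyDict>) -> PyResult<OptimizeOptions> {
    let mut options = OptimizeOptions::default();
    if let Some(kwargs) = kwargs {
        if let Some(num) = kwargs.get_item("num_indices_to_merge")? {
            options.num_indices_to_merge = num.extract()?;
        }
    }
    Ok(options)
}
```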
3 changes: 2 additions & 1 deletion rust/Cargo.toml
@@ -81,12 +81,13 @@ datafusion-physical-expr = "34.0"
either = "1.0"
futures = "0.3"
http = "0.2.9"
itertools = "0.12"
lazy_static = "1"
log = "0.4"
mock_instant = { version = "0.3.1", features = ["sync"] }
moka = "0.11"
num_cpus = "1.0"
num-traits = "0.2"
num_cpus = "1.0"
object_store = { version = "0.9.0", features = ["aws", "gcp", "azure"] }
parquet = "49.0"
pin-project = "1.0"
2 changes: 2 additions & 0 deletions rust/lance-index/src/lib.rs
@@ -22,6 +22,7 @@ use async_trait::async_trait;
use lance_core::Result;
use roaring::RoaringBitmap;

pub mod optimize;
pub mod scalar;
pub mod traits;
pub mod vector;
@@ -58,6 +59,7 @@ pub trait Index: Send + Sync {
}

/// Index Type
#[derive(Debug, PartialEq)]
pub enum IndexType {
// Preserve 0-100 for simple indices.
Scalar = 0,
39 changes: 39 additions & 0 deletions rust/lance-index/src/optimize.rs
@@ -0,0 +1,39 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Options for optimizing all indices.
#[derive(Debug)]
pub struct OptimizeOptions {
/// Number of delta indices to merge for one column. Default: 1.
///
/// If `num_indices_to_merge` is 0, a new delta index will be created.
/// If `num_indices_to_merge` is 1, the delta updates will be merged into the latest index.
/// If `num_indices_to_merge` is more than 1, the delta updates and latest N indices
/// will be merged into one single index.
///
/// It is up to the caller to decide how many indices to merge / keep. Callers can
/// find out how many indices are there by calling [`Dataset::index_statistics`].
///
/// A common usage pattern is for the caller to keep a large snapshot of the index for the base
/// version, accumulate a few delta indices, and then merge them into the snapshot.
pub num_indices_to_merge: usize,
}

impl Default for OptimizeOptions {
fn default() -> Self {
Self {
num_indices_to_merge: 1,
}
}
}
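As a usage sketch (not part of this commit), the three modes described in the doc comment might be driven like this, assuming a mutable `lance::dataset::Dataset` with existing indices and the `DatasetIndexExt` trait in scope:

```rust
use lance::dataset::Dataset;
use lance_index::optimize::OptimizeOptions;
use lance_index::DatasetIndexExt;

// Hypothetical driver showing the three num_indices_to_merge modes.
async fn optimize_modes(dataset: &mut Dataset) -> lance_core::Result<()> {
    // 0: index only the newly added fragments as a fresh delta index.
    dataset
        .optimize_indices(&OptimizeOptions { num_indices_to_merge: 0 })
        .await?;

    // 1 (the default): merge the delta updates into the latest index.
    dataset.optimize_indices(&OptimizeOptions::default()).await?;

    // N > 1: merge the delta updates and the latest N indices into a single index.
    dataset
        .optimize_indices(&OptimizeOptions { num_indices_to_merge: 2 })
        .await?;
    Ok(())
}
```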
4 changes: 2 additions & 2 deletions rust/lance-index/src/traits.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use lance_core::{format::Index, Result};

use crate::{IndexParams, IndexType};
use crate::{optimize::OptimizeOptions, IndexParams, IndexType};

// Extends Lance Dataset with secondary index.
///
@@ -85,7 +85,7 @@ pub trait DatasetIndexExt {
async fn load_scalar_index_for_column(&self, col: &str) -> Result<Option<Index>>;

/// Optimize indices.
async fn optimize_indices(&mut self) -> Result<()>;
async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()>;

/// Find index with a given index_name and return its serialized statistics.
///
3 changes: 1 addition & 2 deletions rust/lance/Cargo.toml
@@ -42,6 +42,7 @@ clap = { version = "4.1.1", features = ["derive"], optional = true }
dashmap = "5"
# matches arrow-rs use
half.workspace = true
itertools.workspace = true
http.workspace = true
object_store.workspace = true
aws-config.workspace = true
@@ -103,10 +104,8 @@ prost-build.workspace = true
[dev-dependencies]
lance-test-macros = { workspace = true }
pretty_assertions = { workspace = true }

clap = { version = "4.1.1", features = ["derive"] }
criterion = { workspace = true }

approx.workspace = true
dirs = "5.0.0"
all_asserts = "2.3.1"
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/scanner.rs
@@ -2757,7 +2757,7 @@ mod test {

// UPDATE

dataset.optimize_indices().await.unwrap();
dataset.optimize_indices(&Default::default()).await.unwrap();
let updated_version = dataset.version().version;

// APPEND -> DELETE
133 changes: 121 additions & 12 deletions rust/lance/src/index.rs
@@ -21,11 +21,13 @@ use std::sync::Arc;
use arrow_schema::DataType;
use async_trait::async_trait;
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use lance_core::format::Fragment;
use lance_core::io::{
read_message, read_message_from_buf, read_metadata_offset, reader::read_manifest_indexes,
Reader,
};
use lance_index::optimize::OptimizeOptions;
use lance_index::pb::index::Implementation;
use lance_index::scalar::expression::IndexInformationProvider;
use lance_index::scalar::lance_format::LanceIndexStore;
@@ -46,7 +48,7 @@ pub mod vector;

use crate::dataset::transaction::{Operation, Transaction};
use crate::format::Index as IndexMetadata;
use crate::index::append::append_index;
use crate::index::append::merge_indices;
use crate::index::vector::remap_vector_index;
use crate::io::commit::commit_transaction;
use crate::{dataset::Dataset, Error, Result};
@@ -296,29 +298,43 @@ impl DatasetIndexExt for Dataset {
}

#[instrument(skip_all)]
async fn optimize_indices(&mut self) -> Result<()> {
async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()> {
let dataset = Arc::new(self.clone());
// Append index
let indices = self.load_indices().await?;

let name_to_indices = indices
.iter()
.map(|idx| (idx.name.clone(), idx))
.into_group_map();

let mut new_indices = vec![];
let mut removed_indices = vec![];
for idx in indices.as_slice() {
if idx.dataset_version == self.manifest.version {
continue;
}
let Some((new_id, new_frag_ids)) = append_index(dataset.clone(), idx).await? else {
for deltas in name_to_indices.values() {
let Some((new_id, removed, mut new_frag_ids)) =
merge_indices(dataset.clone(), deltas.as_slice(), options).await?
else {
continue;
};
for removed_idx in removed.iter() {
new_frag_ids |= removed_idx.fragment_bitmap.as_ref().unwrap();
}

let last_idx = deltas.last().expect("Delte indices should not be empty");
let new_idx = IndexMetadata {
uuid: new_id,
name: idx.name.clone(),
fields: idx.fields.clone(),
name: last_idx.name.clone(), // Keep the same name
fields: last_idx.fields.clone(),
dataset_version: self.manifest.version,
fragment_bitmap: new_frag_ids,
fragment_bitmap: Some(new_frag_ids),
};
removed_indices.push(idx.clone());
removed_indices.extend(removed.iter().map(|&idx| idx.clone()));
if deltas.len() > removed.len() {
new_indices.extend(
deltas[0..(deltas.len() - removed.len())]
.iter()
.map(|&idx| idx.clone()),
);
}
new_indices.push(new_idx);
}
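
For context, the grouping step above uses `Itertools::into_group_map` (the reason for the new `itertools` dependency) to bucket delta indices that share a name. A minimal, self-contained sketch with a hypothetical simplified metadata type:

```rust
use std::collections::HashMap;

use itertools::Itertools;

// Hypothetical stand-in for the crate's IndexMetadata, reduced to the field
// relevant for grouping.
struct IdxMeta {
    name: String,
}

// Group indices by name so deltas of the same logical index can be merged together.
fn group_deltas(indices: &[IdxMeta]) -> HashMap<String, Vec<&IdxMeta>> {
    indices
        .iter()
        .map(|idx| (idx.name.clone(), idx))
        .into_group_map()
}

fn main() {
    let indices = vec![
        IdxMeta { name: "vec_idx".into() },
        IdxMeta { name: "vec_idx".into() },
        IdxMeta { name: "scalar_idx".into() },
    ];
    let grouped = group_deltas(&indices);
    assert_eq!(grouped["vec_idx"].len(), 2);
    assert_eq!(grouped["scalar_idx"].len(), 1);
}
```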

@@ -533,6 +549,8 @@ impl DatasetIndexInternalExt for Dataset {

#[cfg(test)]
mod tests {
use crate::dataset::builder::DatasetBuilder;

use super::*;

use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator};
@@ -666,4 +684,95 @@ mod tests {
assert_eq!(stats["num_unindexed_rows"], 512);
assert_eq!(stats["num_indexed_rows"], 512);
}

#[tokio::test]
async fn test_optimize_delta_indices() {
let test_dir = tempdir().unwrap();
let dimensions = 16;
let column_name = "vec";
let field = Field::new(
column_name,
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimensions,
),
false,
);
let schema = Arc::new(Schema::new(vec![field]));

let float_arr = generate_random_array(512 * dimensions as usize);

let vectors =
arrow_array::FixedSizeListArray::try_new_from_values(float_arr, dimensions).unwrap();

let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();

let reader = RecordBatchIterator::new(
vec![record_batch.clone()].into_iter().map(Ok),
schema.clone(),
);

let test_uri = test_dir.path().to_str().unwrap();
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 10);
dataset
.create_index(
&[column_name],
IndexType::Vector,
Some("vec_idx".into()),
&params,
true,
)
.await
.unwrap();

let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 0);
assert_eq!(stats["num_indexed_rows"], 512);
assert_eq!(stats["num_indexed_fragments"], 1);
assert_eq!(stats["num_indices"], 1);

let reader =
RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone());
dataset.append(reader, None).await.unwrap();
let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 512);
assert_eq!(stats["num_indexed_rows"], 512);
assert_eq!(stats["num_indexed_fragments"], 1);
assert_eq!(stats["num_unindexed_fragments"], 1);
assert_eq!(stats["num_indices"], 1);

dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 0, // Just create index for delta
})
.await
.unwrap();
let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();

let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 0);
assert_eq!(stats["num_indexed_rows"], 1024);
assert_eq!(stats["num_indexed_fragments"], 2);
assert_eq!(stats["num_unindexed_fragments"], 0);
assert_eq!(stats["num_indices"], 2);

dataset
.optimize_indices(&OptimizeOptions {
num_indices_to_merge: 2,
})
.await
.unwrap();
let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap();
assert_eq!(stats["num_unindexed_rows"], 0);
assert_eq!(stats["num_indexed_rows"], 1024);
assert_eq!(stats["num_indexed_fragments"], 2);
assert_eq!(stats["num_unindexed_fragments"], 0);
assert_eq!(stats["num_indices"], 1);
}
}