Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pq refactor #25

Open
wants to merge 8 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vectorlink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ itertools = "0.10"
chrono = "0.4.26"
rayon = "1.8.0"
libc = "0.2.153"
linfa = "0.7.0"
linfa-clustering = "0.7.0"
ndarray = "0.15.6"

[dev-dependencies]
assert_float_eq = "1.1.3"
Expand Down
63 changes: 28 additions & 35 deletions vectorlink/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ use urlencoding::encode;

use crate::{
comparator::{
Centroid16Comparator, DiskOpenAIComparator, OpenAIComparator, Quantized16Comparator,
Centroid16Comparator, DiskOpenAIComparator, DomainQuantizer, HnswQuantizer16,
OpenAIComparator, Quantized16Comparator,
},
configuration::HnswConfiguration,
domain::{PqDerivedDomainInfo16, PqDerivedDomainInitializer16},
indexer::{create_index_name, index_serialization_path},
openai::{embeddings_for, EmbeddingError, Model},
server::Operation,
store::VectorFile,
vecmath::{Embedding, CENTROID_16_LENGTH, EMBEDDING_LENGTH, QUANTIZED_16_EMBEDDING_LENGTH},
vectors::VectorStore,
};
Expand Down Expand Up @@ -56,36 +59,13 @@ pub enum VectorizationError {
Io(#[from] io::Error),
}

/// Persist `embeddings` into `vec_file` starting at the embedding slot
/// `offset`, reinterpreting the slice as raw bytes, then flush and sync so
/// the data is durable before returning.
///
/// # Arguments
/// * `vec_file`   - destination file of densely packed `Embedding` values
/// * `offset`     - index measured in embeddings (not bytes) to seek to
/// * `embeddings` - the embedding vectors to write
///
/// # Errors
/// Returns `VectorizationError::Io` if any seek, write, flush, or sync fails.
async fn save_embeddings(
vec_file: &mut File,
offset: usize,
embeddings: &[Embedding],
) -> Result<(), VectorizationError> {
// SAFETY: the pointer and byte length both come from the same live slice,
// so the view is in-bounds for its lifetime. Soundness additionally assumes
// `Embedding` is a plain fixed-size array of floats with no padding bytes.
// TODO(review): confirm `Embedding`'s layout guarantees this.
let transmuted = unsafe {
std::slice::from_raw_parts(
embeddings.as_ptr() as *const u8,
std::mem::size_of_val(embeddings),
)
};
// Seek to the byte position of the `offset`-th embedding slot.
vec_file
.seek(SeekFrom::Start(
(offset * std::mem::size_of::<Embedding>()) as u64,
))
.await?;
vec_file.write_all(transmuted).await?;
// Flush buffered bytes, then sync_data so the OS pushes them to disk
// before we report success.
vec_file.flush().await?;
vec_file.sync_data().await?;

Ok(())
}

pub async fn vectorize_from_operations<
S: Stream<Item = io::Result<Operation>>,
P: AsRef<Path> + Unpin,
>(
api_key: &str,
model: Model,
vec_file: &mut File,
vec_file: &mut VectorFile<Embedding>,
op_stream: S,
progress_file_path: P,
) -> Result<usize, VectorizationError> {
Expand Down Expand Up @@ -122,7 +102,7 @@ pub async fn vectorize_from_operations<
let (embeddings, chunk_failures) = embeds.unwrap()?;
eprintln!("retrieved embeddings");

save_embeddings(vec_file, offset as usize, &embeddings).await?;
vec_file.append_vector_range(&embeddings)?;
eprintln!("saved embeddings");
failures += chunk_failures;
offset += embeddings.len() as u64;
Expand Down Expand Up @@ -190,7 +170,7 @@ pub async fn index_using_operations_and_vectors<
op_file_path: P2,
size: usize,
id_offset: u64,
quantize_hnsw: bool,
quantize_hnsw: Option<&str>,
model: Model,
) -> Result<(), IndexingError> {
// Start at last hnsw offset
Expand Down Expand Up @@ -257,20 +237,37 @@ pub async fn index_using_operations_and_vectors<
.collect();

eprintln!("ready to generate hnsw");
let hnsw = if quantize_hnsw {
let hnsw = if let Some(pq_name) = quantize_hnsw {
let number_of_vectors = NUMBER_OF_CENTROIDS / 10;
let c = DiskOpenAIComparator::new(
domain_obj.name().to_owned(),
Arc::new(domain_obj.immutable_file()),
);

let derived_domain_info = domain_obj.get_derived_domain_info(pq_name);
if derived_domain_info.is_none() {
eprintln!("pq derived domain ({pq_name}) doesn't exist yet. constructing now");
domain_obj
.create_derived(pq_name.to_string(), PqDerivedDomainInitializer16::default())
.unwrap(); // TODO
}
// lazy - we just look it up again and now it should exist
let derived_domain_info: PqDerivedDomainInfo16 =
domain_obj.get_derived_domain_info(pq_name).unwrap();

let quantizer = derived_domain_info.quantizer.clone();

let quantized_comparator =
Quantized16Comparator::load(&vs, domain.to_string(), pq_name.to_string())?;

let hnsw: QuantizedHnsw<
EMBEDDING_LENGTH,
CENTROID_16_LENGTH,
QUANTIZED_16_EMBEDDING_LENGTH,
Centroid16Comparator,
Quantized16Comparator,
DiskOpenAIComparator,
> = QuantizedHnsw::new(number_of_vectors, c);
Arc<HnswQuantizer16>,
> = QuantizedHnsw::generate(quantizer, quantized_comparator, c, vecs);
HnswConfiguration::SmallQuantizedOpenAi(model, hnsw)
} else {
let hnsw = Hnsw::generate(comparator, vecs, 24, 48, 12);
Expand Down Expand Up @@ -303,11 +300,7 @@ pub async fn index_from_operations_file<P: AsRef<Path>>(

let mut vector_path = staging_path.clone();
vector_path.push("vectors");
let mut vec_file = OpenOptions::new()
.create(true)
.write(true)
.open(&vector_path)
.await?;
let mut vec_file = VectorFile::open_create(&vector_path, true)?;
let mut progress_file_path = staging_path.clone();
progress_file_path.push("progress");

Expand Down
Loading
Loading