Standalone tests (except rollovers) #2629

Merged · 8 commits · Nov 14, 2024

nidx/nidx_binding/src/lib.rs (32 changes: 22 additions & 10 deletions)

```diff
@@ -5,9 +5,9 @@ use pyo3::prelude::*;
 
 use nidx::api::grpc::ApiServer;
 use nidx::grpc_server::GrpcServer;
-use nidx::indexer::{download_message, index_resource};
+use nidx::indexer::process_index_message;
 use nidx::searcher::grpc::SearchServer;
-use nidx::searcher::SyncedSearcher;
+use nidx::searcher::{SyncStatus, SyncedSearcher};
 use nidx::settings::EnvSettings;
 use nidx::Settings;
 use nidx_protos::prost::*;
@@ -17,6 +17,7 @@ use std::path::Path;
 use std::sync::atomic::AtomicI64;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
+use tokio::sync::watch;
 
 #[derive(Clone)]
 struct SeqSource(Arc<AtomicI64>);
@@ -37,6 +38,7 @@ pub struct NidxBinding {
     settings: Settings,
     seq: SeqSource,
     runtime: Option<Runtime>,
+    sync_watcher: watch::Receiver<SyncStatus>,
 }
 
 #[pymethods]
@@ -52,18 +54,16 @@ impl NidxBinding {
     }
 
     pub fn index(&mut self, bytes: Vec<u8>) -> PyResult<i64> {
-        // TODO: Can this be simplified into the indexer code?
         let msg = IndexMessage::decode(&bytes[..]).unwrap();
 
         let seq = self.seq.0.load(std::sync::atomic::Ordering::Relaxed);
         let object_store = self.settings.indexer.as_ref().unwrap().object_store.clone();
         let result = self.runtime.as_ref().unwrap().block_on(async {
-            let resource = download_message(object_store, &msg.storage_key).await?;
-            index_resource(
+            process_index_message(
                 &self.settings.metadata,
+                object_store,
                 self.settings.storage.as_ref().unwrap().object_store.clone(),
-                &msg.shard,
-                resource,
+                msg,
                 seq.into(),
             )
             .await
@@ -76,6 +76,16 @@ impl NidxBinding {
 
         Ok(seq)
     }
+
+    /// Wait for the searcher to be synced. Used in nucliadb tests
+    pub fn wait_for_sync(&mut self) {
+        self.runtime.as_ref().unwrap().block_on(async {
+            // Wait for a new sync to start
+            self.sync_watcher.wait_for(|s| matches!(s, SyncStatus::Syncing)).await.unwrap();
+            // Wait for it to finish
+            self.sync_watcher.wait_for(|s| matches!(s, SyncStatus::Synced)).await.unwrap();
+        });
+    }
 }
 
 impl NidxBinding {
@@ -89,15 +99,16 @@ impl NidxBinding {
         tokio::task::spawn(api_server.serve(api_service));
 
         // Searcher API
+        let (sync_reporter, sync_watcher) = watch::channel(SyncStatus::Syncing);
         let searcher = SyncedSearcher::new(settings.metadata.clone(), Path::new("/tmp/searcher"));
         let searcher_api = SearchServer::new(settings.metadata.clone(), searcher.index_cache());
         let searcher_server = GrpcServer::new("localhost:0").await?;
         let searcher_port = searcher_server.port()?;
         tokio::task::spawn(searcher_server.serve(searcher_api.into_service()));
         let settings_copy = settings.clone();
-        tokio::task::spawn(
-            async move { searcher.run(settings_copy.storage.as_ref().unwrap().object_store.clone()).await },
-        );
+        tokio::task::spawn(async move {
+            searcher.run(settings_copy.storage.as_ref().unwrap().object_store.clone(), Some(sync_reporter)).await
+        });
 
         // Scheduler
         let seq = SeqSource(Arc::new(settings.metadata.max_seq().await?.into()));
@@ -118,6 +129,7 @@ impl NidxBinding {
             settings,
            seq,
            runtime: None,
+            sync_watcher,
         })
     }
 }
```

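A note on the new `wait_for_sync`: waiting for `Synced` alone could return immediately on the tail of a sync pass that started before the caller's data was written, so the method first waits for a fresh `Syncing` and only then for the `Synced` that follows it. Below is a minimal, self-contained sketch of that two-step pattern using plain tokio and hypothetical names rather than the nidx types (`watch::Receiver::wait_for` assumes tokio 1.27 or newer):

```rust
use std::time::Duration;
use tokio::sync::watch;

enum SyncStatus {
    Syncing,
    Synced,
}

#[tokio::main]
async fn main() {
    let (reporter, mut watcher) = watch::channel(SyncStatus::Syncing);

    // Stand-in for the sync loop: report Syncing at the start of each pass
    // and Synced once the pass has finished.
    tokio::spawn(async move {
        loop {
            let _ = reporter.send(SyncStatus::Syncing);
            tokio::time::sleep(Duration::from_millis(50)).await; // "sync work"
            let _ = reporter.send(SyncStatus::Synced);
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    });

    // Two-step wait: pin to a pass that is starting, then wait for it to end.
    watcher.wait_for(|s| matches!(s, SyncStatus::Syncing)).await.unwrap();
    watcher.wait_for(|s| matches!(s, SyncStatus::Synced)).await.unwrap();
    println!("searcher has completed a full sync pass");
}
```
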
nidx/nidx_tests/src/lib.rs (2 changes: 1 addition & 1 deletion)

```diff
@@ -26,7 +26,7 @@ use relation_node::NodeType;
 use uuid::Uuid;
 
 pub fn minimal_resource(shard_id: String) -> Resource {
-    let resource_id = Uuid::new_v4().to_string();
+    let resource_id = Uuid::new_v4().simple().to_string();
 
     let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
     let timestamp = Timestamp {
```

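The change to `simple()` makes `minimal_resource` produce hyphen-free resource ids. For reference, a tiny illustration of the two formats with the `uuid` crate (assuming the `v4` feature is enabled):

```rust
use uuid::Uuid;

fn main() {
    let id = Uuid::new_v4();
    println!("{}", id);          // hyphenated, e.g. 67e55044-10b1-426f-9247-bb680e5fe0c8
    println!("{}", id.simple()); // simple, e.g. 67e5504410b1426f9247bb680e5fe0c8
}
```
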
nidx/src/indexer.rs (61 changes: 43 additions & 18 deletions)

```diff
@@ -23,6 +23,7 @@ use futures::stream::StreamExt;
 use nidx_protos::prost::*;
 use nidx_protos::IndexMessage;
 use nidx_protos::Resource;
+use nidx_protos::TypeMessage;
 use nidx_types::Seq;
 use object_store::{DynObjectStore, ObjectStore};
 use std::path::Path;
@@ -70,32 +71,54 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> {
             continue;
         };
 
-        // TODO: Implement deletions
-        let resource = match download_message(indexer_storage.clone(), &index_message.storage_key).await {
-            Ok(r) => r,
-            Err(e) => {
-                warn!("Error downloading index message {e:?}");
-                continue;
-            }
-        };
+        if let Err(e) =
+            process_index_message(&meta, indexer_storage.clone(), segment_storage.clone(), index_message, seq).await
+        {
+            warn!("Error processing index message {e:?}");
+            continue;
+        }
 
-        match index_resource(&meta, segment_storage.clone(), &index_message.shard, resource, seq).await {
-            Ok(()) => {
-                if let Err(e) = acker.ack().await {
-                    warn!("Error ack'ing NATS message {e:?}")
-                }
-            }
-            Err(e) => {
-                warn!("Error processing index message {e:?}")
-            }
-        };
+        if let Err(e) = acker.ack().await {
+            warn!("Error acking index message {e:?}");
+            continue;
+        }
 
         // TODO: Delete indexer message on success
     }
 
     Ok(())
 }
 
+pub async fn process_index_message(
+    meta: &NidxMetadata,
+    indexer_storage: Arc<DynObjectStore>,
+    segment_storage: Arc<DynObjectStore>,
+    index_message: IndexMessage,
+    seq: Seq,
+) -> anyhow::Result<()> {
+    match index_message.typemessage() {
+        TypeMessage::Deletion => delete_resource(meta, &index_message.shard, index_message.resource, seq).await,
+        TypeMessage::Creation => {
+            let resource = download_message(indexer_storage, &index_message.storage_key).await?;
+            index_resource(meta, segment_storage, &index_message.shard, resource, seq).await
+        }
+    }
+}
+
+pub async fn delete_resource(meta: &NidxMetadata, shard_id: &str, resource: String, seq: Seq) -> anyhow::Result<()> {
+    let shard_id = Uuid::parse_str(shard_id)?;
+    let indexes = Index::for_shard(&meta.pool, shard_id).await?;
+
+    let mut tx = meta.transaction().await?;
+    for index in indexes {
+        Deletion::create(&mut *tx, index.id, seq, &[resource.clone()]).await?;
+        index.updated(&mut *tx).await?;
+    }
+    tx.commit().await?;
+
+    Ok(())
+}
+
 pub async fn download_message(storage: Arc<DynObjectStore>, storage_key: &str) -> anyhow::Result<Resource> {
     let get_result = storage.get(&object_store::path::Path::from(storage_key)).await?;
     let bytes = get_result.bytes().await?;
@@ -120,6 +143,8 @@ pub async fn index_resource(
     let num_vector_indexes = indexes.iter().filter(|i| matches!(i.kind, IndexKind::Vector)).count();
     let single_vector_index = num_vector_indexes == 1;
 
+    // TODO: Index in parallel
+    // TODO: Save all indexes as a transaction (to avoid issues reprocessing the same message)
     for index in indexes {
         let output_dir = tempfile::tempdir()?;
 
```

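The `index_message.typemessage()` call is the usual prost pattern: protobuf enum fields are carried as a raw `i32` on the message struct, and a generated accessor converts back to the enum, falling back to the default variant for unrecognized values. That is also why the test further down builds its message with `typemessage: TypeMessage::Deletion.into()`. A hand-written mirror of the generated pattern, with hypothetical types rather than the real `nidx_protos` output:

```rust
// Hypothetical mirror of what prost generates; the real types come from
// nidx_protos. The wire field is a plain i32, and the accessor decodes it.
#[derive(Clone, Copy, Debug, PartialEq)]
enum TypeMessage {
    Creation = 0,
    Deletion = 1,
}

struct IndexMessage {
    typemessage: i32,
}

impl IndexMessage {
    // Equivalent of the prost-generated `typemessage()` accessor.
    fn typemessage(&self) -> TypeMessage {
        match self.typemessage {
            1 => TypeMessage::Deletion,
            _ => TypeMessage::Creation, // default variant for unknown values
        }
    }
}

// Equivalent of the prost-generated conversion used as `.into()`.
impl From<TypeMessage> for i32 {
    fn from(t: TypeMessage) -> i32 {
        t as i32
    }
}

fn main() {
    let msg = IndexMessage { typemessage: TypeMessage::Deletion.into() };
    assert_eq!(msg.typemessage(), TypeMessage::Deletion);
}
```
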
nidx/src/searcher.rs (8 changes: 5 additions & 3 deletions)

```diff
@@ -30,6 +30,7 @@ use index_cache::IndexCache;
 use object_store::DynObjectStore;
 use sync::run_sync;
 use sync::SyncMetadata;
+use tokio::sync::watch;
 
 use std::path::Path;
 use std::sync::Arc;
@@ -40,6 +41,7 @@ use crate::grpc_server::GrpcServer;
 use crate::{NidxMetadata, Settings};
 
 pub use index_cache::IndexSearcher;
+pub use sync::SyncStatus;
 
 pub struct SyncedSearcher {
     index_cache: Arc<IndexCache>,
@@ -59,7 +61,7 @@ impl SyncedSearcher {
         }
     }
 
-    pub async fn run(&self, storage: Arc<DynObjectStore>) {
+    pub async fn run(&self, storage: Arc<DynObjectStore>, watcher: Option<watch::Sender<SyncStatus>>) {
         let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
         let index_cache_copy = self.index_cache.clone();
         let refresher_task = tokio::task::spawn(async move {
@@ -69,7 +71,7 @@ impl SyncedSearcher {
             }
         });
         let sync_task =
-            tokio::task::spawn(run_sync(self.meta.clone(), storage.clone(), self.sync_metadata.clone(), tx));
+            tokio::task::spawn(run_sync(self.meta.clone(), storage.clone(), self.sync_metadata.clone(), tx, watcher));
         tokio::select! {
             r = sync_task => {
                 println!("sync_task() completed first {:?}", r)
@@ -95,7 +97,7 @@ pub async fn run(settings: Settings) -> anyhow::Result<()> {
     let api = grpc::SearchServer::new(meta.clone(), searcher.index_cache());
     let server = GrpcServer::new("0.0.0.0:10001").await?;
     let api_task = tokio::task::spawn(server.serve(api.into_service()));
-    let search_task = tokio::task::spawn(async move { searcher.run(storage).await });
+    let search_task = tokio::task::spawn(async move { searcher.run(storage, None).await });
 
     tokio::select! {
         r = search_task => {
```

nidx/src/searcher/shard_search.rs (25 changes: 16 additions & 9 deletions)

```diff
@@ -40,9 +40,8 @@ pub async fn search(
     search_request: SearchRequest,
 ) -> NidxResult<SearchResponse> {
     let shard_id = uuid::Uuid::parse_str(&search_request.shard)?;
-    if search_request.vectorset.is_empty() {
-        return Err(NidxError::invalid("Vectorset is required"));
-    }
+
+    let query_plan = query_planner::build_query_plan(search_request.clone())?;
 
     // TODO: Avoid querying here, the information can be take from synced metadata
     let paragraph_index = Index::find(&meta.pool, shard_id, IndexKind::Paragraph, "paragraph").await?;
@@ -54,16 +53,24 @@ pub async fn search(
     let text_index = Index::find(&meta.pool, shard_id, IndexKind::Text, "text").await?;
     let text_searcher_arc = index_cache.get(&text_index.id).await?;
 
-    let vector_index = Index::find(&meta.pool, shard_id, IndexKind::Vector, &search_request.vectorset).await?;
-    let vector_seacher_arc = index_cache.get(&vector_index.id).await?;
+    // TODO: Better way to check this
+    let vector_seacher_arc = if query_plan.index_queries.vectors_request.is_some() {
+        if search_request.vectorset.is_empty() {
+            return Err(NidxError::invalid("Vectorset is required"));
+        }
+        let vector_index = Index::find(&meta.pool, shard_id, IndexKind::Vector, &search_request.vectorset).await?;
+        Some(index_cache.get(&vector_index.id).await?)
+    } else {
+        None
+    };
 
     let search_results = tokio::task::spawn_blocking(move || {
         blocking_search(
             search_request,
             paragraph_searcher_arc.as_ref().into(),
             relation_searcher_arc.as_ref().into(),
             text_searcher_arc.as_ref().into(),
-            vector_seacher_arc.as_ref().into(),
+            vector_seacher_arc.as_ref().map(|v| v.as_ref().into()),
         )
     })
     .await??;
@@ -75,10 +82,10 @@ fn blocking_search(
     paragraph_searcher: &ParagraphSearcher,
     relation_searcher: &RelationSearcher,
     text_searcher: &TextSearcher,
-    vector_searcher: &VectorSearcher,
+    vector_searcher: Option<&VectorSearcher>,
 ) -> anyhow::Result<SearchResponse> {
-    let query_plan = query_planner::build_query_plan(search_request)?;
     let search_id = uuid::Uuid::new_v4().to_string();
+    let query_plan = query_planner::build_query_plan(search_request)?;
     let mut index_queries = query_plan.index_queries;
 
     // Apply pre-filtering to the query plan
@@ -102,7 +109,7 @@ fn blocking_search(
 
     let vector_task = index_queries.vectors_request.map(|mut request| {
         request.id = search_id.clone();
-        move || vector_searcher.search(&request, &index_queries.vectors_context)
+        move || vector_searcher.unwrap().search(&request, &index_queries.vectors_context)
     });
 
     let mut rtext = None;
```

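The `vector_searcher.unwrap()` inside the closure is safe by construction: the closure is only created via `index_queries.vectors_request.map(...)`, and the searcher was loaded earlier under that same `vectors_request.is_some()` condition. A small sketch of the invariant, with simplified stand-in types rather than the real nidx ones:

```rust
// Simplified stand-ins: the point is that the task closure and the searcher
// are gated on the same Option, so unwrap() inside the closure cannot panic.
struct VectorSearcher;

impl VectorSearcher {
    fn search(&self, query: &str) -> Vec<String> {
        vec![format!("hit for {query}")]
    }
}

fn main() {
    // Some(...) if and only if the query plan contains a vector request.
    let vectors_request: Option<String> = Some("semantic query".to_string());

    // Loaded under the same condition as the request.
    let vector_searcher: Option<VectorSearcher> = vectors_request.as_ref().map(|_| VectorSearcher);

    // The closure only exists when vectors_request was Some, which implies
    // vector_searcher is Some too.
    let vector_task = vectors_request.map(|request| {
        let searcher = vector_searcher.as_ref();
        move || searcher.unwrap().search(&request)
    });

    if let Some(task) = vector_task {
        println!("{:?}", task());
    }
}
```
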
nidx/src/searcher/sync.rs (13 changes: 13 additions & 0 deletions)

```diff
@@ -29,16 +29,26 @@ use std::{
     collections::{HashMap, HashSet},
     path::PathBuf,
 };
+use tokio::sync::watch;
 use tokio::sync::{mpsc::Sender, OwnedRwLockReadGuard, RwLock, RwLockReadGuard};
 
+pub enum SyncStatus {
+    Syncing,
+    Synced,
+}
+
 pub async fn run_sync(
     meta: NidxMetadata,
     storage: Arc<DynObjectStore>,
     index_metadata: Arc<SyncMetadata>,
     notifier: Sender<IndexId>,
+    sync_status: Option<watch::Sender<SyncStatus>>,
 ) -> anyhow::Result<()> {
     let mut last_updated_at = PrimitiveDateTime::MIN.replace_year(2000)?;
     loop {
+        if let Some(ref sync_status) = sync_status {
+            sync_status.send(SyncStatus::Syncing)?;
+        }
         let deleted = Index::marked_to_delete(&meta.pool).await?;
         for index_id in deleted.into_iter() {
             // TODO: Handle errors
@@ -51,6 +61,9 @@ pub async fn run_sync(
             last_updated_at = max(last_updated_at, index.updated_at);
             sync_index(&meta, storage.clone(), Arc::clone(&index_metadata), index, &notifier).await?;
         }
+        if let Some(ref sync_status) = sync_status {
+            sync_status.send(SyncStatus::Synced)?;
+        }
 
         tokio::time::sleep(Duration::from_secs(1)).await;
     }
```

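One detail on the sender side: `watch::Sender::send` fails once every receiver has been dropped, so the `?` on `sync_status.send(...)` would end `run_sync` with an error if nothing were listening. This is presumably why `NidxBinding` keeps the `watch::Receiver` in a field rather than creating one on demand. A quick demonstration of the channel behavior (plain tokio, hypothetical names):

```rust
use tokio::sync::watch;

enum SyncStatus {
    Syncing,
    Synced,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(SyncStatus::Synced);

    // While a receiver is alive, send() succeeds.
    assert!(tx.send(SyncStatus::Syncing).is_ok());

    // Once every receiver is gone, send() errors; in run_sync the `?` on
    // `sync_status.send(...)` would then end the loop with an error.
    drop(rx);
    assert!(tx.send(SyncStatus::Synced).is_err());
}
```
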
nidx/tests/common/services.rs (6 changes: 3 additions & 3 deletions)

```diff
@@ -73,9 +73,9 @@ impl NidxFixture {
         let searcher_port = searcher_server.port()?;
         tokio::task::spawn(searcher_server.serve(searcher_api.into_service()));
         let settings_copy = settings.clone();
-        tokio::task::spawn(
-            async move { searcher.run(settings_copy.storage.as_ref().unwrap().object_store.clone()).await },
-        );
+        tokio::task::spawn(async move {
+            searcher.run(settings_copy.storage.as_ref().unwrap().object_store.clone(), None).await
+        });
 
         // Clients
         let searcher_client = NidxSearcherClient::connect(format!("http://localhost:{searcher_port}")).await?;
```

nidx/tests/synced_searcher.rs (18 changes: 11 additions & 7 deletions)

```diff
@@ -21,14 +21,13 @@
 use std::sync::Arc;
 use std::time::Duration;
 
-use nidx::indexer::index_resource;
-use nidx::metadata::Deletion;
+use nidx::indexer::{index_resource, process_index_message};
 use nidx::searcher::SyncedSearcher;
 use nidx::{
     metadata::{Index, Shard},
     NidxMetadata,
 };
-use nidx_protos::VectorSearchRequest;
+use nidx_protos::{IndexMessage, TypeMessage, VectorSearchRequest};
 use nidx_tests::*;
 use nidx_vector::config::VectorConfig;
 use nidx_vector::VectorSearcher;
@@ -43,7 +42,7 @@ async fn test_synchronization(pool: sqlx::PgPool) -> anyhow::Result<()> {
     let synced_searcher = SyncedSearcher::new(meta.clone(), work_dir.path());
     let index_cache = synced_searcher.index_cache();
     let storage_copy = storage.clone();
-    let search_task = tokio::spawn(async move { synced_searcher.run(storage_copy).await });
+    let search_task = tokio::spawn(async move { synced_searcher.run(storage_copy, None).await });
 
     let index = Index::create(
         &meta.pool,
@@ -75,9 +74,14 @@ async fn test_synchronization(pool: sqlx::PgPool) -> anyhow::Result<()> {
     assert_eq!(result.documents.len(), 1);
 
     // Delete the resource, it should disappear from results
-    // TODO: Test by sending a deletion message to the deletion method (not implemented yet)
-    Deletion::create(&meta.pool, index.id, 2i64.into(), &[resource.resource.unwrap().uuid]).await?;
-    index.updated(&meta.pool).await?;
+    let deletion = IndexMessage {
+        shard: index.shard_id.to_string(),
+        resource: resource.resource.unwrap().uuid,
+        typemessage: TypeMessage::Deletion.into(),
+        ..Default::default()
+    };
+    // We will not use indexer storage here, so it's fine to pass an incorrect indexer storage
+    process_index_message(&meta, storage.clone(), storage.clone(), deletion, 2i64.into()).await?;
 
     tokio::time::sleep(Duration::from_secs(2)).await;
```
