Skip to content

Commit

Permalink
fix by comments
Browse files Browse the repository at this point in the history
Signed-off-by: cutecutecat <[email protected]>
  • Loading branch information
cutecutecat committed Apr 3, 2024
1 parent 2a748f3 commit 1b20043
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 15 deletions.
21 changes: 11 additions & 10 deletions crates/index/src/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use base::index::*;
use base::operator::Borrowed;
pub use base::search::*;
pub use base::vector::*;
use crossbeam::channel::TryRecvError;
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
use std::cmp::Reverse;
use std::convert::Infallible;
Expand Down Expand Up @@ -98,29 +99,29 @@ impl<O: Op> OptimizerIndexing<O> {
}),
)
}
fn main(self, shutdown: Receiver<Infallible>) {
fn main(self, shutdown_rx: Receiver<Infallible>) {
let index = self.index;
loop {
let view = index.view();
let threads = view.flexible.optimizing_threads;
let (finish_tx, finish) = bounded::<Infallible>(1);
let (finish_tx, finish_rx) = bounded::<Infallible>(1);
rayon::ThreadPoolBuilder::new()
.num_threads(threads as usize)
.build_scoped(|pool| {
std::thread::scope(|scope| {
let handler = scope.spawn(|| {
monitor(&finish, &shutdown);
monitor(&finish_rx, &shutdown_rx);
pool.stop();
});
pool.install(|| {
let _finish_tx = finish_tx;
let _ = optimizing_indexing(index.clone());
drop(finish_tx);
});
let _ = handler.join();
})
})
.unwrap();
match shutdown.recv_timeout(std::time::Duration::from_secs(60)) {
match shutdown_rx.recv_timeout(std::time::Duration::from_secs(60)) {
Ok(never) => match never {},
Err(RecvTimeoutError::Disconnected) => return,
Err(RecvTimeoutError::Timeout) => (),
Expand All @@ -130,17 +131,17 @@ impl<O: Op> OptimizerIndexing<O> {
}

/// Monitor the internal finish and the external shutdown of `optimizing_indexing`
fn monitor(finish: &Receiver<Infallible>, shutdown: &Receiver<Infallible>) {
fn monitor(finish_rx: &Receiver<Infallible>, shutdown_rx: &Receiver<Infallible>) {
let timeout = std::time::Duration::from_secs(1);
loop {
match finish.recv_timeout(timeout) {
match finish_rx.try_recv() {
Ok(never) => match never {},
Err(RecvTimeoutError::Disconnected) => {
Err(TryRecvError::Disconnected) => {
return;
}
Err(RecvTimeoutError::Timeout) => (),
Err(TryRecvError::Empty) => (),
}
match shutdown.recv_timeout(timeout) {
match shutdown_rx.recv_timeout(timeout) {
Ok(never) => match never {},
Err(RecvTimeoutError::Disconnected) => {
return;
Expand Down
10 changes: 5 additions & 5 deletions crates/rayon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![feature(thread_local)]

use rayoff as rayon;
use std::cell::OnceCell;
use std::cell::RefCell;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -53,10 +53,10 @@ impl ThreadPoolBuilder {
match std::panic::catch_unwind(AssertUnwindSafe(|| {
self.builder
.start_handler(move |_| {
unsafe { STOP.set(stop_value.clone()).unwrap() };
STOP.replace(Some(stop_value.clone()));
})
.exit_handler(|_| {
unsafe { STOP.take() };
STOP.take();
})
.panic_handler(|e| {
if e.downcast_ref::<CheckPanic>().is_some() {
Expand Down Expand Up @@ -109,12 +109,12 @@ impl<'a> ThreadPool<'a> {
}

#[thread_local]
static mut STOP: OnceCell<Arc<AtomicBool>> = OnceCell::new();
static STOP: RefCell<Option<Arc<AtomicBool>>> = RefCell::new(None);

struct CheckPanic;

pub fn check() {
if let Some(stop) = unsafe { STOP.get() } {
if let Some(stop) = STOP.borrow().as_ref() {
if stop.load(Ordering::Relaxed) {
std::panic::panic_any(CheckPanic);
}
Expand Down
1 change: 1 addition & 0 deletions tests/crash/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ def process_filter(p: psutil.Process) -> bool:
logging.info(f"Background worker recreated {pids}")
break
time.sleep(1)

Empty file modified tests/crash/test.sh
100755 → 100644
Empty file.
14 changes: 14 additions & 0 deletions tests/sealing/check.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
query I
SELECT idx_indexing FROM pg_vector_index_stat WHERE indexname = 'i';
----
f

query I
SELECT idx_growing FROM pg_vector_index_stat WHERE indexname = 'i';
----
{}

query I
SELECT idx_sealed FROM pg_vector_index_stat WHERE indexname = 'i';
----
{1000}
9 changes: 9 additions & 0 deletions tests/sealing/create.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
statement ok
CREATE TABLE t (val vector(3));

statement ok
INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000);

statement ok
CREATE INDEX i ON t USING vectors (val vector_l2_ops)
WITH (options = "[indexing.hnsw]");
7 changes: 7 additions & 0 deletions tests/sealing/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -e

# Test the background threads `optimizing.indexing` and `optimizing.sealing` working properly
sqllogictest -u runner -d runner $(dirname $0)/create.slt
sleep 240
sqllogictest -u runner -d runner $(dirname $0)/check.slt

0 comments on commit 1b20043

Please sign in to comment.