From 1b20043150818a235335d203ac64efe790d850fb Mon Sep 17 00:00:00 2001 From: cutecutecat Date: Wed, 3 Apr 2024 11:56:59 +0800 Subject: [PATCH] fix by comments Signed-off-by: cutecutecat --- crates/index/src/optimizing/indexing.rs | 21 +++++++++++---------- crates/rayon/src/lib.rs | 10 +++++----- tests/crash/kill.py | 1 + tests/crash/test.sh | 0 tests/sealing/check.slt | 14 ++++++++++++++ tests/sealing/create.slt | 9 +++++++++ tests/sealing/test.sh | 7 +++++++ 7 files changed, 47 insertions(+), 15 deletions(-) mode change 100755 => 100644 tests/crash/test.sh create mode 100644 tests/sealing/check.slt create mode 100644 tests/sealing/create.slt create mode 100755 tests/sealing/test.sh diff --git a/crates/index/src/optimizing/indexing.rs b/crates/index/src/optimizing/indexing.rs index 917d8f9c5..76f6086a4 100644 --- a/crates/index/src/optimizing/indexing.rs +++ b/crates/index/src/optimizing/indexing.rs @@ -7,6 +7,7 @@ pub use base::index::*; use base::operator::Borrowed; pub use base::search::*; pub use base::vector::*; +use crossbeam::channel::TryRecvError; use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender}; use std::cmp::Reverse; use std::convert::Infallible; @@ -98,29 +99,29 @@ impl OptimizerIndexing { }), ) } - fn main(self, shutdown: Receiver) { + fn main(self, shutdown_rx: Receiver) { let index = self.index; loop { let view = index.view(); let threads = view.flexible.optimizing_threads; - let (finish_tx, finish) = bounded::(1); + let (finish_tx, finish_rx) = bounded::(1); rayon::ThreadPoolBuilder::new() .num_threads(threads as usize) .build_scoped(|pool| { std::thread::scope(|scope| { let handler = scope.spawn(|| { - monitor(&finish, &shutdown); + monitor(&finish_rx, &shutdown_rx); pool.stop(); }); pool.install(|| { + let _finish_tx = finish_tx; let _ = optimizing_indexing(index.clone()); - drop(finish_tx); }); let _ = handler.join(); }) }) .unwrap(); - match shutdown.recv_timeout(std::time::Duration::from_secs(60)) { + match shutdown_rx.recv_timeout(std::time::Duration::from_secs(60)) { Ok(never) => match never {}, Err(RecvTimeoutError::Disconnected) => return, Err(RecvTimeoutError::Timeout) => (), @@ -130,17 +131,17 @@ impl OptimizerIndexing { } /// Monitor the internal finish and the external shutdown of `optimizing_indexing` -fn monitor(finish: &Receiver, shutdown: &Receiver) { +fn monitor(finish_rx: &Receiver, shutdown_rx: &Receiver) { let timeout = std::time::Duration::from_secs(1); loop { - match finish.recv_timeout(timeout) { + match finish_rx.try_recv() { Ok(never) => match never {}, - Err(RecvTimeoutError::Disconnected) => { + Err(TryRecvError::Disconnected) => { return; } - Err(RecvTimeoutError::Timeout) => (), + Err(TryRecvError::Empty) => (), } - match shutdown.recv_timeout(timeout) { + match shutdown_rx.recv_timeout(timeout) { Ok(never) => match never {}, Err(RecvTimeoutError::Disconnected) => { return; diff --git a/crates/rayon/src/lib.rs b/crates/rayon/src/lib.rs index 7bf04d49f..fce0b763e 100644 --- a/crates/rayon/src/lib.rs +++ b/crates/rayon/src/lib.rs @@ -1,7 +1,7 @@ #![feature(thread_local)] use rayoff as rayon; -use std::cell::OnceCell; +use std::cell::RefCell; use std::panic::AssertUnwindSafe; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -53,10 +53,10 @@ impl ThreadPoolBuilder { match std::panic::catch_unwind(AssertUnwindSafe(|| { self.builder .start_handler(move |_| { - unsafe { STOP.set(stop_value.clone()).unwrap() }; + STOP.replace(Some(stop_value.clone())); }) .exit_handler(|_| { - unsafe { STOP.take() }; + STOP.take(); }) .panic_handler(|e| { if e.downcast_ref::().is_some() { @@ -109,12 +109,12 @@ impl<'a> ThreadPool<'a> { } #[thread_local] -static mut STOP: OnceCell> = OnceCell::new(); +static STOP: RefCell>> = RefCell::new(None); struct CheckPanic; pub fn check() { - if let Some(stop) = unsafe { STOP.get() } { + if let Some(stop) = STOP.borrow().as_ref() { if stop.load(Ordering::Relaxed) { std::panic::panic_any(CheckPanic); } diff --git a/tests/crash/kill.py b/tests/crash/kill.py index 4377c22cf..e5ed3a2b3 100644 --- a/tests/crash/kill.py +++ b/tests/crash/kill.py @@ -54,3 +54,4 @@ def process_filter(p: psutil.Process) -> bool: logging.info(f"Background worker recreated {pids}") break time.sleep(1) + \ No newline at end of file diff --git a/tests/crash/test.sh b/tests/crash/test.sh old mode 100755 new mode 100644 diff --git a/tests/sealing/check.slt b/tests/sealing/check.slt new file mode 100644 index 000000000..0d9dab0dd --- /dev/null +++ b/tests/sealing/check.slt @@ -0,0 +1,14 @@ +query I +SELECT idx_indexing FROM pg_vector_index_stat WHERE indexname = 'i'; +---- +f + +query I +SELECT idx_growing FROM pg_vector_index_stat WHERE indexname = 'i'; +---- +{} + +query I +SELECT idx_sealed FROM pg_vector_index_stat WHERE indexname = 'i'; +---- +{1000} diff --git a/tests/sealing/create.slt b/tests/sealing/create.slt new file mode 100644 index 000000000..38647c04f --- /dev/null +++ b/tests/sealing/create.slt @@ -0,0 +1,9 @@ +statement ok +CREATE TABLE t (val vector(3)); + +statement ok +INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); + +statement ok +CREATE INDEX i ON t USING vectors (val vector_l2_ops) +WITH (options = "[indexing.hnsw]"); diff --git a/tests/sealing/test.sh b/tests/sealing/test.sh new file mode 100755 index 000000000..c5ffbb209 --- /dev/null +++ b/tests/sealing/test.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -e + +# Test the background threads `optimizing.indexing` and `optimizing.sealing` working properly +sqllogictest -u runner -d runner $(dirname $0)/create.slt +sleep 240 +sqllogictest -u runner -d runner $(dirname $0)/check.slt