Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
🐛 Fix closing DB which leaves the database locked
Browse files Browse the repository at this point in the history
  • Loading branch information
hrmhatef committed Dec 6, 2022
1 parent 94522fb commit bdde4df
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 69 deletions.
50 changes: 28 additions & 22 deletions src/database/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use neon::types::buffer::TypedArray;

use crate::batch;
use crate::database::options::IterationOption;
use crate::database::traits::JsNewWithBox;
use crate::database::traits::{JsNewWithBoxRef, Unwrap};
use crate::database::types::JsBoxRef;
use crate::database::utils;
use crate::database::DB;

Expand All @@ -15,8 +16,9 @@ pub struct Error {
message: String,
}

pub type SharedDatabase = JsBoxRef<Database>;
pub type Database = DB;
impl JsNewWithBox for Database {}
impl JsNewWithBoxRef for Database {}
impl Database {
fn send_over_channel(
channel: &Channel,
Expand All @@ -43,20 +45,20 @@ impl Database {
/// - @params(1) - callback to return the result.
/// - @callback(0) - Error.
pub fn js_clear(mut ctx: FunctionContext) -> JsResult<JsUndefined> {
// Get the `this` value as a `JsBox<Database>`
let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();
let callback = ctx.argument::<JsFunction>(1)?.root(&mut ctx);

let conn = db.arc_clone();
db.send(move |channel| {
let mut batch = rocksdb::WriteBatch::default();
let conn_iter = conn.iterator(rocksdb::IteratorMode::Start);
let conn_iter = conn.unwrap().iterator(rocksdb::IteratorMode::Start);
for key_val in conn_iter {
batch.delete(&(key_val.unwrap().0));
}
let result = conn.write(batch);
let result = conn.unwrap().write(batch);
Database::send_over_channel(channel, callback, result);
})
.or_else(|err| ctx.throw_error(err.to_string()))?;
Expand All @@ -67,9 +69,9 @@ impl Database {
/// js_close is handler for JS ffi.
/// js "this" - DB.
pub fn js_close(mut ctx: FunctionContext) -> JsResult<JsUndefined> {
// Get the `this` value as a `JsBox<Database>`
ctx.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?
.borrow_mut()
.close()
.or_else(|err| ctx.throw_error(err.to_string()))?;

Expand All @@ -85,10 +87,10 @@ impl Database {
pub fn js_get(mut ctx: FunctionContext) -> JsResult<JsUndefined> {
let key = ctx.argument::<JsTypedArray<u8>>(0)?.as_slice(&ctx).to_vec();
let callback = ctx.argument::<JsFunction>(1)?.root(&mut ctx);
// Get the `this` value as a `JsBox<Database>`
let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();

db.get_by_key(key, callback)
.or_else(|err| ctx.throw_error(err.to_string()))?;
Expand All @@ -105,10 +107,10 @@ impl Database {
pub fn js_exists(mut ctx: FunctionContext) -> JsResult<JsUndefined> {
let key = ctx.argument::<JsTypedArray<u8>>(0)?.as_slice(&ctx).to_vec();
let callback = ctx.argument::<JsFunction>(1)?.root(&mut ctx);
// Get the `this` value as a `JsBox<Database>`
let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();

db.exists(key, callback)
.or_else(|err| ctx.throw_error(err.to_string()))?;
Expand All @@ -126,10 +128,10 @@ impl Database {
let key = ctx.argument::<JsTypedArray<u8>>(0)?.as_slice(&ctx).to_vec();
let value = ctx.argument::<JsTypedArray<u8>>(1)?.as_slice(&ctx).to_vec();
let callback = ctx.argument::<JsFunction>(2)?.root(&mut ctx);
// Get the `this` value as a `JsBox<Database>`
let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();

let result = db.put(&key, &value);
db.send(move |channel| {
Expand All @@ -148,10 +150,10 @@ impl Database {
pub fn js_del(mut ctx: FunctionContext) -> JsResult<JsUndefined> {
let key = ctx.argument::<JsTypedArray<u8>>(0)?.as_slice(&ctx).to_vec();
let callback = ctx.argument::<JsFunction>(1)?.root(&mut ctx);
// Get the `this` value as a `JsBox<Database>`
let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();

let result = db.delete(&key);
db.send(move |channel| {
Expand All @@ -175,7 +177,8 @@ impl Database {

let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();

let batch = Arc::clone(&batch.borrow());
let conn = db.arc_clone();
Expand All @@ -184,7 +187,7 @@ impl Database {
let inner_batch = batch.lock().unwrap();
let mut write_batch = batch::WriteBatch { batch: write_batch };
inner_batch.batch.iterate(&mut write_batch);
let result = conn.write(write_batch.batch);
let result = conn.unwrap().write(write_batch.batch);
Database::send_over_channel(channel, callback, result);
})
.or_else(|err| ctx.throw_error(err.to_string()))?;
Expand All @@ -205,16 +208,18 @@ impl Database {
let options = IterationOption::new(&mut ctx, option_inputs);
let callback_on_data = ctx.argument::<JsFunction>(1)?.root(&mut ctx);
let callback_done = ctx.argument::<JsFunction>(2)?.root(&mut ctx);
// Get the `this` value as a `JsBox<Database>`

let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();

let callback_on_data = Arc::new(Mutex::new(callback_on_data));
let conn = db.arc_clone();
db.send(move |channel| {
let iter = conn.iterator(utils::get_iteration_mode(&options, &mut vec![], false));
let iter =
conn.unwrap()
.iterator(utils::get_iteration_mode(&options, &mut vec![], false));
for (counter, key_val) in iter.enumerate() {
if utils::is_key_out_of_range(
&options,
Expand Down Expand Up @@ -264,7 +269,8 @@ impl Database {

let db = ctx
.this()
.downcast_or_throw::<JsBox<Database>, _>(&mut ctx)?;
.downcast_or_throw::<SharedDatabase, _>(&mut ctx)?;
let db = db.borrow();

db.checkpoint(path, callback)
.or_else(|err| ctx.throw_error(err.to_string()))?;
Expand Down
44 changes: 27 additions & 17 deletions src/database/db_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@ use neon::handle::{Handle, Root};
use neon::types::{Finalize, JsBuffer, JsFunction, JsValue};
use rocksdb::checkpoint::Checkpoint;

use crate::database::traits::NewDBWithContext;
use crate::database::types::{DbMessage, DbOptions, Kind};
use crate::database::traits::{NewDBWithContext, Unwrap};
use crate::database::types::{ArcOptionDB, DbMessage, DbOptions, Kind};

pub struct DB {
tx: mpsc::Sender<DbMessage>,
db_kind: Kind,
db: Arc<rocksdb::DB>,
db: ArcOptionDB,
}

impl Unwrap for ArcOptionDB {
fn unwrap(&self) -> &rocksdb::DB {
self.as_ref().as_ref().expect("The DB connection is None")
}
}

impl NewDBWithContext for DB {
Expand Down Expand Up @@ -58,17 +64,22 @@ impl NewDBWithContext for DB {

impl Finalize for DB {}
impl DB {
fn db(&self) -> &rocksdb::DB {
self.db.unwrap()
}

pub fn new(db: rocksdb::DB, tx: mpsc::Sender<DbMessage>, db_kind: Kind) -> Self {
Self {
tx,
db_kind,
db: Arc::new(db),
db: Arc::new(Some(db)),
}
}

// Idiomatic rust would take an owned `self` to prevent use after close
// However, it's not possible to prevent JavaScript from continuing to hold a closed database
pub fn close(&self) -> Result<(), mpsc::SendError<DbMessage>> {
pub fn close(&mut self) -> Result<(), mpsc::SendError<DbMessage>> {
self.db = Arc::new(None);
self.tx.send(DbMessage::Close)
}

Expand All @@ -85,7 +96,7 @@ impl DB {
callback: Root<JsFunction>,
) -> Result<(), mpsc::SendError<DbMessage>> {
let key = self.db_kind.key(key);
let result = self.db.get(&key);
let result = self.get(&key);
self.send(move |channel| {
channel.send(move |mut ctx| {
let callback = callback.into_inner(&mut ctx);
Expand All @@ -112,8 +123,8 @@ impl DB {
callback: Root<JsFunction>,
) -> Result<(), mpsc::SendError<DbMessage>> {
let key = self.db_kind.key(key);
let result = if self.db.key_may_exist(&key) {
self.db.get(&key).map(|res| res.is_some())
let result = if self.db().key_may_exist(&key) {
self.get(&key).map(|res| res.is_some())
} else {
Ok(false)
};
Expand Down Expand Up @@ -143,7 +154,7 @@ impl DB {
) -> Result<(), mpsc::SendError<DbMessage>> {
let conn = Arc::clone(&self.db);
self.send(move |channel| {
let result = Checkpoint::new(&conn);
let result = Checkpoint::new(conn.unwrap());

if result.is_err() {
let err = result.err().unwrap();
Expand Down Expand Up @@ -177,28 +188,28 @@ impl DB {
})
}

pub fn arc_clone(&self) -> Arc<rocksdb::DB> {
pub fn arc_clone(&self) -> ArcOptionDB {
Arc::clone(&self.db)
}

pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), rocksdb::Error> {
self.db.put(key, value)
self.db().put(key, value)
}

pub fn delete(&self, key: &[u8]) -> Result<(), rocksdb::Error> {
self.db.delete(key)
self.db().delete(key)
}

pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, rocksdb::Error> {
self.db.get(key)
self.db().get(key)
}

pub fn write(&self, batch: rocksdb::WriteBatch) -> Result<(), rocksdb::Error> {
self.db.write(batch)
self.db().write(batch)
}

pub fn path(&self) -> &std::path::Path {
self.db.path()
self.db().path()
}
}

Expand All @@ -207,9 +218,8 @@ mod tests {
use std::sync::mpsc;
use tempdir::TempDir;

use crate::types::KVPair;

use super::*;
use crate::types::KVPair;

fn temp_db() -> DB {
let temp_dir = TempDir::new("test_db").unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/database/reader_writer/reader_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use neon::handle::{Handle, Root};
use neon::result::JsResult;
use neon::types::{Finalize, JsBuffer, JsFunction, JsValue};

use crate::database::traits::Unwrap;
use crate::database::types::{JsBoxRef, Kind, SnapshotMessage};
use crate::state_db::SharedStateDB;

Expand Down Expand Up @@ -38,7 +39,7 @@ impl ReaderBase {
let db = db.borrow();
let conn = db.arc_clone();
thread::spawn(move || {
let snapshot = conn.snapshot();
let snapshot = conn.unwrap().snapshot();
while let Ok(message) = rx.recv() {
match message {
SnapshotMessage::Callback(f) => {
Expand Down
22 changes: 6 additions & 16 deletions src/database/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ use std::sync::{Arc, Mutex};
use neon::context::{Context, FunctionContext};
use neon::handle::Handle;
use neon::result::JsResult;
use neon::types::{Finalize, JsBox, JsNumber, JsString, JsValue};
use neon::types::{Finalize, JsNumber, JsString, JsValue};

use crate::database::types::{DbOptions, JsArcMutex, JsBoxRef, Kind};
use crate::types::{KVPair, KeyLength, VecOption};

pub trait Unwrap {
fn unwrap(&self) -> &rocksdb::DB;
}

pub trait Actions {
fn get(&self, key: &[u8]) -> Result<VecOption, rocksdb::Error>;
fn set(&mut self, pair: &KVPair) -> Result<(), rocksdb::Error>;
Expand Down Expand Up @@ -53,28 +57,14 @@ pub trait JsNewWithBoxRef {
let path = ctx.argument::<JsString>(0)?.value(&mut ctx);
let options = ctx.argument_opt(1);
let db_opts = T::new_with_context(&mut ctx, options)?;
let db = U::new_db_with_context(&mut ctx, path, db_opts, Kind::State)
let db = U::new_db_with_context(&mut ctx, path, db_opts, Kind::Normal)
.or_else(|err| ctx.throw_error(&err))?;
let ref_db = RefCell::new(db);

return Ok(ctx.boxed(ref_db));
}
}

pub trait JsNewWithBox {
fn js_new_with_box<T: OptionsWithContext, U: NewDBWithContext + Finalize + Send>(
mut ctx: FunctionContext,
) -> JsResult<JsBox<U>> {
let path = ctx.argument::<JsString>(0)?.value(&mut ctx);
let options = ctx.argument_opt(1);
let db_opts = T::new_with_context(&mut ctx, options)?;
let db = U::new_db_with_context(&mut ctx, path, db_opts, Kind::Normal)
.or_else(|err| ctx.throw_error(&err))?;

return Ok(ctx.boxed(db));
}
}

pub trait JsNewWithArcMutex {
fn js_new_with_arc_mutex<T: NewDBWithKeyLength + Send + Finalize + DatabaseKind>(
mut ctx: FunctionContext,
Expand Down
2 changes: 2 additions & 0 deletions src/database/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cell::RefCell;
use std::sync::Arc;

use neon::event::Channel;
use neon::types::JsBox;
Expand All @@ -12,6 +13,7 @@ pub type DbOptions = Options<KeyLength>;

pub type JsBoxRef<T> = JsBox<RefCell<T>>;
pub type JsArcMutex<T> = JsBoxRef<ArcMutex<T>>;
pub type ArcOptionDB = Arc<Option<rocksdb::DB>>;

/// Messages sent on the database channel
pub enum Message<T> {
Expand Down
7 changes: 5 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::database::db;
use crate::database::in_memory::in_memory_db;
use crate::database::reader_writer::read_writer_db;
use crate::database::reader_writer::reader_db;
use crate::database::traits::{JsNewWithArcMutex, JsNewWithBox, JsNewWithBoxRef};
use crate::database::traits::{JsNewWithArcMutex, JsNewWithBoxRef};
use crate::database::types::DbOptions;
use crate::sparse_merkle_tree::in_memory_smt;
use crate::state::state_db;
Expand Down Expand Up @@ -32,7 +32,10 @@ use state_writer::StateWriter;

#[neon::main]
fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("db_new", Database::js_new_with_box::<DbOptions, Database>)?;
cx.export_function(
"db_new",
Database::js_new_with_box_ref::<DbOptions, Database>,
)?;
cx.export_function("db_clear", Database::js_clear)?;
cx.export_function("db_close", Database::js_close)?;
cx.export_function("db_get", Database::js_get)?;
Expand Down
Loading

0 comments on commit bdde4df

Please sign in to comment.