Skip to content

Commit

Permalink
Call flush during end of the FuelService (#1456)
Browse files Browse the repository at this point in the history
Closes FuelLabs/fuel-core#1428

Call the `flush` function during the end of the `FuelService` to move
the processing of WAL and SST fiels from the node's launching to the
node's shutdown.

Added additional logs to track the performance of the starting/shutdown
process.
  • Loading branch information
crypto523 committed Oct 27, 2023
1 parent 2f32129 commit 3edda64
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 7 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ Description of the upcoming release here.

### Added
- [#1457](https://github.com/FuelLabs/fuel-core/pull/1457): Fixing incorrect measurement for fast(µs) opcodes.
- [#1449](https://github.com/FuelLabs/fuel-core/pull/1449): fix coin pagination in e2e test client
- [#1456](https://github.com/FuelLabs/fuel-core/pull/1456): Added flushing of the RocksDB during a graceful shutdown.
- [#1456](https://github.com/FuelLabs/fuel-core/pull/1456): Added more logs to track the service lifecycle.
- [#1449](https://github.com/FuelLabs/fuel-core/pull/1449): Fix coin pagination in e2e test client.
- [#1447](https://github.com/FuelLabs/fuel-core/pull/1447): Add timeout for continuous e2e tests
- [#1444](https://github.com/FuelLabs/fuel-core/pull/1444): Add "sanity" benchmarks for memory opcodes.
- [#1437](https://github.com/FuelLabs/fuel-core/pull/1437): Add some transaction throughput tests for basic transfers.
Expand Down
4 changes: 4 additions & 0 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ impl Database {
pub fn transaction(&self) -> DatabaseTransaction {
self.into()
}

pub fn flush(self) -> DatabaseResult<()> {
self.data.flush()
}
}

/// Mutable methods.
Expand Down
10 changes: 9 additions & 1 deletion crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl FuelService {
#[tracing::instrument(skip_all, fields(name = %config.name))]
pub fn new(database: Database, config: Config) -> anyhow::Result<Self> {
let config = config.make_config_consistent();
database.init(&config.chain_conf)?;
let task = Task::new(database, config)?;
let runner = ServiceRunner::new(task);
let shared = runner.shared.clone();
Expand All @@ -93,6 +92,11 @@ impl FuelService {
);
Database::default()
} else {
tracing::info!(
"Opening database {:?} with cache size \"{}\"",
config.database_path,
config.max_database_cache_size
);
Database::open(&config.database_path, config.max_database_cache_size)?
}
}
Expand Down Expand Up @@ -182,9 +186,12 @@ impl Task {
/// Private inner method for initializing the fuel service task
pub fn new(database: Database, config: Config) -> anyhow::Result<Task> {
// initialize state
tracing::info!("Initializing database");
database.init(&config.chain_conf)?;
genesis::maybe_initialize_state(&config, &database)?;

// initialize sub services
tracing::info!("Initializing sub services");
let (services, shared) = sub_services::init_sub_services(&config, &database)?;
Ok(Task { services, shared })
}
Expand Down Expand Up @@ -251,6 +258,7 @@ impl RunnableTask for Task {
);
}
}
self.shared.database.flush()?;
Ok(())
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/fuel-core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub enum WriteOperation {
Remove,
}

pub trait TransactableStorage: BatchOperations + Debug + Send + Sync {}
pub trait TransactableStorage: BatchOperations + Debug + Send + Sync {
fn flush(&self) -> DatabaseResult<()>;
}

pub mod in_memory;
#[cfg(feature = "rocksdb")]
Expand Down
9 changes: 8 additions & 1 deletion crates/fuel-core/src/state/in_memory/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,14 @@ impl KeyValueStore for MemoryStore {

impl BatchOperations for MemoryStore {}

impl TransactableStorage for MemoryStore {}
impl TransactableStorage for MemoryStore {
fn flush(&self) -> DatabaseResult<()> {
for lock in self.inner.iter() {
lock.lock().expect("poisoned").clear();
}
Ok(())
}
}

#[cfg(test)]
mod tests {
Expand Down
10 changes: 9 additions & 1 deletion crates/fuel-core/src/state/in_memory/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,15 @@ impl KeyValueStore for MemoryTransactionView {

impl BatchOperations for MemoryTransactionView {}

impl TransactableStorage for MemoryTransactionView {}
impl TransactableStorage for MemoryTransactionView {
fn flush(&self) -> DatabaseResult<()> {
for lock in self.changes.iter() {
lock.lock().expect("poisoned lock").clear();
}
self.view_layer.flush()?;
self.data_source.flush()
}
}

#[cfg(test)]
mod tests {
Expand Down
14 changes: 13 additions & 1 deletion crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ impl RocksDb {
opts.set_row_cache(&cache);
}

DB::repair(&opts, &path).map_err(|e| DatabaseError::Other(e.into()))?;

let db = match DB::open_cf_descriptors(&opts, &path, cf_descriptors) {
Err(_) => {
// setup cfs
Expand Down Expand Up @@ -390,7 +392,17 @@ impl BatchOperations for RocksDb {
}
}

impl TransactableStorage for RocksDb {}
impl TransactableStorage for RocksDb {
fn flush(&self) -> DatabaseResult<()> {
self.db
.flush_wal(true)
.map_err(|e| anyhow::anyhow!("Unable to flush WAL file: {}", e))?;
self.db
.flush()
.map_err(|e| anyhow::anyhow!("Unable to flush SST files: {}", e))?;
Ok(())
}
}

#[cfg(test)]
mod tests {
Expand Down
4 changes: 4 additions & 0 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ where
}
});

tracing::info!("The service {} is shut down", S::NAME);

if let State::StoppedWithError(err) = stopped_state {
std::panic::resume_unwind(Box::new(err));
}
Expand Down Expand Up @@ -325,6 +327,7 @@ async fn run<S>(
}

// We can panic here, because it is inside of the task.
tracing::info!("Starting {} service", S::NAME);
let mut task = service
.into_task(&state, params)
.await
Expand Down Expand Up @@ -375,6 +378,7 @@ async fn run<S>(
}
}

tracing::info!("Shutting down {} service", S::NAME);
let shutdown = std::panic::AssertUnwindSafe(task.shutdown());
match shutdown.catch_unwind().await {
Ok(Ok(_)) => {}
Expand Down
1 change: 0 additions & 1 deletion crates/services/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ where
}

async fn shutdown(self) -> anyhow::Result<()> {
tracing::info!("Sync task shutting down");
self.import_task_handle.stop_and_await().await?;
Ok(())
}
Expand Down

0 comments on commit 3edda64

Please sign in to comment.