Skip to content

Commit

Permalink
add a option for starting db migration
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Nov 19, 2024
1 parent 7b1bf71 commit af24faa
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 64 deletions.
28 changes: 20 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ struct Args {
/// config for ckb
#[command(flatten)]
pub ckb: <CkbConfig as ClapSerde>::Opt,

/// option to run database migration
#[arg(
short = 'm',
long = "migrate",
help = "run database migration, default: false"
)]
pub migrate: bool,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -119,7 +127,7 @@ pub(crate) fn print_help_and_exit(code: i32) {
}

impl Config {
pub fn parse() -> Self {
pub fn parse() -> (Self, bool) {
// Parse whole args with clap
let mut args = Args::parse();

Expand Down Expand Up @@ -193,12 +201,16 @@ impl Config {
let cch = services.contains(&Service::CCH).then_some(cch);
let rpc = services.contains(&Service::RPC).then_some(rpc);
let ckb = services.contains(&Service::CkbChain).then_some(ckb);
Self {
fiber,
cch,
rpc,
ckb,
base_dir,
}

(
Self {
fiber,
cch,
rpc,
ckb,
base_dir,
},
args.migrate,
)
}
}
2 changes: 1 addition & 1 deletion src/fiber/tests/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct MockNetworkGraph {
impl MockNetworkGraph {
pub fn new(node_num: usize) -> Self {
let temp_path = tempfile::tempdir().unwrap();
let store = Store::new(temp_path.path());
let store = Store::new(temp_path.path()).expect("create store failed");
let keypairs = generate_key_pairs(node_num + 1);
let (secret_key1, public_key1) = keypairs[0];
let mut graph = NetworkGraph::new(store, public_key1.into());
Expand Down
2 changes: 1 addition & 1 deletion src/fiber/tests/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ fn test_history_eval_probability_range() {
fn test_history_load_store() {
let dir = tempdir().unwrap();
let path = dir.path().join("test_history_load_store");
let store = Store::new(path);
let store = Store::new(path).expect("created store failed");
let mut history = PaymentHistory::new(generate_pubkey().into(), None, store.clone());
let from = generate_pubkey();
let target = generate_pubkey();
Expand Down
22 changes: 12 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,22 @@ pub async fn main() -> Result<(), ExitMessage> {

let _span = info_span!("node", node = fnn::get_node_prefix()).entered();

let config = Config::parse();
debug!("Parsed config: {:?}", &config);
let (config, run_migrate) = Config::parse();

let store_path = config
.fiber
.as_ref()
.ok_or_else(|| ExitMessage("fiber config is required but absent".to_string()))?
.store_path();
if run_migrate {
Store::run_migrate(store_path).map_err(|err| ExitMessage(err.to_string()))?;
return Ok(());
}
let store = Store::new(store_path).map_err(|err| ExitMessage(err.to_string()))?;

let tracker = new_tokio_task_tracker();
let token = new_tokio_cancellation_token();
let root_actor = RootActor::start(tracker, token).await;

let store = Store::new(
config
.fiber
.as_ref()
.ok_or_else(|| ExitMessage("fiber config is required but absent".to_string()))?
.store_path(),
);
let subscribers = ChannelSubscribers::default();

let (fiber_command_sender, network_graph) = match config.fiber.clone() {
Expand Down
2 changes: 1 addition & 1 deletion src/store/db_migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl DbMigrate {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
let mut migrations = Migrations::default();
migrations.add_migration(Arc::new(DefaultMigration::new()));

// add more migrations here
let db = Arc::new(DB::open_default(path).expect("Failed to open rocksdb"));

DbMigrate { migrations, db }
Expand Down
1 change: 1 addition & 0 deletions src/store/migrations/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
// following new migration should be added here ...
// pub(crate) mod sample;
4 changes: 2 additions & 2 deletions src/store/migrations/sample.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{migration::migration::Migration, Error};
use crate::{store::migration::Migration, Error};
use indicatif::ProgressBar;
use rocksdb::DB;
use std::sync::Arc;

const INIT_DB_VERSION: &str = "20351116135521";
const INIT_DB_VERSION: &str = "20311116135521";

pub struct SampleMigration {
version: String,
Expand Down
74 changes: 40 additions & 34 deletions src/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ pub struct Store {
}

impl Store {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
let db = Self::open_or_create_db(path).expect("Failed to open rocksdb");
Self { db }
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, String> {
let db = Self::open_or_create_db(path, false)?;
Ok(Self { db })
}

pub fn run_migrate<P: AsRef<Path>>(path: P) -> Result<(), String> {
Self::open_or_create_db(path, true).map(|_| ())
}

fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<Vec<u8>> {
Expand Down Expand Up @@ -80,7 +84,7 @@ impl Store {
}

/// Open or create a rocksdb
fn open_or_create_db<P: AsRef<Path>>(path: P) -> Result<Arc<DB>, String> {
fn open_or_create_db<P: AsRef<Path>>(path: P, run_migrate: bool) -> Result<Arc<DB>, String> {
let migrate = DbMigrate::new(path.as_ref());
if !migrate.need_init() {
match migrate.check() {
Expand All @@ -97,50 +101,52 @@ impl Store {
return Ok(migrate.db());
}
Ordering::Less => {
let path_buf = path.as_ref().to_path_buf();
let input = prompt(format!("\
\n\
Fiber need to run some database migrations.\n\
\n\
Once the migration started, the data will be no longer compatible with all older version,\n\
so we strongly recommended you to backup the old data {} before migrating.\n\
\n\
If the migration failed, try to delete all data and sync from scratch.\n\
\nIf you want to migrate the data, please input YES, otherwise, the current process will exit.\n\
> ", path_buf.display()).as_str());

if input.trim().to_lowercase() != "yes" {
error!("Migration was declined since the user didn't confirm.");
return Err("need to migrate".to_string());
if !run_migrate {
return Err("Fiber need to run some database migrations, please run `fnn` with option `--migrate` to start migrations.".to_string());
} else {
let path_buf = path.as_ref().to_path_buf();
let input = Self::prompt(format!("\
Once the migration started, the data will be no longer compatible with all older version,\n\
so we strongly recommended you to backup the old data {} before migrating.\n\
\n\
\nIf you want to migrate the data, please input YES, otherwise, the current process will exit.\n\
> ", path_buf.display()).as_str());

if input.trim().to_lowercase() != "yes" {
error!("Migration was declined since the user didn't confirm.");
return Err("need to run database migration".to_string());
}
eprintln!("begin to migrate db ...");
let db = migrate.migrate().expect("failed to migrate db");
eprintln!(
"db migrated successfully, now your can restart the fiber node ..."
);
Ok(db)
}
info!("now begin to migrate db ...");
let db = migrate.migrate().expect("failed to migrate db");
info!("db migrated successfully ...");
Ok(db)
}
}
} else {
info!("now begin to init db version ...");
info!("begin to init db version ...");
migrate
.init_db_version()
.expect("failed to init db version");
Ok(migrate.db())
}
}
}

fn prompt(msg: &str) -> String {
let stdout = stdout();
let mut stdout = stdout.lock();
let stdin = stdin();
fn prompt(msg: &str) -> String {
let stdout = stdout();
let mut stdout = stdout.lock();
let stdin = stdin();

write!(stdout, "{msg}").unwrap();
stdout.flush().unwrap();
write!(stdout, "{msg}").unwrap();
stdout.flush().unwrap();

let mut input = String::new();
let _ = stdin.read_line(&mut input);
let mut input = String::new();
let _ = stdin.read_line(&mut input);

input
input
}
}

pub struct Batch {
Expand Down
14 changes: 7 additions & 7 deletions src/store/tests/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn mock_channel() -> ChannelInfo {
fn test_store_invoice() {
let dir = tempdir().unwrap();
let path = dir.path().join("invoice_store");
let store = Store::new(path);
let store = Store::new(path).expect("created store failed");

let preimage = gen_sha256_hash();
let invoice = InvoiceBuilder::new(Currency::Fibb)
Expand Down Expand Up @@ -102,7 +102,7 @@ fn test_store_invoice() {
fn test_store_channels() {
let dir = tempdir().unwrap();
let path = dir.path().join("invoice_store");
let store = Store::new(path);
let store = Store::new(path).expect("created store failed");

let mut channels = vec![];
for _ in 0..10 {
Expand Down Expand Up @@ -136,7 +136,7 @@ fn test_store_channels() {
fn test_store_nodes() {
let dir = tempdir().unwrap();
let path = dir.path().join("invoice_store");
let store = Store::new(path);
let store = Store::new(path).expect("created store failed");

let mut nodes = vec![];
for _ in 0..10 {
Expand Down Expand Up @@ -180,7 +180,7 @@ fn test_compact_signature() {
fn test_store_wacthtower() {
let dir = tempdir().unwrap();
let path = dir.path().join("watchtower_store");
let store = Store::new(path);
let store = Store::new(path).expect("created store failed");

let channel_id = gen_sha256_hash();
let funding_tx_lock = Script::default();
Expand Down Expand Up @@ -313,7 +313,7 @@ fn test_channel_actor_state_store() {
let bincode_encoded = bincode::serialize(&state).unwrap();
let _new_state: ChannelActorState = bincode::deserialize(&bincode_encoded).unwrap();

let store = Store::new(tempdir().unwrap().path().join("store"));
let store = Store::new(tempdir().unwrap().path().join("store")).expect("create store failed");
assert!(store.get_channel_actor_state(&state.id).is_none());
store.insert_channel_actor_state(state.clone());
assert!(store.get_channel_actor_state(&state.id).is_some());
Expand All @@ -325,7 +325,7 @@ fn test_channel_actor_state_store() {
fn test_store_payment_session() {
let dir = tempdir().unwrap();
let path = dir.path().join("payment_history_store");
let store = Store::new(path);
let store = Store::new(path).expect("created store failed");
let payment_hash = gen_sha256_hash();
let payment_data = SendPaymentData {
target_pubkey: gen_rand_public_key(),
Expand Down Expand Up @@ -353,7 +353,7 @@ fn test_store_payment_session() {
fn test_store_payment_history() {
let dir = tempdir().unwrap();
let path = dir.path().join("payment_history_store");
let mut store = Store::new(path);
let mut store = Store::new(path).expect("created store failed");

let pubkey = gen_rand_public_key();
let target = gen_rand_public_key();
Expand Down

0 comments on commit af24faa

Please sign in to comment.