diff --git a/resource/ckb.toml b/resource/ckb.toml index 6468fc5c98e..72d74a913f5 100644 --- a/resource/ckb.toml +++ b/resource/ckb.toml @@ -207,6 +207,6 @@ block_uncles_cache_size = 30 # db_type = "postgres" # db_name = "ckb-indexer-r" # db_host = "127.0.0.1" -# db_port = 8532 +# db_port = 5432 # db_user = "postgres" -# password = "123456" +# db_password = "123456" diff --git a/util/indexer-r/resources/create_postgres_table.sql b/util/indexer-r/resources/create_postgres_table.sql new file mode 100644 index 00000000000..77b895aeb1b --- /dev/null +++ b/util/indexer-r/resources/create_postgres_table.sql @@ -0,0 +1,83 @@ +CREATE TABLE block( + id SERIAL PRIMARY KEY, + block_hash BYTEA UNIQUE NOT NULL, + block_number BIGINT NOT NULL, + compact_target INTEGER NOT NULL, + parent_hash BYTEA NOT NULL, + nonce BYTEA NOT NULL, + timestamp BIGINT NOT NULL, + version INTEGER NOT NULL, + transactions_root BYTEA NOT NULL, + epoch_number INTEGER NOT NULL, + epoch_index SMALLINT NOT NULL, + epoch_length SMALLINT NOT NULL, + dao BYTEA NOT NULL, + proposals_hash BYTEA, + extra_hash BYTEA +); + +CREATE TABLE block_association_proposal( + id SERIAL PRIMARY KEY, + block_hash BYTEA NOT NULL, + proposal BYTEA NOT NULL +); + +CREATE TABLE block_association_uncle( + id SERIAL PRIMARY KEY, + block_hash BYTEA NOT NULL, + uncle_hash BYTEA NOT NULL +); + +CREATE TABLE ckb_transaction( + id SERIAL PRIMARY KEY, + tx_hash BYTEA UNIQUE NOT NULL, + version INTEGER NOT NULL, + input_count SMALLINT NOT NULL, + output_count SMALLINT NOT NULL, + witnesses BYTEA, + block_hash BYTEA NOT NULL, + tx_index INTEGER NOT NULL +); + +CREATE TABLE tx_association_header_dep( + id SERIAL PRIMARY KEY, + tx_hash BYTEA NOT NULL, + block_hash BYTEA NOT NULL +); + +CREATE TABLE tx_association_cell_dep( + id SERIAL PRIMARY KEY, + tx_hash BYTEA NOT NULL, + out_point BYTEA NOT NULL, + dep_type SMALLINT NOT NULL +); + +CREATE TABLE output( + id SERIAL PRIMARY KEY, + out_point BYTEA UNIQUE NOT NULL, + capacity BIGINT NOT NULL, + data BYTEA, + tx_hash BYTEA NOT NULL, + output_index INTEGER NOT NULL +); + +CREATE TABLE input( + out_point BYTEA PRIMARY KEY, + since BYTEA NOT NULL, + tx_hash BYTEA NOT NULL, + input_index INTEGER NOT NULL +); + +CREATE TABLE script( + id SERIAL PRIMARY KEY, + script_hash BYTEA UNIQUE NOT NULL, + script_code_hash BYTEA, + script_args BYTEA, + script_type SMALLINT +); + +CREATE TABLE output_association_script( + id SERIAL PRIMARY KEY, + out_point BYTEA NOT NULL, + script_hash BYTEA NOT NULL +); \ No newline at end of file diff --git a/util/indexer-r/src/indexer_handle/async_indexer_handle.rs b/util/indexer-r/src/indexer_handle/async_indexer_handle.rs index fcb1882bdf6..0cb0f2d5ca6 100644 --- a/util/indexer-r/src/indexer_handle/async_indexer_handle.rs +++ b/util/indexer-r/src/indexer_handle/async_indexer_handle.rs @@ -35,7 +35,7 @@ impl AsyncIndexerRHandle { .await .map(|res| { res.map(|row| IndexerTip { - block_number: (row.get::("block_number") as u64).into(), + block_number: (row.get::("block_number") as u64).into(), block_hash: bytes_to_h256(row.get("block_hash")), }) }) diff --git a/util/indexer-r/src/store/mod.rs b/util/indexer-r/src/store/mod.rs index 76434bd4991..0cc20d37963 100644 --- a/util/indexer-r/src/store/mod.rs +++ b/util/indexer-r/src/store/mod.rs @@ -21,6 +21,7 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; const MEMORY_DB: &str = ":memory:"; const SQL_CREATE_SQLITE: &str = include_str!("../../resources/create_sqlite_table.sql"); +const SQL_CREATE_POSTGRES: &str = include_str!("../../resources/create_postgres_table.sql"); #[derive(Clone)] pub struct SQLXPool { @@ -95,11 +96,18 @@ impl SQLXPool { Ok(()) } DBDriver::Postgres => { + let require_init = self.is_postgres_require_init(db_config).await?; let uri = build_url_for_posgres(db_config); let mut connection_options = AnyConnectOptions::from_str(&uri)?; connection_options.log_statements(LevelFilter::Trace); let pool = pool_options.connect_with(connection_options).await?; - self.pool.set(pool).map_err(|_| anyhow!("set pool failed")) + self.pool + .set(pool) + .map_err(|_| anyhow!("set pool failed"))?; + if require_init { + self.create_tables_for_postgres().await?; + } + Ok(()) } } } @@ -227,6 +235,44 @@ impl SQLXPool { sqlx::query(SQL_CREATE_SQLITE).execute(&mut *tx).await?; tx.commit().await.map_err(Into::into) } + + async fn create_tables_for_postgres(&mut self) -> Result<()> { + let mut tx = self.transaction().await?; + let commands = SQL_CREATE_POSTGRES.split(';'); + for command in commands { + if !command.trim().is_empty() { + sqlx::query(command).execute(&mut *tx).await?; + } + } + tx.commit().await.map_err(Into::into) + } + + pub async fn is_postgres_require_init(&mut self, db_config: &IndexerRConfig) -> Result { + // Connect to the "postgres" database first + let mut temp_config = db_config.clone(); + temp_config.db_name = "postgres".to_string(); + let uri = build_url_for_posgres(&temp_config); + log::info!("postgres uri: {}", uri); + let mut connection_options = AnyConnectOptions::from_str(&uri)?; + connection_options.log_statements(LevelFilter::Trace); + let tmp_pool_options = AnyPoolOptions::new(); + let pool = tmp_pool_options.connect_with(connection_options).await?; + + // Check if database exists + let query = + SQLXPool::new_query(r#"SELECT EXISTS (SELECT FROM pg_database WHERE datname = $1)"#) + .bind(db_config.db_name.as_str()); + let row = query.fetch_one(&pool).await?; + + // If database does not exist, create it + if !row.get::(0) { + let query = format!(r#"CREATE DATABASE "{}""#, db_config.db_name); + SQLXPool::new_query(&query).execute(&pool).await?; + Ok(true) + } else { + Ok(false) + } + } } pub(crate) fn fetch_count_sql(table_name: &str) -> String {