Skip to content

Commit

Permalink
add postgres init.
Browse files Browse the repository at this point in the history
  • Loading branch information
EthanYuan committed Nov 16, 2023
1 parent addfbd6 commit a1a2fe6
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 4 deletions.
4 changes: 2 additions & 2 deletions resource/ckb.toml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,6 @@ block_uncles_cache_size = 30
# db_type = "postgres"
# db_name = "ckb-indexer-r"
# db_host = "127.0.0.1"
# db_port = 8532
# db_port = 5432
# db_user = "postgres"
# password = "123456"
# db_password = "123456"
83 changes: 83 additions & 0 deletions util/indexer-r/resources/create_postgres_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
CREATE TABLE block(
id SERIAL PRIMARY KEY,
block_hash BYTEA UNIQUE NOT NULL,
block_number BIGINT NOT NULL,
compact_target INTEGER NOT NULL,
parent_hash BYTEA NOT NULL,
nonce BYTEA NOT NULL,
timestamp BIGINT NOT NULL,
version INTEGER NOT NULL,
transactions_root BYTEA NOT NULL,
epoch_number INTEGER NOT NULL,
epoch_index SMALLINT NOT NULL,
epoch_length SMALLINT NOT NULL,
dao BYTEA NOT NULL,
proposals_hash BYTEA,
extra_hash BYTEA
);

CREATE TABLE block_association_proposal(
id SERIAL PRIMARY KEY,
block_hash BYTEA NOT NULL,
proposal BYTEA NOT NULL
);

CREATE TABLE block_association_uncle(
id SERIAL PRIMARY KEY,
block_hash BYTEA NOT NULL,
uncle_hash BYTEA NOT NULL
);

CREATE TABLE ckb_transaction(
id SERIAL PRIMARY KEY,
tx_hash BYTEA UNIQUE NOT NULL,
version INTEGER NOT NULL,
input_count SMALLINT NOT NULL,
output_count SMALLINT NOT NULL,
witnesses BYTEA,
block_hash BYTEA NOT NULL,
tx_index INTEGER NOT NULL
);

CREATE TABLE tx_association_header_dep(
id SERIAL PRIMARY KEY,
tx_hash BYTEA NOT NULL,
block_hash BYTEA NOT NULL
);

CREATE TABLE tx_association_cell_dep(
id SERIAL PRIMARY KEY,
tx_hash BYTEA NOT NULL,
out_point BYTEA NOT NULL,
dep_type SMALLINT NOT NULL
);

CREATE TABLE output(
id SERIAL PRIMARY KEY,
out_point BYTEA UNIQUE NOT NULL,
capacity BIGINT NOT NULL,
data BYTEA,
tx_hash BYTEA NOT NULL,
output_index INTEGER NOT NULL
);

CREATE TABLE input(
out_point BYTEA PRIMARY KEY,
since BYTEA NOT NULL,
tx_hash BYTEA NOT NULL,
input_index INTEGER NOT NULL
);

CREATE TABLE script(
id SERIAL PRIMARY KEY,
script_hash BYTEA UNIQUE NOT NULL,
script_code_hash BYTEA,
script_args BYTEA,
script_type SMALLINT
);

CREATE TABLE output_association_script(
id SERIAL PRIMARY KEY,
out_point BYTEA NOT NULL,
script_hash BYTEA NOT NULL
);
2 changes: 1 addition & 1 deletion util/indexer-r/src/indexer_handle/async_indexer_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl AsyncIndexerRHandle {
.await
.map(|res| {
res.map(|row| IndexerTip {
block_number: (row.get::<i32, _>("block_number") as u64).into(),
block_number: (row.get::<i64, _>("block_number") as u64).into(),
block_hash: bytes_to_h256(row.get("block_hash")),
})
})
Expand Down
48 changes: 47 additions & 1 deletion util/indexer-r/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{fmt::Debug, sync::Arc, time::Duration};

const MEMORY_DB: &str = ":memory:";
const SQL_CREATE_SQLITE: &str = include_str!("../../resources/create_sqlite_table.sql");
const SQL_CREATE_POSTGRES: &str = include_str!("../../resources/create_postgres_table.sql");

#[derive(Clone)]
pub struct SQLXPool {
Expand Down Expand Up @@ -95,11 +96,18 @@ impl SQLXPool {
Ok(())
}
DBDriver::Postgres => {
let require_init = self.is_postgres_require_init(db_config).await?;
let uri = build_url_for_posgres(db_config);
let mut connection_options = AnyConnectOptions::from_str(&uri)?;
connection_options.log_statements(LevelFilter::Trace);
let pool = pool_options.connect_with(connection_options).await?;
self.pool.set(pool).map_err(|_| anyhow!("set pool failed"))
self.pool
.set(pool)
.map_err(|_| anyhow!("set pool failed"))?;
if require_init {
self.create_tables_for_postgres().await?;
}
Ok(())
}
}
}
Expand Down Expand Up @@ -227,6 +235,44 @@ impl SQLXPool {
sqlx::query(SQL_CREATE_SQLITE).execute(&mut *tx).await?;
tx.commit().await.map_err(Into::into)
}

async fn create_tables_for_postgres(&mut self) -> Result<()> {
let mut tx = self.transaction().await?;
let commands = SQL_CREATE_POSTGRES.split(';');
for command in commands {
if !command.trim().is_empty() {
sqlx::query(command).execute(&mut *tx).await?;
}
}
tx.commit().await.map_err(Into::into)
}

pub async fn is_postgres_require_init(&mut self, db_config: &IndexerRConfig) -> Result<bool> {
// Connect to the "postgres" database first
let mut temp_config = db_config.clone();
temp_config.db_name = "postgres".to_string();
let uri = build_url_for_posgres(&temp_config);
log::info!("postgres uri: {}", uri);
let mut connection_options = AnyConnectOptions::from_str(&uri)?;
connection_options.log_statements(LevelFilter::Trace);
let tmp_pool_options = AnyPoolOptions::new();
let pool = tmp_pool_options.connect_with(connection_options).await?;

// Check if database exists
let query =
SQLXPool::new_query(r#"SELECT EXISTS (SELECT FROM pg_database WHERE datname = $1)"#)
.bind(db_config.db_name.as_str());
let row = query.fetch_one(&pool).await?;

// If database does not exist, create it
if !row.get::<bool, _>(0) {
let query = format!(r#"CREATE DATABASE "{}""#, db_config.db_name);
SQLXPool::new_query(&query).execute(&pool).await?;
Ok(true)
} else {
Ok(false)
}
}
}

pub(crate) fn fetch_count_sql(table_name: &str) -> String {
Expand Down

0 comments on commit a1a2fe6

Please sign in to comment.