Skip to content

Commit

Permalink
Merge pull request #3 from movementlabsxyz/musitdev/add_request_sleep…
Browse files Browse the repository at this point in the history
…_time

Add  sleep time between gRpc request.
  • Loading branch information
musitdev authored Oct 8, 2024
2 parents b5e05c1 + 1d1d7c7 commit 8e83cde
Show file tree
Hide file tree
Showing 27 changed files with 259 additions and 101 deletions.
6 changes: 3 additions & 3 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use url::Url;

pub const QUERY_DEFAULT_RETRIES: u32 = 5;
pub const QUERY_DEFAULT_RETRY_DELAY_MS: u64 = 500;
pub const DEFAULT_SLEEP_TIME_BETWENN_REQUEST_MS: u64 = 10;

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -55,9 +56,14 @@ pub struct IndexerGrpcProcessorConfig {
// String vector for deprecated tables to skip db writes
#[serde(default)]
pub deprecated_tables: HashSet<String>,
#[serde(default = "IndexerGrpcProcessorConfig::default_sleep_time_between_request")]
pub default_sleep_time_between_request: u64,
}

impl IndexerGrpcProcessorConfig {
pub const fn default_sleep_time_between_request() -> u64 {
DEFAULT_SLEEP_TIME_BETWENN_REQUEST_MS
}
pub const fn default_gap_detection_batch_size() -> u64 {
DEFAULT_GAP_DETECTION_BATCH_SIZE
}
Expand Down Expand Up @@ -103,6 +109,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.transaction_filter.clone(),
self.grpc_response_item_timeout_in_secs,
self.deprecated_tables.clone(),
self.default_sleep_time_between_request,
)
.await
.context("Failed to build worker")?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl AccountTransaction {
for event in events {
account_transactions.extend(Self::from_event(event, txn_version));
}
for wsc in wscs {
for wsc in wscs.iter().filter(|wsc| wsc.change.is_some()) {
match wsc.change.as_ref().unwrap() {
Change::DeleteResource(res) => {
account_transactions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ impl CoinActivity {
}

// Need coin info from move resources
for wsc in &transaction_info.changes {
for wsc in transaction_info
.changes
.iter()
.filter(|wsc| wsc.change.is_some())
{
let (maybe_coin_info, maybe_coin_balance_data) =
if let WriteSetChangeEnum::WriteResource(write_resource) =
&wsc.change.as_ref().unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,8 @@ impl WriteSetChange {
block_timestamp: chrono::NaiveDateTime,
) -> anyhow::Result<Option<(Self, WriteSetChangeDetail)>> {
let change_type = Self::get_write_set_change_type(write_set_change);
let change = write_set_change
.change
.as_ref()
.expect("WriteSetChange must have a change");
match change {
WriteSetChangeEnum::WriteModule(inner) => Ok(Some((
match write_set_change.change.as_ref() {
Some(WriteSetChangeEnum::WriteModule(inner)) => Ok(Some((
Self {
txn_version,
state_key_hash: standardize_address(
Expand All @@ -86,7 +82,7 @@ impl WriteSetChange {
block_timestamp,
)),
))),
WriteSetChangeEnum::DeleteModule(inner) => Ok(Some((
Some(WriteSetChangeEnum::DeleteModule(inner)) => Ok(Some((
Self {
txn_version,
state_key_hash: standardize_address(
Expand All @@ -106,7 +102,7 @@ impl WriteSetChange {
block_timestamp,
)),
))),
WriteSetChangeEnum::WriteResource(inner) => {
Some(WriteSetChangeEnum::WriteResource(inner)) => {
let resource_option = MoveResource::from_write_resource(
inner,
write_set_change_index,
Expand Down Expand Up @@ -138,7 +134,7 @@ impl WriteSetChange {
))
})
},
WriteSetChangeEnum::DeleteResource(inner) => {
Some(WriteSetChangeEnum::DeleteResource(inner)) => {
let resource_option = MoveResource::from_delete_resource(
inner,
write_set_change_index,
Expand Down Expand Up @@ -170,7 +166,7 @@ impl WriteSetChange {
))
})
},
WriteSetChangeEnum::WriteTableItem(inner) => {
Some(WriteSetChangeEnum::WriteTableItem(inner)) => {
let (ti, cti) = TableItem::from_write_table_item(
inner,
write_set_change_index,
Expand All @@ -197,7 +193,7 @@ impl WriteSetChange {
),
)))
},
WriteSetChangeEnum::DeleteTableItem(inner) => {
Some(WriteSetChangeEnum::DeleteTableItem(inner)) => {
let (ti, cti) = TableItem::from_delete_table_item(
inner,
write_set_change_index,
Expand All @@ -220,6 +216,7 @@ impl WriteSetChange {
WriteSetChangeDetail::Table(ti, cti, None),
)))
},
None => Ok(None),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@ impl WriteSetChange {
index: i64,
transaction_version: i64,
transaction_block_height: i64,
) -> (Self, WriteSetChangeDetail) {
) -> Option<(Self, WriteSetChangeDetail)> {
let type_ = Self::get_write_set_change_type(write_set_change);
let change = write_set_change
.change
.as_ref()
.expect("WriteSetChange must have a change");

match change {
WriteSetChangeEnum::WriteModule(inner) => (
match write_set_change.change.as_ref() {
Some(WriteSetChangeEnum::WriteModule(inner)) => Some((
Self {
transaction_version,
hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()),
Expand All @@ -64,8 +60,8 @@ impl WriteSetChange {
transaction_version,
transaction_block_height,
)),
),
WriteSetChangeEnum::DeleteModule(inner) => (
)),
Some(WriteSetChangeEnum::DeleteModule(inner)) => Some((
Self {
transaction_version,
hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()),
Expand All @@ -80,8 +76,8 @@ impl WriteSetChange {
transaction_version,
transaction_block_height,
)),
),
WriteSetChangeEnum::WriteResource(inner) => (
)),
Some(WriteSetChangeEnum::WriteResource(inner)) => Some((
Self {
transaction_version,
hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()),
Expand All @@ -96,8 +92,8 @@ impl WriteSetChange {
transaction_version,
transaction_block_height,
)),
),
WriteSetChangeEnum::DeleteResource(inner) => (
)),
Some(WriteSetChangeEnum::DeleteResource(inner)) => Some((
Self {
transaction_version,
hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()),
Expand All @@ -112,15 +108,15 @@ impl WriteSetChange {
transaction_version,
transaction_block_height,
)),
),
WriteSetChangeEnum::WriteTableItem(inner) => {
)),
Some(WriteSetChangeEnum::WriteTableItem(inner)) => {
let (ti, cti) = TableItem::from_write_table_item(
inner,
index,
transaction_version,
transaction_block_height,
);
(
Some((
Self {
transaction_version,
hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()),
Expand All @@ -134,16 +130,16 @@ impl WriteSetChange {
cti,
Some(TableMetadata::from_write_table_item(inner)),
),
)
))
},
WriteSetChangeEnum::DeleteTableItem(inner) => {
Some(WriteSetChangeEnum::DeleteTableItem(inner)) => {
let (ti, cti) = TableItem::from_delete_table_item(
inner,
index,
transaction_version,
transaction_block_height,
);
(
Some((
Self {
transaction_version,
hash: standardize_address_from_bytes(inner.state_key_hash.as_slice()),
Expand All @@ -153,8 +149,9 @@ impl WriteSetChange {
index,
},
WriteSetChangeDetail::Table(ti, cti, None),
)
))
},
None => None,
}
}

Expand All @@ -166,7 +163,7 @@ impl WriteSetChange {
write_set_changes
.iter()
.enumerate()
.map(|(index, write_set_change)| {
.filter_map(|(index, write_set_change)| {
Self::from_write_set_change(
write_set_change,
index as i64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,21 @@ impl CurrentDelegatedVoter {
) -> anyhow::Result<CurrentDelegatedVoterMap> {
let mut delegated_voter_map: CurrentDelegatedVoterMap = AHashMap::new();

let table_item_data = write_table_item.data.as_ref().unwrap();
let table_handle = standardize_address(&write_table_item.handle);
if let Some(VoteDelegationTableItem::VoteDelegationVector(vote_delegation_vector)) =
VoteDelegationTableItem::from_table_item_type(
table_item_data.value_type.as_str(),
&table_item_data.value,
txn_version,
)?
{
let pool_address = match vote_delegation_handle_to_pool_address.get(&table_handle) {
Some(pool_address) => pool_address.clone(),
None => {
// look up from db
Self::get_delegation_pool_address_by_table_handle(conn, &table_handle, query_retries, query_retry_delay_ms).await
if write_table_item.data.is_some() {
let table_item_data = write_table_item.data.as_ref().unwrap();
let table_handle = standardize_address(&write_table_item.handle);
if let Some(VoteDelegationTableItem::VoteDelegationVector(vote_delegation_vector)) =
VoteDelegationTableItem::from_table_item_type(
table_item_data.value_type.as_str(),
&table_item_data.value,
txn_version,
)?
{
let pool_address = match vote_delegation_handle_to_pool_address.get(&table_handle) {
Some(pool_address) => pool_address.clone(),
None => {
// look up from db
Self::get_delegation_pool_address_by_table_handle(conn, &table_handle, query_retries, query_retry_delay_ms).await
.unwrap_or_else(|_| {
tracing::error!(
transaction_version = txn_version,
Expand All @@ -103,28 +104,30 @@ impl CurrentDelegatedVoter {
);
"".to_string()
})
},
};
if !pool_address.is_empty() {
for inner in vote_delegation_vector {
let delegator_address = inner.get_delegator_address();
let voter = inner.value.get_voter();
let pending_voter = inner.value.get_pending_voter();
},
};
if !pool_address.is_empty() {
for inner in vote_delegation_vector {
let delegator_address = inner.get_delegator_address();
let voter = inner.value.get_voter();
let pending_voter = inner.value.get_pending_voter();

let delegated_voter = CurrentDelegatedVoter {
delegator_address: delegator_address.clone(),
delegation_pool_address: pool_address.clone(),
voter: Some(voter.clone()),
pending_voter: Some(pending_voter.clone()),
last_transaction_timestamp: txn_timestamp,
last_transaction_version: txn_version,
table_handle: Some(table_handle.clone()),
};
delegated_voter_map
.insert((pool_address.clone(), delegator_address), delegated_voter);
let delegated_voter = CurrentDelegatedVoter {
delegator_address: delegator_address.clone(),
delegation_pool_address: pool_address.clone(),
voter: Some(voter.clone()),
pending_voter: Some(pending_voter.clone()),
last_transaction_timestamp: txn_timestamp,
last_transaction_version: txn_version,
table_handle: Some(table_handle.clone()),
};
delegated_voter_map
.insert((pool_address.clone(), delegator_address), delegated_voter);
}
}
}
}

Ok(delegated_voter_map)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ impl CurrentDelegatorBalance {

let changes = &transaction.info.as_ref().unwrap().changes;
// Do a first pass to get the mapping of active_share table handles to staking pool resource let txn_version = transaction.version as i64;
for wsc in changes {
for wsc in changes.iter().filter(|wsc| wsc.change.is_some()) {
if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() {
if let Some(map) =
Self::get_inactive_pool_to_staking_pool_mapping(write_resource, txn_version)
Expand All @@ -428,7 +428,11 @@ impl CurrentDelegatorBalance {
}
}
// Now make a pass through table items to get the actual delegator balances
for (index, wsc) in changes.iter().enumerate() {
for (index, wsc) in changes
.iter()
.filter(|wsc| wsc.change.is_some())
.enumerate()
{
let maybe_delegator_balance = match wsc.change.as_ref().unwrap() {
Change::DeleteTableItem(table_item) => {
if let Some((balance, current_balance)) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl DelegatorPool {
.as_ref()
.expect("Transaction info doesn't exist!")
.changes;
for wsc in changes {
for wsc in changes.into_iter().filter(|wsc| wsc.change.is_some()) {
if let Change::WriteResource(write_resource) = wsc.change.as_ref().unwrap() {
let maybe_write_resource =
Self::from_write_resource(write_resource, txn_version)?;
Expand Down Expand Up @@ -174,6 +174,9 @@ impl DelegatorPool {
write_table_item: &WriteTableItem,
txn_version: i64,
) -> anyhow::Result<Option<PoolBalanceMetadata>> {
if write_table_item.data.is_some() {
return Ok(None);
}
let table_item_data = write_table_item.data.as_ref().unwrap();

if let Some(StakeTableItem::Pool(inner)) = &StakeTableItem::from_table_item_type(
Expand Down
Loading

0 comments on commit 8e83cde

Please sign in to comment.