Orchestrator Async runtime improvements #274

Merged
merged 28 commits into from
Nov 16, 2021
Changes from 3 commits
Commits
6e88e25
working on erc20.rs successful
hannydevelop Oct 27, 2021
c7fffc3
erc20 is ready
hannydevelop Oct 29, 2021
87996fc
orchestrator/src/main_loop.rs is still a work in progress
hannydevelop Oct 29, 2021
bf0ed0a
main_loop ready
hannydevelop Oct 29, 2021
b403dea
main_loop almost ready
hannydevelop Nov 1, 2021
71d4acb
fixed 2021 errors on docker
hannydevelop Nov 1, 2021
3ef27eb
happy_path_v2 ready for review
hannydevelop Nov 3, 2021
763fd16
main_loop, resolving changes
hannydevelop Nov 3, 2021
b5faf34
happy_path.rs ready for review
hannydevelop Nov 3, 2021
9347dde
happy_path.rs is ready
hannydevelop Nov 3, 2021
cab25b2
removing instant, replacing with delay_for in erc20.rs
hannydevelop Nov 7, 2021
fc227ac
fixed indentation, removed stale duration
hannydevelop Nov 7, 2021
14ddc5c
capturing erc20 in info!
hannydevelop Nov 7, 2021
4d7cc8f
removed stale imports, removed early break, adding loop
hannydevelop Nov 7, 2021
17d6d54
removing early break
hannydevelop Nov 7, 2021
81c152c
writing better error message for panic in happy_path v2.rs
hannydevelop Nov 7, 2021
165f569
fixed previous merged PR not captured because of conflicts
hannydevelop Nov 7, 2021
acf20ea
fixed usage of tokio::time::sleep and delay_for in main_loop
hannydevelop Nov 9, 2021
d2dd8d2
worked on implementing delay_for uniformly in happypath.rs
hannydevelop Nov 9, 2021
b79a083
fixed typo in happy_path.rs
hannydevelop Nov 9, 2021
9b958a3
capturing erc20 in success message
hannydevelop Nov 9, 2021
268d995
adding a break for if balance.denom.contains and balance.amount match…
hannydevelop Nov 9, 2021
a6cb54a
adding break to avoid looping till timeout
hannydevelop Nov 9, 2021
1a15f8e
merging main
hannydevelop Nov 9, 2021
df1c1a3
fixing sleep and break in loop
hannydevelop Nov 9, 2021
ab2bca6
just ran cargo fmt
hannydevelop Nov 16, 2021
19c9eee
ran git merge main
hannydevelop Nov 16, 2021
d5f6ebc
fixed conflicting files, ran git merge main again
hannydevelop Nov 16, 2021
41 changes: 22 additions & 19 deletions orchestrator/gorc/src/commands/deploy/erc20.rs
@@ -85,30 +85,33 @@ impl Erc20 {

println!("We have deployed ERC20 contract {:#066x}, waiting to see if the Cosmos chain choses to adopt it", res);

let start = Instant::now();
loop {
let req = DenomToErc20Request {
denom: denom.clone(),
};

let res = grpc.denom_to_erc20(req).await;

if let Ok(val) = res {
let val = val.into_inner();
match tokio::time::timeout(std::time::Duration::from_secs(100), async {
loop {
let req = DenomToErc20Request {
denom: denom.clone(),
};

let res = grpc.denom_to_erc20(req).await;

if let Ok(val) = res {
break val;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}).await
{
Ok(val) => {
println!(
"Asset {} has accepted new ERC20 representation {}",
denom, val.erc20
denom,
val.into_inner().erc20
);
exit(0);
}

if Instant::now() - start > Duration::from_secs(100) {
println!(
"Your ERC20 contract was not adopted, double check the metadata and try again"
);
},
Err(_) => {
println!("Your ERC20 contract was not adopted, double check the metadata and try again");
exit(1);
}
delay_for(Duration::from_secs(1)).await;
},
}
}
}
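
The erc20.rs change above replaces manual `Instant` bookkeeping with a timeout wrapped around a polling loop. A minimal blocking sketch of the same shape, using only the standard library, might look like the following. The helper `poll_with_timeout`, its parameters, and the placeholder address are hypothetical and not part of the PR, which uses `tokio::time::timeout` and `tokio::time::sleep`. One difference to keep in mind: this sketch checks the deadline only between attempts, whereas a tokio timeout can also cut off an attempt that is still awaiting.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Retry `attempt` until it succeeds or `timeout` has passed.
/// Returns `Some(value)` on success and `None` once the deadline is reached.
fn poll_with_timeout<T, E>(
    timeout: Duration,
    pause: Duration,
    mut attempt: impl FnMut() -> Result<T, E>,
) -> Option<T> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Ok(val) = attempt() {
            return Some(val);
        }
        // Give up only after a failed attempt, so at least one attempt always runs.
        if Instant::now() >= deadline {
            return None;
        }
        sleep(pause);
    }
}

fn main() {
    // Hypothetical query that fails twice before succeeding.
    let mut calls = 0;
    let adopted = poll_with_timeout(
        Duration::from_millis(200),
        Duration::from_millis(10),
        || {
            calls += 1;
            if calls >= 3 { Ok("0xabc") } else { Err(()) }
        },
    );
    match adopted {
        Some(addr) => println!("adopted as {}", addr),
        None => println!("not adopted"),
    }
}
```

Keeping the success and timeout outcomes in a single return value is what lets the caller handle both in one `match`, as the new erc20.rs code does.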
158 changes: 69 additions & 89 deletions orchestrator/orchestrator/src/main_loop.rs
@@ -134,82 +134,72 @@ pub async fn eth_oracle_main_loop(
let mut grpc_client = grpc_client;

loop {
let loop_start = Instant::now();

let latest_eth_block = web3.eth_block_number().await;
let latest_cosmos_block = contact.get_chain_status().await;
match (latest_eth_block, latest_cosmos_block) {
(Ok(latest_eth_block), Ok(ChainStatus::Moving { block_height })) => {
metrics::set_cosmos_block_height(block_height.clone());
metrics::set_ethereum_block_height(latest_eth_block.clone());
trace!(
"Latest Eth block {} Latest Cosmos block {}",
latest_eth_block,
block_height,
);
}
(Ok(_latest_eth_block), Ok(ChainStatus::Syncing)) => {
warn!("Cosmos node syncing, Eth oracle paused");
delay_for(DELAY).await;
continue;
Review comment (Contributor):

In the old version of this loop, we could fail faster (in this case only waiting DELAY instead of ETH_ORACLE_LOOP_SPEED) and try from the beginning of the loop again. In the tokio join! model we'll always wait the full ETH_ORACLE_LOOP_SPEED before starting again. I know that the task joining model was suggested by the auditor, and it is cleaner, but probably performs worse in cases where we'd rather restart the loop. Based on the comments at the bottom around the elapsed time calculation, we might care more about performance than wall clock consistency.
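
To make the timing difference concrete, here is a minimal std-only model of how long one loop iteration takes under each approach. The values of `DELAY` and `LOOP_SPEED` are assumptions for illustration (13 s mirrors the sleep used in the new oracle loop; the real `DELAY` is not shown in this diff), and `work` stands for everything an iteration does apart from sleeping.

```rust
use std::time::Duration;

// Assumed values, for illustration only.
const DELAY: Duration = Duration::from_secs(5);
const LOOP_SPEED: Duration = Duration::from_secs(13);

/// Old loop: a failed status check sleeps DELAY and `continue`s;
/// a successful pass is padded up to LOOP_SPEED.
fn old_loop_iteration(work: Duration, failed: bool) -> Duration {
    if failed {
        work + DELAY
    } else {
        work.max(LOOP_SPEED)
    }
}

/// join! model: the body (including DELAY on failure) runs alongside a
/// fixed sleep, and the iteration ends only when both have finished.
fn join_iteration(work: Duration, failed: bool) -> Duration {
    let body = if failed { work + DELAY } else { work };
    body.max(LOOP_SPEED)
}

fn main() {
    let work = Duration::from_secs(1);
    for &failed in [false, true].iter() {
        println!(
            "failed={}: old loop {:?}, join! {:?}",
            failed,
            old_loop_iteration(work, failed),
            join_iteration(work, failed)
        );
    }
}
```

Under these assumptions a failed pass takes about 6 s in the old loop but 13 s with `join!`, while successful passes take the same time in both.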

Reply from @hannydevelop (Contributor Author), Nov 6, 2021:

@cbrit, the problem here isn't sleep, rather it's join. With join, we have to wait the full ETH_ORACLE_LOOP_SPEED before starting again. However, the previous code had continue.

Reply from @cbrit (Member), Nov 7, 2021:

@hannydevelop I know. I am suggesting that interval might be a better solution than join because it allows us to continue the loop while still making sure the loop only runs every ETH_ORACLE_LOOP_SPEED seconds if the match is a successful case.

The auditor said

"using Instant::now() leads to non-exact timeouts and loop_speed because the checks are always if the elapsed time is greater or lesser than a fixed Duration"

but this is solved by interval.

As far as performance goes, I am not sure how it differs from join, or how much it really matters.
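
A rough illustration of the interval idea, using only the standard library: a ticker that sleeps until fixed deadlines instead of comparing elapsed time, combined with `continue` on the failure path. `Ticker` is a hypothetical blocking stand-in written for this sketch, not tokio's API; in the orchestrator the equivalent would presumably be `tokio::time::interval` with `tick().await`. The millisecond durations and the `outcomes` array are made up so the example runs quickly.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Blocking stand-in for a fixed-period interval (hypothetical helper).
struct Ticker {
    next: Instant,
    period: Duration,
}

impl Ticker {
    fn new(period: Duration) -> Self {
        // Like tokio's interval, the first tick returns immediately.
        Ticker { next: Instant::now(), period }
    }

    /// Sleep until the next scheduled deadline, then schedule the one after it.
    fn tick(&mut self) {
        let now = Instant::now();
        if self.next > now {
            sleep(self.next - now);
        }
        self.next += self.period;
    }
}

fn main() {
    let period = Duration::from_millis(30); // stand-in for ETH_ORACLE_LOOP_SPEED
    let retry = Duration::from_millis(5); // stand-in for DELAY
    let mut ticker = Ticker::new(period);
    let outcomes = [false, true, true]; // hypothetical: the first status check fails
    let start = Instant::now();

    for &ok in outcomes.iter() {
        if !ok {
            // Failure path: short pause, then restart the loop early.
            sleep(retry);
            continue;
        }
        // Success path: pace passes on the fixed schedule.
        ticker.tick();
        println!("successful pass at {:?}", start.elapsed());
    }
}
```

Because the deadlines are absolute, time spent on work or on retries does not accumulate as drift in later ticks, which is the property the auditor's remark about `Instant::now()` is concerned with.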

Reply from @hannydevelop (Contributor Author), Nov 7, 2021:

In my opinion, I'd rather use timeout and sleep, or go back to using Instant::now. Let's discuss this on Monday; I can try to contact the auditor about the reasoning behind using join in main_loop.

}
(Ok(_latest_eth_block), Ok(ChainStatus::WaitingToStart)) => {
warn!("Cosmos node syncing waiting for chain start, Eth oracle paused");
delay_for(DELAY).await;
continue;
}
(Ok(_), Err(_)) => {
metrics::COSMOS_UNAVAILABLE.inc();
warn!("Could not contact Cosmos grpc, trying again");
delay_for(DELAY).await;
continue;
}
(Err(_), Ok(_)) => {
metrics::ETHEREUM_UNAVAILABLE.inc();
warn!("Could not contact Eth node, trying again");
delay_for(DELAY).await;
continue;
}
(Err(_), Err(_)) => {
metrics::COSMOS_UNAVAILABLE.inc();
metrics::ETHEREUM_UNAVAILABLE.inc();
error!("Could not reach Ethereum or Cosmos rpc!");
delay_for(DELAY).await;
continue;
}
}

// Relays events from Ethereum -> Cosmos
match check_for_events(
&web3,
&contact,
&mut grpc_client,
gravity_contract_address,
cosmos_key,
last_checked_block.clone(),
msg_sender.clone(),
)
.await
{
Ok(new_block) => last_checked_block = new_block,
Err(e) => {
metrics::ETHEREUM_EVENT_CHECK_FAILURES.inc();
error!("Failed to get events for block range, Check your Eth node and Cosmos gRPC {:?}", e);
if let gravity_utils::error::GravityError::CosmosGrpcError(err) = e {
if let CosmosGrpcError::TransactionFailed { tx: _, time: _ } = err {
delay_for(Duration::from_secs(10)).await;
let (async_resp, _) = tokio::join!(
async {
let latest_eth_block = web3.eth_block_number().await;
let latest_cosmos_block = contact.get_chain_status().await;
match (latest_eth_block, latest_cosmos_block) {
(Ok(latest_eth_block), Ok(ChainStatus::Moving { block_height })) => {
metrics::set_cosmos_block_height(block_height.clone());
metrics::set_ethereum_block_height(latest_eth_block.clone());
trace!(
"Latest Eth block {} Latest Cosmos block {}",
latest_eth_block,
block_height,
);
}
(Ok(_latest_eth_block), Ok(ChainStatus::Syncing)) => {
warn!("Cosmos node syncing, Eth oracle paused");
delay_for(DELAY).await;
}
(Ok(_latest_eth_block), Ok(ChainStatus::WaitingToStart)) => {
warn!("Cosmos node syncing waiting for chain start, Eth oracle paused");
delay_for(DELAY).await;
}
(Ok(_), Err(_)) => {
metrics::COSMOS_UNAVAILABLE.inc();
warn!("Could not contact Cosmos grpc, trying again");
delay_for(DELAY).await;
}
(Err(_), Ok(_)) => {
metrics::ETHEREUM_UNAVAILABLE.inc();
warn!("Could not contact Eth node, trying again");
delay_for(DELAY).await;
}
(Err(_), Err(_)) => {
metrics::COSMOS_UNAVAILABLE.inc();
metrics::ETHEREUM_UNAVAILABLE.inc();
error!("Could not reach Ethereum or Cosmos rpc!");
delay_for(DELAY).await;
}
}
}
}

// a bit of logic that tries to keep things running every LOOP_SPEED seconds exactly
// this is not required for any specific reason. In fact we expect and plan for
// the timing being off significantly
let elapsed = Instant::now() - loop_start;
if elapsed < ETH_ORACLE_LOOP_SPEED {
delay_for(ETH_ORACLE_LOOP_SPEED - elapsed).await;
}
// Relays events from Ethereum -> Cosmos
match check_for_events(
&web3,
&contact,
&mut grpc_client,
gravity_contract_address,
cosmos_key,
last_checked_block.clone(),
msg_sender.clone(),
)
.await
{
Ok(new_block) => last_checked_block = new_block,
Err(e) => {
metrics::ETHEREUM_EVENT_CHECK_FAILURES.inc();
error!("Failed to get events for block range, Check your Eth node and Cosmos gRPC {:?}", e);
if let gravity_utils::error::GravityError::CosmosGrpcError(err) = e {
if let CosmosGrpcError::TransactionFailed { tx: _, time: _ } = err {
delay_for(Duration::from_secs(10)).await;
}
}
}
}
},
tokio::time::sleep(std::time::Duration::from_secs(13))
);
}
}

@@ -237,9 +227,9 @@ pub async fn eth_signer_main_loop(
let gravity_id = gravity_id.unwrap();

loop {
let loop_start = Instant::now();

let latest_eth_block = web3.eth_block_number().await;
let (async_resp, _) = tokio::join!(
async {
let latest_eth_block = web3.eth_block_number().await;
let latest_cosmos_block = contact.get_chain_status().await;
match (latest_eth_block, latest_cosmos_block) {
(Ok(latest_eth_block), Ok(ChainStatus::Moving { block_height })) => {
@@ -254,31 +244,26 @@ pub async fn eth_signer_main_loop(
(Ok(_latest_eth_block), Ok(ChainStatus::Syncing)) => {
warn!("Cosmos node syncing, Eth signer paused");
delay_for(DELAY).await;
continue;
}
(Ok(_latest_eth_block), Ok(ChainStatus::WaitingToStart)) => {
warn!("Cosmos node syncing waiting for chain start, Eth signer paused");
delay_for(DELAY).await;
continue;
}
(Ok(_), Err(_)) => {
metrics::COSMOS_UNAVAILABLE.inc();
warn!("Could not contact Cosmos grpc, trying again");
delay_for(DELAY).await;
continue;
}
(Err(_), Ok(_)) => {
metrics::ETHEREUM_UNAVAILABLE.inc();
warn!("Could not contact Eth node, trying again");
delay_for(DELAY).await;
continue;
}
(Err(_), Err(_)) => {
metrics::COSMOS_UNAVAILABLE.inc();
metrics::ETHEREUM_UNAVAILABLE.inc();
error!("Could not reach Ethereum or Cosmos rpc!");
delay_for(DELAY).await;
continue;
}
}

@@ -377,14 +362,9 @@ pub async fn eth_signer_main_loop(
e
);
}

// a bit of logic that tries to keep things running every LOOP_SPEED seconds exactly
// this is not required for any specific reason. In fact we expect and plan for
// the timing being off significantly
let elapsed = Instant::now() - loop_start;
if elapsed < ETH_SIGNER_LOOP_SPEED {
delay_for(ETH_SIGNER_LOOP_SPEED - elapsed).await;
}
},
tokio::time::sleep(std::time::Duration::from_secs(11))
);
}
}
