Orchestrator Async runtime improvements #350

Open

andrey-kuprianov opened this issue Oct 11, 2021 · 0 comments · Fixed by onomyprotocol/arc#19
Surfaced from @informalsystems audit of Althea Gravity Bridge at commit 19a4cfe

severity: Informational
type: Restructuring proposal
difficulty: Intermediate

Involved artifacts

Description

  • Consider removing all uses of Instant::now() in favor of the tokio::time::* utilities (timeout, sleep, interval). Offloading these low-level timing details to futures on the Tokio runtime may improve performance, since Tokio's timer wakes a future exactly at its deadline instead of relying on the task to poll the clock. Manual Instant::now() checks also make timeouts and loop speeds inexact, because the elapsed time can only be compared against a fixed Duration after the current unit of work or sleep has completed.

Here is a micro-benchmark, run with cargo +nightly bench (it assumes tokio with the rt and time features enabled):

#![feature(test)]

extern crate test;

const WAIT_MICROS: u64 = 200;
const TIMEOUT_MICROS: u64 = 500;

// Despite its name, this sleeps three times for WAIT_MICROS (200 µs) each,
// i.e. roughly 600 µs per call.
async fn wait_for_two_micros() {
    for _ in 0..3 {
        tokio::time::sleep(std::time::Duration::from_micros(WAIT_MICROS)).await;
    }
}

pub async fn timeout_for_five_micros_tokio() -> usize {
    // Tokio's timer cancels the inner future at the deadline, even mid-sleep.
    match tokio::time::timeout(std::time::Duration::from_micros(TIMEOUT_MICROS), async {
        loop {
            wait_for_two_micros().await
        }
    })
    .await
    {
        Ok(_) => 0,
        Err(_) => 1,
    }
}

pub async fn timeout_for_five_micros_normal() -> usize {
    let timeout = std::time::Duration::from_micros(TIMEOUT_MICROS);
    let start = tokio::time::Instant::now();
    loop {
        // The elapsed-time check only runs after a full wait cycle completes,
        // so the effective timeout overshoots the target.
        wait_for_two_micros().await;
        if tokio::time::Instant::now() - start > timeout {
            break 0;
        }
    }
}

// Builds a fresh runtime per call; this fixed overhead is paid equally by
// both benchmarks.
pub fn run_async<T>(f: impl std::future::Future<Output = T>) -> T {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(f)
}

#[cfg(test)]
mod tests {
    use super::{run_async, timeout_for_five_micros_normal, timeout_for_five_micros_tokio};
    use test::{black_box, Bencher};

    #[bench]
    fn bench_tokio(b: &mut Bencher) {
        b.iter(|| black_box(run_async(timeout_for_five_micros_tokio())));
    }

    #[bench]
    fn bench_normal(b: &mut Bencher) {
        b.iter(|| black_box(run_async(timeout_for_five_micros_normal())));
    }
}

It produces the following on my machine.

running 2 tests
test tests::bench_normal ... bench:   3,530,973 ns/iter (+/- 18,828)
test tests::bench_tokio  ... bench:   1,348,445 ns/iter (+/- 16,900)

The tokio::time::timeout version is roughly 2.6× faster than the manual loop: the timeout cancels the in-flight sleep as soon as the deadline passes, while the manual version must finish a full wait cycle before it can check the clock.

Affected files

For example, orchestrator/cosmos_gravity/src/utils.rs#L11-L24

pub async fn wait_for_cosmos_online(contact: &Contact, timeout: Duration) {
    let start = Instant::now();
    while let Err(CosmosGrpcError::NodeNotSynced) | Err(CosmosGrpcError::ChainNotRunning) =
        contact.wait_for_next_block(timeout).await
    {
        sleep(Duration::from_secs(1)).await;
        if Instant::now() - start > timeout {
            panic!("Cosmos node has not come online during timeout!")
        }
    }
    contact.wait_for_next_block(timeout).await.unwrap();
    contact.wait_for_next_block(timeout).await.unwrap();
    contact.wait_for_next_block(timeout).await.unwrap();
}

<< SIDENOTE >>
There are three wait_for_next_block(timeout) calls at the end, each with its own timeout.
Does this mean wait_for_cosmos_online can take up to 4 * timeout to terminate in the worst case? Is this expected behavior?

The above code may be transformed to,

pub async fn wait_for_cosmos_online(contact: &Contact, timeout: Duration) -> Result<(), CosmosGrpcError> {
    // Let Tokio enforce the deadline instead of polling Instant::now().
    let online = tokio::time::timeout(timeout, async {
        while let Err(CosmosGrpcError::NodeNotSynced) | Err(CosmosGrpcError::ChainNotRunning) =
            contact.wait_for_next_block(timeout).await
        {
            sleep(Duration::from_secs(1)).await;
        }
    })
    .await;
    if online.is_err() {
        panic!("Cosmos node has not come online during timeout!")
    }
    for _ in 0..3 {
        contact.wait_for_next_block(timeout).await?;
    }
    Ok(())
}
  • orchestrator/cosmos_gravity/src/utils.rs#L27-L43

    pub async fn get_last_event_nonce_with_retry(
        client: &mut GravityQueryClient<Channel>,
        our_cosmos_address: CosmosAddress,
        prefix: String,
    ) -> u64 {
        let mut res =
            get_last_event_nonce_for_validator(client, our_cosmos_address, prefix.clone()).await;
        while res.is_err() {
            error!(
                "Failed to get last event nonce, is the Cosmos GRPC working? {:?}",
                res
            );
            sleep(RETRY_TIME).await;
            res = get_last_event_nonce_for_validator(client, our_cosmos_address, prefix.clone()).await;
        }
        res.unwrap()
    }

    may be transformed to,

    pub async fn get_last_event_nonce_with_retry(
        client: &mut GravityQueryClient<Channel>,
        our_cosmos_address: CosmosAddress,
        prefix: String,
    ) -> u64 {
        loop {
            match get_last_event_nonce_for_validator(client, our_cosmos_address, prefix.clone()).await {
                Ok(last_nonce) => break last_nonce,
                Err(res) => {
                    error!(
                        "Failed to get last event nonce, is the Cosmos GRPC working? {:?}",
                        res
                    );
                    sleep(RETRY_TIME).await;
                }
            }
        }
    }
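
    This retry-until-success shape recurs across the orchestrator. A minimal sketch of a generic helper (the name retry_forever and its logging are ours, not from the Gravity codebase) could consolidate it:

    use std::future::Future;
    use std::time::Duration;

    // Hypothetical helper, not part of the Gravity codebase: retry an async
    // operation until it succeeds, sleeping between failed attempts.
    pub async fn retry_forever<T, E, Fut, F>(retry_time: Duration, mut op: F) -> T
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
        E: std::fmt::Debug,
    {
        loop {
            match op().await {
                Ok(val) => break val,
                Err(err) => {
                    // The real code would log via the error! macro.
                    eprintln!("operation failed, retrying: {:?}", err);
                    tokio::time::sleep(retry_time).await;
                }
            }
        }
    }

    Note that with FnMut() -> Fut the returned future cannot borrow state captured by the closure, so a caller like get_last_event_nonce_with_retry would need to clone the client into each attempt (tonic clients over a Channel are cheap to clone).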
  • orchestrator/gbt/src/client/deploy_erc20_representation.rs#L74-L95

        let start = Instant::now();
        loop {
            let res = grpc
                .denom_to_erc20(QueryDenomToErc20Request {
                    denom: denom.clone(),
                })
                .await;
    
            if let Ok(val) = res {
                info!(
                    "Asset {} has accepted new ERC20 representation {}",
                    denom,
                    val.into_inner().erc20
                );
                exit(0);
            }
    
            if Instant::now() - start > Duration::from_secs(100) {
                info!("Your ERC20 contract was not adopted, double check the metadata and try again");
                exit(1);
            }
            delay_for(Duration::from_secs(1)).await;
        }

    may be transformed to the following (note that newer tokio releases renamed delay_for to sleep),

        match tokio::time::timeout(std::time::Duration::from_secs(100), async {
            loop {
                let res = grpc
                    .denom_to_erc20(QueryDenomToErc20Request {
                        denom: denom.clone(),
                    })
                    .await;
    
                if let Ok(val) = res {
                    break val;
                }
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        })
        .await
        {
            Ok(val) => {
                info!(
                    "Asset {} has accepted new ERC20 representation {}",
                    denom,
                    val.into_inner().erc20
                );
                exit(0);
            },
            Err(_) => {
                info!("Your ERC20 contract was not adopted, double check the metadata and try again");
                exit(1);
            },
        }
  • Consider refactoring main loops.
    Affected files

    • orchestrator/orchestrator/src/main_loop.rs

    • orchestrator/relayer/src/main_loop.rs

      loop {
          let loop_start = Instant::now();
          // ...
          let elapsed = Instant::now() - loop_start;
          if elapsed < ETH_SIGNER_LOOP_SPEED {
              delay_for(ETH_SIGNER_LOOP_SPEED - elapsed).await;
          }
      }

      may be transformed to,

      loop {
          let (async_resp, _) = tokio::join!(
              async {
                  // ...
              },
              // ETH_SIGNER_LOOP_SPEED is already a Duration in the original
              // code, so it can be passed to sleep directly.
              tokio::time::sleep(ETH_SIGNER_LOOP_SPEED)
          );
      }
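
      With tokio::join! the sleep runs concurrently with the loop body, so each iteration takes at least ETH_SIGNER_LOOP_SPEED, matching the original pacing. An alternative sketch uses tokio::time::interval, which tracks the deadline across iterations (the function name, loop body, and the constant's value here are placeholders):

      use std::time::Duration;

      // Minimal sketch: a fixed-rate loop driven by Tokio's interval timer.
      async fn signer_main_loop() {
          let loop_speed = Duration::from_secs(10); // placeholder for ETH_SIGNER_LOOP_SPEED
          let mut ticker = tokio::time::interval(loop_speed);
          loop {
              // The first tick completes immediately; each later tick fires
              // one period after the previous deadline, so the body's own
              // runtime is absorbed into the wait.
              ticker.tick().await;
              // ... loop body ...
          }
      }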