//! batch_relaying.rs
use std::{collections::HashMap, sync::Arc, time::Duration};
use cosmos_gravity::query::{get_latest_transaction_batches, get_transaction_batch_signatures};
use ethereum_gravity::{
message_signatures::encode_tx_batch_confirm_hashed, submit_batch::send_eth_transaction_batch,
utils::get_tx_batch_nonce,
};
use futures::stream::{self, StreamExt};
use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient;
use gravity_utils::{
clarity::{address::Address as EthAddress, PrivateKey as EthPrivateKey, Uint256},
num_conversion::{print_eth, print_gwei},
prices::get_weth_price,
types::{
BatchConfirmResponse, BatchRelayingMode, RelayerConfig, TransactionBatch, Valset,
WhitelistToken,
},
web30::client::Web3,
};
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use tonic::transport::Channel;
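/// A transaction batch together with the validator signatures collected for it.
/// Only batches whose signatures have been checked against the current valset
/// end up in this struct, making them candidates for submission.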
#[derive(Debug, Clone)]
struct SubmittableBatch {
batch: TransactionBatch,
sigs: Vec<BatchConfirmResponse>,
}
#[allow(clippy::too_many_arguments)]
/// Relays batches from Cosmos to Ethereum. First we request the latest
/// transaction batches, a list of the latest 100 batches across all token types.
/// From there we determine which batches have enough valid signatures to submit,
/// then query Ethereum to determine which are valid to submit given the current
/// chain state. Finally we simulate a submission, and if that succeeds and the
/// gas cost is acceptable we complete the relaying process by actually
/// submitting the data to Ethereum.
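/// Internally this happens in two phases: `get_batches_and_signatures` collects
/// candidate batches that have enough valid signatures, and `submit_batches`
/// checks the Ethereum chain state, estimates gas costs, and performs the
/// actual submissions.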
pub async fn relay_batches(
// the validator set currently in the contract on Ethereum
current_valset: &Valset,
ethereum_key: EthPrivateKey,
web3: &Web3,
grpc_client: &mut GravityQueryClient<Channel>,
gravity_contract_address: EthAddress,
gravity_id: String,
timeout: Duration,
config: &RelayerConfig,
) {
let possible_batches =
get_batches_and_signatures(current_valset, grpc_client, gravity_id.clone()).await;
trace!("possible batches {:?}", possible_batches);
submit_batches(
current_valset,
ethereum_key,
web3,
gravity_contract_address,
gravity_id,
timeout,
possible_batches,
config,
)
.await;
}
/// Retrieves the latest batches from the Cosmos module and then iterates through
/// the signatures for each batch, determining whether it is ready to submit.
/// A batch can lack valid signatures for two reasons: either not enough
/// signatures have been collected from the validators yet, or the batch is old
/// enough that its signatures no longer reflect the current validator set on
/// Ethereum. In both cases the correct response is to wait: through timeouts,
/// new signatures, or a later valid batch being submitted, old batches will
/// always be resolved.
async fn get_batches_and_signatures(
current_valset: &Valset,
grpc_client: &mut GravityQueryClient<Channel>,
gravity_id: String,
) -> HashMap<EthAddress, Vec<SubmittableBatch>> {
let latest_batches = if let Ok(lb) = get_latest_transaction_batches(grpc_client).await {
lb
} else {
return HashMap::new();
};
trace!("Latest batches {:?}", latest_batches);
let mut possible_batches = HashMap::new();
for batch in latest_batches {
let signatures =
get_transaction_batch_signatures(grpc_client, batch.nonce, batch.token_contract).await;
trace!("Got signatures {:?}", signatures);
if let Ok(sigs) = signatures {
// this checks that the signatures for the batch are actually possible to submit to the chain
let hash = encode_tx_batch_confirm_hashed(gravity_id.clone(), &batch);
            if current_valset.order_sigs(&hash, &sigs).is_ok() {
                // we've found a valid batch, add it to the list for its token type
                possible_batches
                    .entry(batch.token_contract)
                    .or_insert_with(Vec::new)
                    .push(SubmittableBatch { batch, sigs });
            } else {
                warn!(
                    "Batch {}/{} cannot be submitted yet, waiting for more signatures",
                    batch.token_contract, batch.nonce
}
} else {
error!(
"could not get signatures for {}:{} with {:?}",
batch.token_contract, batch.nonce, signatures
);
}
}
    // reverse each list so that it is oldest first; we want to submit older
    // batches first so that we don't invalidate newer batches
possible_batches.par_iter_mut().for_each(|(_, value)| {
value.reverse();
});
possible_batches
}
/// Determines whether submitting `batch` will be profitable given the estimated
/// `cost` and the current exchange rate available on Uniswap.
async fn should_relay_batch(
web3: &Web3,
batch: &TransactionBatch,
cost: Uint256,
pubkey: EthAddress,
config: &BatchRelayingMode,
) -> bool {
    // skip the price request below in the trivial case; this early return is the
    // simplest way to avoid the extra network IO without duplicating code in the
    // match below
if let BatchRelayingMode::EveryBatch = config {
return true;
}
let batch_reward_amount = batch.total_fee.amount;
let batch_reward_token = batch.total_fee.token_contract_address;
let price = get_weth_price(batch_reward_token, batch_reward_amount, pubkey, web3).await;
match config {
BatchRelayingMode::EveryBatch => true,
BatchRelayingMode::ProfitableOnly { margin } => {
let cost_with_margin = get_cost_with_margin(cost, *margin);
// we need to see how much WETH we can get for the reward token amount,
// and compare that value to the gas cost times the margin
match price {
Ok(price) => price > cost_with_margin,
                Err(e) => {
                    info!(
                        "Unable to determine swap price of token {} for WETH, it may not be on Uniswap: {:?}. Not relaying this batch",
                        batch_reward_token, e
                    );
                    false
                }
}
}
BatchRelayingMode::ProfitableWithWhitelist { margin, whitelist } => {
let cost_with_margin = get_cost_with_margin(cost, *margin);
// we need to see how much WETH we can get for the reward token amount,
// and compare that value to the gas cost times the margin
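            // a whitelist entry takes precedence over the Uniswap quote: if the
            // reward token is whitelisted, the batch is relayed whenever its total
            // fee meets the whitelisted minimum amount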
match (price, get_whitelist_amount(batch.token_contract, whitelist)) {
(_, Some(amount)) => amount <= batch.total_fee.amount,
(Ok(price), None) => price > cost_with_margin,
                (Err(e), None) => {
                    info!(
                        "Unable to determine swap price of token {} for WETH, it may not be on Uniswap: {:?}. Not relaying this batch",
                        batch_reward_token, e
                    );
                    false
                }
}
}
}
}
/// Looks up the whitelisted token amount for the specified ERC20 in the token
/// price whitelist, returning `None` if the token is not whitelisted.
fn get_whitelist_amount(erc20: EthAddress, whitelist: &[WhitelistToken]) -> Option<Uint256> {
for i in whitelist {
if i.token == erc20 {
return Some(i.amount);
}
}
None
}
/// Bakes the margin into the cost to provide an easy value to compare against.
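/// For example, a cost of 100 with a margin of 1.5 yields 150; the product is
/// computed in floating point and truncated toward zero when converted back to
/// an integer.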
pub fn get_cost_with_margin(cost: Uint256, margin: f64) -> Uint256 {
let cost_as_float: f64 = cost.to_string().parse().unwrap();
let cost_with_margin = cost_as_float * margin;
Uint256::from_u128(cost_with_margin as u128)
}
#[allow(clippy::too_many_arguments)]
/// Attempts to submit batches with valid signatures, checking the state of the
/// Ethereum chain to ensure that it is still valid to submit a given batch;
/// more specifically, that the correctly signed batch has not timed out or
/// already been submitted. The goal of this function is to submit batches in
/// chronological order of their creation, since submitting batches newest first
/// would invalidate older batches and is less efficient if those older batches
/// are profitable.
/// This function estimates the cost of submitting a batch before actually
/// submitting it to Ethereum; if the ETH cost to submit is deemed too high, the
/// batch is skipped and a later, more profitable batch may be submitted.
/// Keep in mind that many other relayers are making this same computation and
/// some may have different standards for their profit margin, so there may be a
/// race not only to submit individual batches but also to submit batches in
/// different orders.
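/// Batches are grouped by ERC20 contract; a small number of token types are
/// processed concurrently, and within each token type batches are walked oldest
/// first, matching the ordering produced by `get_batches_and_signatures`.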
async fn submit_batches(
current_valset: &Valset,
ethereum_key: EthPrivateKey,
web3: &Web3,
gravity_contract_address: EthAddress,
gravity_id: String,
timeout: Duration,
possible_batches: HashMap<EthAddress, Vec<SubmittableBatch>>,
config: &RelayerConfig,
) {
let our_ethereum_address = ethereum_key.to_address();
let ethereum_block_height = if let Ok(bn) = web3.eth_block_number().await {
bn
} else {
warn!("Failed to get eth block height, is your eth node working?");
return;
};
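    // bundle the shared read-only inputs in an Arc so that each concurrent task
    // in the stream below can hold a cheap reference to them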
let data_holder = Arc::new((ethereum_block_height, current_valset, gravity_id, config));
    // request data from Ethereum only once per token type; this is valid because
    // we iterate from oldest to newest, so submitting a batch earlier in the loop
    // will never invalidate submitting a batch later in the loop. Another relayer
    // could always do that, though.
stream::iter(possible_batches)
.zip(stream::repeat(data_holder.clone()))
.for_each_concurrent(2, |((token_type, batches), data_holder)| async move {
let (ethereum_block_height, current_valset, gravity_id, config) = &*data_holder;
let erc20_contract = token_type;
let latest_ethereum_batch = get_tx_batch_nonce(
gravity_contract_address,
erc20_contract,
our_ethereum_address,
web3,
)
.await;
if latest_ethereum_batch.is_err() {
error!(
"Failed to get latest Ethereum batch with {:?}",
latest_ethereum_batch
);
return;
}
let latest_ethereum_batch = latest_ethereum_batch.unwrap();
for batch in batches {
let oldest_signed_batch = batch.batch;
let oldest_signatures = batch.sigs;
let timeout_height = Uint256::from_u64(oldest_signed_batch.batch_timeout);
if timeout_height < *ethereum_block_height {
warn!(
"Batch {}/{} has timed out and can not be submitted",
oldest_signed_batch.nonce, oldest_signed_batch.token_contract
);
continue;
}
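                // only submit if this Cosmos batch is newer than the latest batch
                // nonce the Gravity contract has recorded for this ERC20; otherwise
                // it has already been submitted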
                let latest_cosmos_batch_nonce = oldest_signed_batch.nonce;
if latest_cosmos_batch_nonce > latest_ethereum_batch {
let cost = ethereum_gravity::submit_batch::estimate_tx_batch_cost(
current_valset,
oldest_signed_batch.clone(),
&oldest_signatures,
web3,
gravity_contract_address,
gravity_id.clone(),
ethereum_key,
)
.await;
if cost.is_err() {
error!("Batch cost estimate failed with {:?}", cost);
continue;
}
let cost = cost.unwrap();
info!(
"We have detected a batch to relay. This batch is estimated to cost {} Gas @ {} gwei / {:.4} ETH to submit",
cost.gas,
print_gwei(cost.gas_price),
print_eth(cost.get_total())
);
oldest_signed_batch
.display_with_eth_info(our_ethereum_address, web3)
.await;
let should_relay = should_relay_batch(
web3,
&oldest_signed_batch,
cost.get_total(),
our_ethereum_address,
&config.batch_relaying_mode,
)
.await;
if should_relay {
let res = send_eth_transaction_batch(
current_valset,
oldest_signed_batch,
&oldest_signatures,
web3,
timeout,
gravity_contract_address,
gravity_id.clone(),
ethereum_key,
)
.await;
if res.is_err() {
info!("Batch submission failed with {:?}", res);
}
} else {
info!(
"Not relaying batch {}/{} due to it not being profitable",
oldest_signed_batch.token_contract, oldest_signed_batch.nonce
);
}
}
}
})
.await;
}