Skip to content

Commit

Permalink
Test publisher (#22)
Browse files Browse the repository at this point in the history
* opt logs

* aggregator param for events

* update near indexer

* Test deploy locale

* near indexer key

* opt near indexer

* check balance before do sth.

* opt command

* Revert "opt command"

This reverts commit 026bb0b.

---------

Co-authored-by: fewensa <[email protected]>
  • Loading branch information
jiguantong and fewensa authored Oct 30, 2024
1 parent f0ed27d commit 439dc6a
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 9 deletions.
9 changes: 9 additions & 0 deletions packages/common/src/near.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,13 @@ export class NearI {
});
}

public contractSigner(contractId: string): nearAPI.Contract {
return this.contract(contractId, {
viewMethods: [
'experimental_signature_deposit'
],
changeMethods: [],
useLocalViewExecution: false,
});
}
}
5 changes: 4 additions & 1 deletion packages/near-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
"create": "graph create xapi-near --node https://api.thegraph.com/deploy/",
"create-local": "graph create xapi-near --node http://127.0.0.1:8020",
"deploy": "graph deploy --studio xapi-near",
"deploy-local": "graph deploy xapi-near --ipfs http://localhost:5001 --node http://127.0.0.1:8020"
"deploy-local": "graph deploy xapi-near --ipfs http://localhost:5001 --node http://127.0.0.1:8020",
"create:darwinia": "graph create --node https://thegraph-g2.darwinia.network/training/deploy/",
"remove:darwinia": "graph remove --node https://thegraph-g2.darwinia.network/training/deploy/",
"deploy:darwinia": "graph deploy --node https://thegraph-g2.darwinia.network/training/deploy/ --ipfs https://ipfs.network.thegraph.com"
},
"devDependencies": {
"@graphprotocol/graph-cli": "0.84.0",
Expand Down
14 changes: 12 additions & 2 deletions packages/near-indexer/src/mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ function parseResponse(eventData: TypedMap<string, JSONValue>, nanoId: string, r
response.started_at = BigInt.fromString(eventData.mustGet("started_at").toString());
response.updated_at = BigInt.fromString(eventData.mustGet("updated_at").toString());
response.status = eventData.mustGet("status").toString();
response.result = eventData.mustGet("result").toString();
const maybeResult = eventData.get("result");
if (maybeResult) {
response.result = eventData.mustGet("result").toString();
} else {
response.result = "";
}
response.chain_id = BigInt.fromString(eventData.mustGet("chain_id").toString());
response.aggregator = receipt.receiverId;
response.error_code = eventData.get("error_code") ? eventData.mustGet("error_code").toI64() as i32 : 0;
Expand All @@ -240,7 +245,12 @@ function parseAggregated(eventData: TypedMap<string, JSONValue>, nanoId: string,
aggregatedEvent.started_at = BigInt.fromString(eventData.mustGet("started_at").toString());
aggregatedEvent.updated_at = BigInt.fromString(eventData.mustGet("updated_at").toString());
aggregatedEvent.status = eventData.mustGet("status").toString();
aggregatedEvent.result = eventData.mustGet("result").toString();
const maybeResult = eventData.get("result");
if (maybeResult) {
aggregatedEvent.result = eventData.mustGet("result").toString();
} else {
aggregatedEvent.result = "";
}
aggregatedEvent.chain_id = BigInt.fromString(eventData.mustGet("chain_id").toString());
aggregatedEvent.aggregator = receipt.receiverId;
aggregatedEvent.error_code = eventData.get("error_code") ? eventData.mustGet("error_code").toI64() as i32 : 0;
Expand Down
1 change: 1 addition & 0 deletions packages/publisher-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"bs58check": "^4.0.0",
"cacache": "^19.0.1",
"commander": "^12.1.0",
"decimal.js": "^10.4.3",
"elliptic": "^6.5.7",
"ethers": "^6.13.3",
"hash.js": "^1.1.7",
Expand Down
71 changes: 68 additions & 3 deletions packages/publisher-client/src/command/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
MpcOptions, XAPIResponse, Signature, PublishChainConfig, RequestMade,
Aggregator,
} from "@ringdao/xapi-common";
import { Decimal } from 'decimal.js';
import { HelixChain, HelixChainConf } from "@helixbridge/helixconf";

import xapiAbi from "../abis/xapi.abi.json";
Expand All @@ -22,6 +23,8 @@ import { KeyPairString } from "near-api-js/lib/utils";

const homedir = require('os').homedir();

const MINIMUM_AGGREGATOR_NEAR = new Decimal(1);

export interface StartOptions {
nearAccount: string,
nearPrivateKey: KeyPairString,
Expand Down Expand Up @@ -94,6 +97,7 @@ export class PublisherStarter {
await setTimeout(60000);
continue;
}

for (const aggregator of allAggregators) {
for (const chainId of aggregator.supported_chains) {
const chain = HelixChain.get(chainId);
Expand All @@ -102,17 +106,25 @@ export class PublisherStarter {
logger.error(`Can't find chain: ${chainId}`, {
target: "start"
});
await setTimeout(1000);
continue;
}
const near = await this.near(options, chain);
const nearEthereum = this.getNearEthClient(chain);
const _lifecycle = { ...options, near, targetChain: chain, nearEthereum, aggregator: aggregator.id, cache: publisherCache };

logger.info("------------------------------------------------");
if (!await this.checkAggregatorBalance(aggregator.id, _lifecycle)) {
await setTimeout(1000);
break;
}

try {
logger.info("------------------------------------------------");
logger.info(`==== 📞 start config-syncer for ${aggregator.id} [${chain.name}-${chain.id.toString()}] ====`, {
target: "config-syncer",
});
await this.runConfigSyncer({ ...options, near, targetChain: chain, nearEthereum, aggregator: aggregator.id, cache: publisherCache });
await this.runConfigSyncer(_lifecycle);
await setTimeout(1000);
} catch (e: any) {
logger.error(`run config-syncer errored: ${e.stack || e}`, {
Expand All @@ -125,7 +137,7 @@ export class PublisherStarter {
logger.info(`==== 📦 start publisher for ${aggregator.id} [${chain.name}-${chain.id.toString()}] ====`, {
target: "publisher",
});
await this.runPublisher({ ...options, near, targetChain: chain, nearEthereum, aggregator: aggregator.id, cache: publisherCache });
await this.runPublisher(_lifecycle);
await setTimeout(1000);
} catch (e: any) {
logger.error(`run publisher errored: ${e.stack || e}`, {
Expand All @@ -151,11 +163,15 @@ export class PublisherStarter {
await this.nearGraphqlService.queryAggregatedeEvents({
endpoint: this._nearGraphqlEndpoint!,
ids: nonfulfilled.map((item) => item.requestId),
aggregator: lifecycle.aggregator
});
const toPublishIds = aggregatedEvents.map(a => a.request_id);
logger.info(`==> ${lifecycle.aggregator} [${targetChain.name}-${targetChain.id.toString()}] toPublishIds: [${toPublishIds.length}], ${toPublishIds}`, {
logger.info(`==> ${lifecycle.aggregator} [${targetChain.name}-${targetChain.id.toString()}] toPublishIds: [${toPublishIds.length}]`, {
target: "publisher",
});
logger.debug(`==> ${lifecycle.aggregator} [${targetChain.name}-${targetChain.id.toString()}] toPublishIds: ${toPublishIds}`, {
target: "publisher"
});
// 3. Check request status on xapi contract
for (const aggregated of aggregatedEvents) {
const relatedRequest = nonfulfilled.find(v => v.requestId == aggregated.request_id);
Expand Down Expand Up @@ -481,6 +497,55 @@ export class PublisherStarter {
return (await lifecycle.nearEthereum.deriveAddress(aggregator, `XAPI-${lifecycle.targetChain.id.toString()}`)).address;
}

async checkAggregatorBalance(aggregator: string, lifecycle: PublisherLifecycle): Promise<boolean> {
// @ts-ignore
const mpcConfig = await lifecycle.near.contractAggregator(aggregator).get_mpc_config();
logger.debug(`===> mpcConfig: ${JSON.stringify(mpcConfig)}`, {
target: "check-aggregator-balance",
});
if (!mpcConfig || !mpcConfig.mpc_contract) {
return false;
}

let signerDepositRequired = new Decimal(0);
try {
// @ts-ignore
const _signerDepositRequired = await lifecycle.near.contractSigner(mpcConfig.mpc_contract).experimental_signature_deposit();
logger.debug(`===> signerDepositRequired: ${_signerDepositRequired}`, {
target: "check-aggregator-balance",
});
if (_signerDepositRequired) {
signerDepositRequired = new Decimal(_signerDepositRequired).div(new Decimal('1000000000000000000000000'));
if (BigInt(_signerDepositRequired) > BigInt(mpcConfig.attached_balance)) {
logger.warn(`==== ❌ MpcConfig attached_balance of ${aggregator}: ${mpcConfig.attached_balance} < ${_signerDepositRequired}, break ====`, {
target: "check-aggregator-balance"
});
return false;
}
}
} catch (e) {
// @ts-ignore
logger.error(`==> Fetch signature_deposit error: ${e.message}`, {
target: "check-aggregator-balance"
});
}

const aggregatorAccount = new NearAccount(lifecycle.near.near.connection, aggregator);
const balance = await aggregatorAccount.getAccountBalance();
const availableBalance = new Decimal(balance.available).div(new Decimal('1000000000000000000000000'));
if (availableBalance.comparedTo(MINIMUM_AGGREGATOR_NEAR.add(new Decimal(signerDepositRequired))) < 0) {
logger.warn(`==== ❌ Avaliable Balance of ${aggregator}: ${availableBalance.toFixed(4)} NEAR < ${MINIMUM_AGGREGATOR_NEAR} NEAR + ${signerDepositRequired} NEAR, break ====`, {
target: "check-aggregator-balance"
});
return false;
} else {
logger.info(`==== ✅ Avaliable Balance of ${aggregator}: ${availableBalance.toFixed(4)} NEAR >= ${MINIMUM_AGGREGATOR_NEAR} NEAR + ${signerDepositRequired} NEAR ====`, {
target: "check-aggregator-balance"
});
return true;
}
}

getNearEthClient(chain: HelixChainConf): NearEthereum {
const cachedNearEthereum = this.nearEthereumMap[chain.id.toString()];
if (cachedNearEthereum) {
Expand Down
14 changes: 11 additions & 3 deletions packages/publisher-client/src/services/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ export interface QueryWithAggregator extends BasicGraphqlParams {
aggregator: string,
}

export interface QueryWithAggregatorIds extends BasicGraphqlParams {
aggregator: string,
ids: string[]
}

export interface QueryWithPublishChain extends QueryWithAggregator {
chainId: string
}
Expand Down Expand Up @@ -107,14 +112,16 @@ export class EvmGraphqlService extends AbstractGraphqlService {

@Service()
export class NearGraphqlService extends AbstractGraphqlService {
async queryAggregatedeEvents(params: QueryWithIds): Promise<XAPIResponse[]> {
async queryAggregatedeEvents(params: QueryWithAggregatorIds): Promise<XAPIResponse[]> {
const query = `
query QueryAggregatedEvents(
${params.ids ? "$ids: [String]" : ""}
$ids: [String]
$aggregator: String
) {
aggregatedEvents(
where: {
${params.ids ? "request_id_in: $ids" : ""}
request_id_in: $ids
aggregator: $aggregator
}
) {
valid_reporters
Expand All @@ -136,6 +143,7 @@ export class NearGraphqlService extends AbstractGraphqlService {
query,
variables: {
ids: params.ids,
aggregator: params.aggregator
},
});
return data["aggregatedEvents"];
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4244,6 +4244,11 @@ decamelize@^1.1.0:
resolved "https://registry.yarnpkg.com/decamelize/-/decamelize-1.2.0.tgz#f6534d15148269b20352e7bee26f501f9a191290"
integrity sha512-z2S+W9X73hAUUki+N+9Za2lBlun89zigOyGrsax+KUQ6wKW4ZoWpEYBkGhQjwAjjDCkWxhY0VKEhk8wzY7F5cA==

decimal.js@^10.4.3:
version "10.4.3"
resolved "https://registry.yarnpkg.com/decimal.js/-/decimal.js-10.4.3.tgz#1044092884d245d1b7f65725fa4ad4c6f781cc23"
integrity sha512-VBBaLc1MgL5XpzgIP7ny5Z6Nx3UrRkIViUkPUdtl9aya5amy3De1gsUUSB1g3+3sExYNjCAsAznmukyxCb1GRA==

[email protected], dedent@^1.0.0:
version "1.5.3"
resolved "https://registry.yarnpkg.com/dedent/-/dedent-1.5.3.tgz#99aee19eb9bae55a67327717b6e848d0bf777e5a"
Expand Down

0 comments on commit 439dc6a

Please sign in to comment.