diff --git a/package.json b/package.json index 0d15503d..a5e0ba29 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "ethers": "^6.12.1", "immer": "^10.1.1", "lodash": "^4.17.21", + "workerpool": "^9.1.1", "zod": "^3.23.8" }, "devDependencies": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 893be9dc..e0379714 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -32,6 +32,9 @@ dependencies: lodash: specifier: ^4.17.21 version: 4.17.21 + workerpool: + specifier: ^9.1.1 + version: 9.1.1 zod: specifier: ^3.23.8 version: 3.23.8 @@ -8146,6 +8149,10 @@ packages: resolution: {integrity: sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw==} dev: true + /workerpool@9.1.1: + resolution: {integrity: sha512-EFoFTSEo9m4V4wNrwzVRjxnf/E/oBpOzcI/R5CIugJhl9RsCiq525rszo4AtqcjQQoqFdu2E3H82AnbtpaQHvg==} + dev: false + /wrap-ansi@7.0.0: resolution: {integrity: sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==} engines: {node: '>=10'} diff --git a/src/data-fetcher-loop/data-fetcher-loop.test.ts b/src/data-fetcher-loop/data-fetcher-loop.test.ts index 844a3567..0de5b06a 100644 --- a/src/data-fetcher-loop/data-fetcher-loop.test.ts +++ b/src/data-fetcher-loop/data-fetcher-loop.test.ts @@ -1,22 +1,33 @@ import * as commonsModule from '@api3/commons'; import { initializeState } from '../../test/fixtures/mock-config'; +import { createMockSignedDataVerifier } from '../../test/utils'; +import { logger } from '../logger'; import { updateState } from '../state'; import * as dataFetcherLoopModule from './data-fetcher-loop'; import * as signedDataStateModule from './signed-data-state'; +import * as signedDataVerifierPoolModule from './signed-data-verifier-pool'; -describe('data fetcher', () => { +describe(dataFetcherLoopModule.runDataFetcher.name, () => { beforeEach(() => { initializeState(); updateState((draft) => { draft.signedApiUrls = { '31337': { hardhat: ['http://127.0.0.1:8090/0xC04575A2773Da9Cd23853A69694e02111b2c4182'] }, }; + draft.activeDataFeedBeaconIds = { + '31337': { + hardhat: [ + '0x91be0acf2d58a15c7cf687edabe4e255fdb27fbb77eba2a52f3bb3b46c99ec04', + '0xddc6ca9cc6f5768d9bfa8cc59f79bde8cf97a6521d0b95835255951ce06f19e6', + ], + }, + }; }); }); - it('retrieves signed data from urls', async () => { + it('saves signed data for active data feeds', async () => { const saveSignedDataSpy = jest.spyOn(signedDataStateModule, 'saveSignedData'); jest.spyOn(commonsModule, 'executeRequest').mockResolvedValue({ @@ -53,22 +64,35 @@ describe('data fetcher', () => { }, }, }); - jest.spyOn(dataFetcherLoopModule, 'callSignedApi'); jest.spyOn(signedDataStateModule, 'isSignedDataFresh').mockReturnValue(true); + jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier()); + jest.spyOn(logger, 'info'); - const dataFetcherPromise = dataFetcherLoopModule.runDataFetcher(); - await expect(dataFetcherPromise).resolves.toBeDefined(); + await expect(dataFetcherLoopModule.runDataFetcher()).resolves.toBeDefined(); expect(commonsModule.executeRequest).toHaveBeenCalledTimes(1); - expect(saveSignedDataSpy).toHaveBeenCalledTimes(3); + expect(saveSignedDataSpy).toHaveBeenCalledTimes(1); expect(dataFetcherLoopModule.callSignedApi).toHaveBeenNthCalledWith( 1, 'http://127.0.0.1:8090/0xC04575A2773Da9Cd23853A69694e02111b2c4182', 10_000 ); + expect(logger.info).toHaveBeenCalledTimes(3); + expect(logger.info).toHaveBeenNthCalledWith(1, 'Fetching signed data.', { urlCount: 1, staggerTimeMs: 10_000 }); + expect(logger.info).toHaveBeenNthCalledWith(2, 'Fetched signed data from Signed API.', expect.any(Object)); + expect(logger.info).toHaveBeenNthCalledWith( + 3, + 'Saved signed data from Signed API using a worker.', + expect.objectContaining({ + url: 'http://127.0.0.1:8090/0xC04575A2773Da9Cd23853A69694e02111b2c4182', + signedDataCount: 2, + }) + ); }); +}); +describe(dataFetcherLoopModule.callSignedApi.name, () => { it('handles parsing error from Signed API', async () => { jest.spyOn(commonsModule, 'executeRequest').mockResolvedValue({ success: true, @@ -85,7 +109,11 @@ describe('data fetcher', () => { }, }, }); + jest.spyOn(logger, 'warn'); await expect(dataFetcherLoopModule.callSignedApi('some-url', 10_000)).resolves.toBeNull(); + + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn).toHaveBeenCalledWith('Failed to parse signed API response.', { url: 'some-url' }); }); }); diff --git a/src/data-fetcher-loop/data-fetcher-loop.ts b/src/data-fetcher-loop/data-fetcher-loop.ts index 02074e6a..30cbb9a9 100644 --- a/src/data-fetcher-loop/data-fetcher-loop.ts +++ b/src/data-fetcher-loop/data-fetcher-loop.ts @@ -1,9 +1,9 @@ -import { executeRequest } from '@api3/commons'; +import { type Hex, executeRequest } from '@api3/commons'; import { uniq } from 'lodash'; import { logger } from '../logger'; import { getState } from '../state'; -import { signedApiResponseSchema, type SignedData } from '../types'; +import { type SignedDataRecord, signedApiResponseSchema, type SignedDataRecordEntry } from '../types'; import { sleep } from '../utils'; import { purgeOldSignedData, saveSignedData } from './signed-data-state'; @@ -27,7 +27,7 @@ export const startDataFetcherLoop = () => { * - Actual handler fn: * https://github.com/api3dao/signed-api/blob/b6e0d0700dd9e7547b37eaa65e98b50120220105/packages/api/src/handlers.ts#L81 */ -export const callSignedApi = async (url: string, timeout: number): Promise => { +export const callSignedApi = async (url: string, timeout: number): Promise => { const executionResult = await executeRequest({ method: 'get', timeout, @@ -52,7 +52,7 @@ export const callSignedApi = async (url: string, timeout: number): Promise { @@ -61,9 +61,18 @@ export const runDataFetcher = async () => { const { config: { signedDataFetchInterval }, signedApiUrls, + activeDataFeedBeaconIds, } = state; const signedDataFetchIntervalMs = signedDataFetchInterval * 1000; + // Compute all the unique active beacon IDs reported across all data providers. Only signed data for these beacons + // will be saved by Airseeker. + const activeBeaconIds = new Set( + Object.values(activeDataFeedBeaconIds) + .map((beaconIdsPerProvider) => Object.values(beaconIdsPerProvider)) + .flat(2) + ); + // Better to log the non-decomposed object to see which URL comes from which chain-provider group. logger.debug('Signed API URLs.', { signedApiUrls }); const urls = uniq( @@ -74,7 +83,7 @@ export const runDataFetcher = async () => { const urlCount = urls.length; const staggerTimeMs = signedDataFetchIntervalMs / urlCount; - logger.info('Fetching signed data', { urlCount, staggerTimeMs }); + logger.info('Fetching signed data.', { urlCount, staggerTimeMs }); const fetchResults = await Promise.all( urls.map(async (url, index) => { await sleep(staggerTimeMs * index); @@ -83,12 +92,20 @@ export const runDataFetcher = async () => { // NOTE: We allow each Signed API call to take full signedDataFetchIntervalMs. Because these calls are // staggered, it means that there can be pending requests from different data fetcher loops happening at the // same time. This does not matter much, because we only save the freshest signed data. - const signedDataApiResponse = await callSignedApi(url, signedDataFetchIntervalMs); - if (!signedDataApiResponse) return; + const signedDataBatch = await callSignedApi(url, signedDataFetchIntervalMs); + if (!signedDataBatch) return; logger.info('Fetched signed data from Signed API.', { url, duration: Date.now() - now }); - for (const signedData of signedDataApiResponse) saveSignedData(signedData); - logger.info('Saved all signed data from Signed API.', { url, duration: Date.now() - now }); + // Save only the signed data that is relevant to the active data feeds. + const signedDataForActiveBeacons = Object.entries(signedDataBatch).filter(([beaconId]) => + activeBeaconIds.has(beaconId as Hex) + ); + const signedDataCount = await saveSignedData(signedDataForActiveBeacons as SignedDataRecordEntry[]); + logger.info('Saved signed data from Signed API using a worker.', { + url, + duration: Date.now() - now, + signedDataCount, + }); }) ); diff --git a/src/data-fetcher-loop/signed-data-state.test.ts b/src/data-fetcher-loop/signed-data-state.test.ts index 202b34e6..6f14ff89 100644 --- a/src/data-fetcher-loop/signed-data-state.test.ts +++ b/src/data-fetcher-loop/signed-data-state.test.ts @@ -2,14 +2,16 @@ import { deriveBeaconId, type Hex } from '@api3/commons'; import { ethers } from 'ethers'; import { initializeState } from '../../test/fixtures/mock-config'; -import { allowPartial, generateRandomBytes, signData } from '../../test/utils'; +import { allowPartial, createMockSignedDataVerifier, generateRandomBytes, signData } from '../../test/utils'; +import { logger } from '../logger'; import { getState, updateState } from '../state'; import type { SignedData } from '../types'; import * as signedDataStateModule from './signed-data-state'; +import * as signedDataVerifierPoolModule from './signed-data-verifier-pool'; -describe('signed data state', () => { - let testDataPoint: SignedData; +describe(signedDataStateModule.saveSignedData.name, () => { + let validSignedData: SignedData; const signer = ethers.Wallet.fromPhrase('test test test test test test test test test test test junk'); beforeAll(async () => { @@ -19,60 +21,134 @@ describe('signed data state', () => { const airnode = signer.address as Hex; const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]); - testDataPoint = { + validSignedData = { airnode, encodedValue, - signature: await signData(signer, templateId, timestamp, encodedValue), templateId, timestamp, + signature: await signData(signer, templateId, timestamp, encodedValue), }; }); - it('stores and gets a data point', () => { + it('stores signed data', async () => { jest.spyOn(signedDataStateModule, 'isSignedDataFresh').mockReturnValue(true); - signedDataStateModule.saveSignedData(testDataPoint); - const dataFeedId = deriveBeaconId(testDataPoint.airnode, testDataPoint.templateId) as Hex; + jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier()); + const beaconId = deriveBeaconId(validSignedData.airnode, validSignedData.templateId) as Hex; - const datapoint = signedDataStateModule.getSignedData(dataFeedId); + await signedDataStateModule.saveSignedData([[beaconId, validSignedData]]); - expect(datapoint).toStrictEqual(testDataPoint); + const signedData = signedDataStateModule.getSignedData(beaconId); + expect(signedData).toStrictEqual(validSignedData); }); - it('checks that the timestamp on signed data is not in the future', async () => { + it('does not store signed data that is older than already stored one', async () => { + const beaconId = deriveBeaconId(validSignedData.airnode, validSignedData.templateId) as Hex; + const timestamp = String(Number(validSignedData.timestamp) + 10); // 10s newer. + const storedSignedData = { + ...validSignedData, + timestamp, + signature: await signData(signer, validSignedData.templateId, timestamp, validSignedData.encodedValue), + }; + updateState((draft) => { + draft.signedDatas[beaconId] = storedSignedData; + }); + jest.spyOn(logger, 'debug'); + + await signedDataStateModule.saveSignedData([[beaconId, validSignedData]]); + + expect(signedDataStateModule.getSignedData(beaconId)).toStrictEqual(storedSignedData); + expect(logger.debug).toHaveBeenCalledTimes(1); + expect(logger.debug).toHaveBeenCalledWith( + 'Skipping state update. The signed data value is not fresher than the stored value.' + ); + }); + + it('does not accept signed data that is too far in the future', async () => { const templateId = generateRandomBytes(32); const timestamp = Math.floor((Date.now() + 61 * 60 * 1000) / 1000).toString(); const airnode = signer.address as Hex; const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]); - const futureTestDataPoint = { + const futureSignedData = { airnode, encodedValue, signature: await signData(signer, templateId, timestamp, encodedValue), templateId, timestamp, }; + const beaconId = deriveBeaconId(airnode, templateId) as Hex; + jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier()); + jest.spyOn(logger, 'warn'); + jest.spyOn(logger, 'error'); + + await signedDataStateModule.saveSignedData([[beaconId, futureSignedData]]); + + expect(signedDataStateModule.getSignedData(beaconId)).toBeUndefined(); + expect(logger.error).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith( + 'Refusing to store sample as timestamp is more than one hour in the future.', + expect.any(Object) + ); + expect(logger.warn).toHaveBeenCalledTimes(0); + }); - expect(signedDataStateModule.verifySignedDataIntegrity(testDataPoint)).toBeTruthy(); - expect(signedDataStateModule.verifySignedDataIntegrity(futureTestDataPoint)).toBeFalsy(); + it('accepts signed data that is less then 1h in the future', async () => { + const templateId = generateRandomBytes(32); + const timestamp = Math.floor((Date.now() + 30 * 60 * 1000) / 1000).toString(); + const airnode = signer.address as Hex; + const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]); + const futureSignedData = { + airnode, + encodedValue, + signature: await signData(signer, templateId, timestamp, encodedValue), + templateId, + timestamp, + }; + const beaconId = deriveBeaconId(airnode, templateId) as Hex; + jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier()); + jest.spyOn(logger, 'warn'); + jest.spyOn(logger, 'error'); + + await signedDataStateModule.saveSignedData([[beaconId, futureSignedData]]); + + expect(signedDataStateModule.getSignedData(deriveBeaconId(airnode, templateId) as Hex)).toStrictEqual( + futureSignedData + ); + expect(logger.error).toHaveBeenCalledTimes(0); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn).toHaveBeenCalledWith( + 'Sample is in the future, but by less than an hour, therefore storing anyway.', + expect.any(Object) + ); }); - // eslint-disable-next-line jest/no-disabled-tests - it.skip('checks the signature on signed data', async () => { + it('checks the signature on signed data', async () => { const templateId = generateRandomBytes(32); - const timestamp = Math.floor((Date.now() + 60 * 60 * 1000) / 1000).toString(); + const timestamp = Math.floor((Date.now() - 0.5 * 1000) / 1000).toString(); const airnode = ethers.Wallet.createRandom().address as Hex; const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]); + jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier()); + jest.spyOn(logger, 'warn'); + jest.spyOn(logger, 'error'); - const badTestDataPoint = { + const badSignedData = { airnode, encodedValue, signature: await signData(signer, templateId, timestamp, encodedValue), templateId, timestamp, }; + const beaconId = deriveBeaconId(airnode, templateId) as Hex; - expect(signedDataStateModule.verifySignedDataIntegrity(badTestDataPoint)).toBeFalsy(); + await signedDataStateModule.saveSignedData([[beaconId, badSignedData]]); + + expect(signedDataStateModule.getSignedData(deriveBeaconId(airnode, templateId) as Hex)).toBeUndefined(); + expect(logger.error).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith('Failed to verify signed data.', badSignedData); + expect(logger.warn).toHaveBeenCalledTimes(0); }); +}); +describe(signedDataStateModule.purgeOldSignedData.name, () => { it('purges old data from the state', () => { const baseTime = 1_700_126_230_000; jest.useFakeTimers().setSystemTime(baseTime); diff --git a/src/data-fetcher-loop/signed-data-state.ts b/src/data-fetcher-loop/signed-data-state.ts index 0ee29a31..a1a434c6 100644 --- a/src/data-fetcher-loop/signed-data-state.ts +++ b/src/data-fetcher-loop/signed-data-state.ts @@ -1,100 +1,82 @@ -import { type Hex, deriveBeaconId } from '@api3/commons'; -import { goSync } from '@api3/promise-utils'; -import { ethers } from 'ethers'; +import type { Hex } from '@api3/commons'; import { logger } from '../logger'; import { getState, updateState } from '../state'; -import type { SignedData } from '../types'; +import type { SignedData, SignedDataRecordEntry } from '../types'; -export const verifySignedData = ({ airnode, templateId, timestamp, signature, encodedValue }: SignedData) => { - // Verification is wrapped in goSync, because ethers methods can potentially throw on invalid input. - const goVerify = goSync(() => { - const message = ethers.getBytes( - ethers.solidityPackedKeccak256(['bytes32', 'uint256', 'bytes'], [templateId, timestamp, encodedValue]) - ); +import { getVerifier } from './signed-data-verifier-pool'; - const signerAddr = ethers.verifyMessage(message, signature); - if (signerAddr !== airnode) throw new Error('Signer address does not match'); - }); +const verifyTimestamp = ([beaconId, signedData]: SignedDataRecordEntry) => { + const { airnode, templateId, timestamp } = signedData; - if (!goVerify.success) { - logger.error(`Signature verification failed.`, { - signature, - timestamp, - encodedValue, - }); + // Check that the signed data is fresher than the one stored in state. + const timestampMs = Number(timestamp) * 1000; + const storedValue = getState().signedDatas[beaconId]; + if (storedValue && Number(storedValue.timestamp) * 1000 >= timestampMs) { + logger.debug('Skipping state update. The signed data value is not fresher than the stored value.'); return false; } - return true; -}; - -const verifyTimestamp = (timestamp: number) => { - const timestampMs = timestamp * 1000; - - if (timestampMs > Date.now() + 60 * 60 * 1000) { + // Verify the timestamp of the signed data. + const nowMs = Date.now(); + if (timestampMs > nowMs + 60 * 60 * 1000) { logger.error(`Refusing to store sample as timestamp is more than one hour in the future.`, { - systemDateNow: new Date().toLocaleDateString(), - signedDataDate: new Date(timestampMs).toLocaleDateString(), + airnode, + templateId, + timestampMs, + nowMs, }); return false; } - - if (timestampMs > Date.now()) { + if (timestampMs > nowMs) { logger.warn(`Sample is in the future, but by less than an hour, therefore storing anyway.`, { - systemDateNow: new Date().toLocaleDateString(), - signedDataDate: new Date(timestampMs).toLocaleDateString(), + airnode, + templateId, + timestampMs, + nowMs, }); } return true; }; -export const verifySignedDataIntegrity = (signedData: SignedData) => { - // TODO: Temporarily disable signed data verification. - return verifyTimestamp(Number.parseInt(signedData.timestamp, 10)) /* && verifySignedData(signedData) */; -}; - -export const saveSignedData = (signedData: SignedData) => { - const { airnode, templateId, timestamp } = signedData; - - // Make sure we run the verification checks with enough context. - logger.runWithContext({ airnode, templateId }, () => { - if (!verifySignedDataIntegrity(signedData)) { - return; - } - - const state = getState(); - - const dataFeedId = deriveBeaconId(airnode, templateId) as Hex; - - const existingValue = state.signedDatas[dataFeedId]; - if (existingValue && existingValue.timestamp >= timestamp) { - logger.debug('Skipping state update. The signed data value is not fresher than the stored value.'); - return; +export const saveSignedData = async (signedDataBatch: SignedDataRecordEntry[]) => { + // Filter out signed data with invalid timestamps or we already have a fresher signed data stored in state. + signedDataBatch = signedDataBatch.filter((signedDataEntry) => verifyTimestamp(signedDataEntry)); + if (signedDataBatch.length === 0) return; + + const verifier = await getVerifier(); + // We are skipping the whole batch even if there is only one invalid signed data. This is consistent with the Signed + // API approach. + const verificationResult = await verifier.verifySignedData(signedDataBatch); + if (verificationResult !== true) { + logger.error('Failed to verify signed data.', verificationResult); + return; + } + updateState((draft) => { + for (const [beaconId, signedData] of signedDataBatch) { + draft.signedDatas[beaconId] = signedData; } - - updateState((draft) => { - draft.signedDatas[dataFeedId] = signedData; - }); }); + + return signedDataBatch.length; }; -export const getSignedData = (dataFeedId: Hex) => getState().signedDatas[dataFeedId]; +export const getSignedData = (beaconId: Hex) => getState().signedDatas[beaconId]; export const isSignedDataFresh = (signedData: SignedData) => BigInt(signedData.timestamp) > BigInt(Math.ceil(Date.now() / 1000 - 24 * 60 * 60)); export const purgeOldSignedData = () => { const state = getState(); - const oldSignedData = Object.values(state.signedDatas).filter((signedData) => isSignedDataFresh(signedData)); + const oldSignedData = Object.values(state.signedDatas).filter((signedData) => isSignedDataFresh(signedData!)); if (oldSignedData.length > 0) { logger.debug(`Purging some old signed data.`, { oldSignedData }); } updateState((draft) => { draft.signedDatas = Object.fromEntries( - Object.entries(draft.signedDatas).filter(([_dataFeedId, signedData]) => isSignedDataFresh(signedData)) + Object.entries(draft.signedDatas).filter(([_beaconId, signedData]) => isSignedDataFresh(signedData!)) ); }); }; diff --git a/src/data-fetcher-loop/signed-data-verifier-pool.ts b/src/data-fetcher-loop/signed-data-verifier-pool.ts new file mode 100644 index 00000000..39abf8c0 --- /dev/null +++ b/src/data-fetcher-loop/signed-data-verifier-pool.ts @@ -0,0 +1,43 @@ +import workerpool, { type Pool } from 'workerpool'; + +// Create a worker pool using an external worker script. +let pool: Pool | undefined; + +export const initializeVerifierPool = () => { + // If the pool is already initialized, no need to re-initialize it. + if (pool) return pool; + + // Allow using the worker from TS (run in development mode) or JS files (when compiled). Note, that transpiling the + // file in development mode is done by ts-node and so it must be available. + const extension = __filename.endsWith('.ts') ? 'ts' : 'js'; + // By default the max workers is the number of CPU cores minus one. This is dangerous when the Signed API is deployed + // on a single core machine (possible on low tier Cloud). We set the min number of workers to 1 to avoid this issue. + // This will also correctly set the maximum number of workers. See: + // https://github.com/josdejong/workerpool/blob/a1d85d5e49ca7632a43251d703e69f1c3ba4107b/src/Pool.js#L76 + // + // As a note, on AWS the min number of workers is set to 1 even with the defaults (even with 256 CPU). + const baseOptions = { + workerType: 'thread', + minWorkers: 1, + } as const; + // Allow using the worker as a TypeScript module. See: + // https://github.com/josdejong/workerpool/issues/379#issuecomment-1580093502. + const options = + extension === 'ts' + ? { + ...baseOptions, + workerThreadOpts: { + execArgv: ['--require', 'ts-node/register'], + }, + } + : baseOptions; + pool = workerpool.pool(`${__dirname}/signed-data-verifier.${extension}`, options); + + return pool; +}; + +export const getVerifier = async () => { + if (!pool) throw new Error('Worker pool has not been initialized'); + + return pool.proxy(); +}; diff --git a/src/data-fetcher-loop/signed-data-verifier-worker.ts b/src/data-fetcher-loop/signed-data-verifier-worker.ts new file mode 100644 index 00000000..72e10a95 --- /dev/null +++ b/src/data-fetcher-loop/signed-data-verifier-worker.ts @@ -0,0 +1,8 @@ +import workerpool from 'workerpool'; + +import { verifySignedData } from './signed-data-verifier'; + +// Create a worker from this module and register public functions. +workerpool.worker({ + verifySignedData, +}); diff --git a/src/data-fetcher-loop/signed-data-verifier.ts b/src/data-fetcher-loop/signed-data-verifier.ts new file mode 100644 index 00000000..37c04e99 --- /dev/null +++ b/src/data-fetcher-loop/signed-data-verifier.ts @@ -0,0 +1,35 @@ +import { deriveBeaconId } from '@api3/commons'; +import { goSync } from '@api3/promise-utils'; +import { ethers } from 'ethers'; + +import type { SignedData, SignedDataRecordEntry } from '../types'; + +// The function is supposed to be run from inside a worker thread. It validates a batch of signed data and a whole batch +// is rejected if there is even a single invalid signed data. +// +// If the verification is successful, the function returns `true`. Otherwise, returns the signed data that caused the +// validation to fail. +export const verifySignedData = (signedDataBatch: SignedDataRecordEntry[]): SignedData | true => { + for (const [beaconId, signedData] of signedDataBatch) { + const { airnode, templateId, timestamp, encodedValue, signature } = signedData; + + // Verification is wrapped in goSync, because ethers methods can potentially throw on invalid input. + const goVerifySignature = goSync(() => { + const message = ethers.getBytes( + ethers.solidityPackedKeccak256(['bytes32', 'uint256', 'bytes'], [templateId, timestamp, encodedValue]) + ); + + const signerAddress = ethers.verifyMessage(message, signature); + if (signerAddress !== airnode) throw new Error('Signer address does not match'); + }); + if (!goVerifySignature.success) return signedData; + + const goVerifyBeaconId = goSync(() => { + const derivedBeaconId = deriveBeaconId(airnode, templateId); + if (derivedBeaconId !== beaconId) throw new Error('Beacon ID does not match'); + }); + if (!goVerifyBeaconId.success) return signedData; + } + + return true; +}; diff --git a/src/index.ts b/src/index.ts index 89235109..cf2aadf1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,6 @@ import { loadConfig } from './config'; import { startDataFetcherLoop } from './data-fetcher-loop'; +import { initializeVerifierPool } from './data-fetcher-loop/signed-data-verifier-pool'; import { loadEnv } from './env/env'; import { startHeartbeatLoop } from './heartbeat-loop'; import { logger } from './logger'; @@ -10,6 +11,7 @@ function main() { logger.info('Loading configuration and setting initial state.'); const config = loadConfig(); setInitialState(config); + initializeVerifierPool(); logger.info('Starting Airseeker loops.'); startDataFetcherLoop(); diff --git a/src/state/state.test.ts b/src/state/state.test.ts index 45b20cca..70d0c753 100644 --- a/src/state/state.test.ts +++ b/src/state/state.test.ts @@ -38,6 +38,7 @@ const stateMock: State = { signedApiUrls: { '31337': { hardhat: ['http://127.0.0.1:8090/0xC04575A2773Da9Cd23853A69694e02111b2c4182'] } }, derivedSponsorWallets: {}, deploymentTimestamp: '1687850583', + activeDataFeedBeaconIds: { '31337': { hardhat: [] } }, }; beforeAll(() => { diff --git a/src/state/state.ts b/src/state/state.ts index 0ea33300..09bc84e2 100644 --- a/src/state/state.ts +++ b/src/state/state.ts @@ -2,7 +2,7 @@ import type { Address, ChainId, Hex } from '@api3/commons'; import { produce, type Draft } from 'immer'; import type { Config } from '../config/schema'; -import type { SignedData } from '../types'; +import type { SignedDataRecord } from '../types'; interface GasPriceInfo { price: bigint; @@ -26,10 +26,11 @@ export interface State { Record> >; derivedSponsorWallets: Record; - signedDatas: Record; + signedDatas: SignedDataRecord; signedApiUrls: Record>; // The timestamp of when the service was initialized. This can be treated as a "deployment" timestamp. deploymentTimestamp: string; + activeDataFeedBeaconIds: Record>; } let state: State | undefined; @@ -51,6 +52,7 @@ export const setInitialState = (config: Config) => { signedApiUrls: {}, derivedSponsorWallets: {}, deploymentTimestamp: Math.floor(Date.now() / 1000).toString(), + activeDataFeedBeaconIds: {}, }; }; diff --git a/src/types.ts b/src/types.ts index 1f95f148..5e9540e5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,4 @@ -import { addressSchema, hexSchema, keccak256HashSchema } from '@api3/commons'; +import { type Hex, addressSchema, hexSchema, keccak256HashSchema } from '@api3/commons'; import { z } from 'zod'; // Taken from https://github.com/api3dao/signed-api/blob/main/packages/api/src/schema.ts @@ -12,7 +12,13 @@ export const signedDataSchema = z.object({ export type SignedData = z.infer; +const signedDataRecord = z.record(keccak256HashSchema, signedDataSchema); + +export type SignedDataRecord = z.infer; + export const signedApiResponseSchema = z.object({ count: z.number().positive(), - data: z.record(signedDataSchema), + data: signedDataRecord, }); + +export type SignedDataRecordEntry = [Hex /* Beacon ID */, SignedData]; diff --git a/src/update-feeds-loops/update-feeds-loops.ts b/src/update-feeds-loops/update-feeds-loops.ts index b3120092..24b4e22d 100644 --- a/src/update-feeds-loops/update-feeds-loops.ts +++ b/src/update-feeds-loops/update-feeds-loops.ts @@ -251,12 +251,18 @@ export const runUpdateFeeds = async (providerName: string, chain: Chain, chainId activeDataFeedCount, }); - // Update the state with the signed API URLs. + // Merge the signed API URLs and active beacons from all the batches. const signedApiUrls = uniq( processedBatches.reduce((acc, batch) => (batch ? [...acc, ...batch.signedApiUrls] : acc), []) ); - // Overwrite the state with the new signed API URLs instead of merging them to avoid stale URLs. - updateState((draft) => set(draft, ['signedApiUrls', chainId, providerName], signedApiUrls)); + const beaconIds = uniq( + processedBatches.reduce((acc, batch) => (batch ? [...acc, ...batch.beaconIds] : acc), []) + ); + // Overwrite the state with the new signed API URLs instead of merging them to avoid keeping stale URLs. + updateState((draft) => { + set(draft, ['signedApiUrls', chainId, providerName], signedApiUrls); + set(draft, ['activeDataFeedBeaconIds', chainId, providerName], beaconIds); + }); }); if (!goRunUpdateFeeds.success) { @@ -300,7 +306,7 @@ export const processBatch = async ( ); const successCount = updatedFeeds.filter(Boolean).length; - // Generate signed API URLs for the batch + // Generate signed API URLs for the batch. const signedApiUrls = batch .map((dataFeed) => dataFeed.beaconsWithData.map((beacon, index) => { @@ -316,5 +322,8 @@ export const processBatch = async ( }) ) .flat(2); - return { signedApiUrls, successCount, errorCount: size(feedsToUpdate) - successCount }; + // Get the beacon IDs for the active data feeds. + const beaconIds = batch.flatMap((dataFeed) => dataFeed.beaconsWithData.map((beacon) => beacon.beaconId)); + + return { signedApiUrls, beaconIds, successCount, errorCount: size(feedsToUpdate) - successCount }; }; diff --git a/test/utils.ts b/test/utils.ts index 2182dc8b..bbc4ee06 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -3,7 +3,9 @@ import { randomBytes } from 'node:crypto'; import type { Address, Hex } from '@api3/commons'; import { ethers, type HDNodeWallet } from 'ethers'; -import type { SignedData } from '../src/types'; +import { verifySignedData } from '../src/data-fetcher-loop/signed-data-verifier'; +import type { getVerifier } from '../src/data-fetcher-loop/signed-data-verifier-pool'; +import type { SignedData, SignedDataRecordEntry } from '../src/types'; import type { Beacon } from '../src/update-feeds-loops/contracts'; export const signData = async (signer: ethers.Signer, templateId: string, timestamp: string, data: string) => @@ -51,3 +53,10 @@ export const generateSignedData = async ( signature, }; }; + +export const createMockSignedDataVerifier = () => { + return { + // eslint-disable-next-line @typescript-eslint/require-await + verifySignedData: async (signedDataBatch: SignedDataRecordEntry[]) => verifySignedData(signedDataBatch), + } as unknown as Awaited>; +};