Skip to content

Commit

Permalink
Verify signed data using worker thread (#291)
Browse files Browse the repository at this point in the history
* Rename

* Verify signed data in a worker

* Filter out signed data for non-active beacons

* Improve current tests

* Self review

* Validate beacon IDs as well

* Make sure to save the fresher value

* Self review

* Rename dataFeedId to beaconId

* Fix review comments
  • Loading branch information
Siegrift authored May 13, 2024
1 parent 5d474c7 commit 9327be4
Show file tree
Hide file tree
Showing 15 changed files with 331 additions and 105 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"ethers": "^6.12.1",
"immer": "^10.1.1",
"lodash": "^4.17.21",
"workerpool": "^9.1.1",
"zod": "^3.23.8"
},
"devDependencies": {
Expand Down
7 changes: 7 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 34 additions & 6 deletions src/data-fetcher-loop/data-fetcher-loop.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
import * as commonsModule from '@api3/commons';

import { initializeState } from '../../test/fixtures/mock-config';
import { createMockSignedDataVerifier } from '../../test/utils';
import { logger } from '../logger';
import { updateState } from '../state';

import * as dataFetcherLoopModule from './data-fetcher-loop';
import * as signedDataStateModule from './signed-data-state';
import * as signedDataVerifierPoolModule from './signed-data-verifier-pool';

describe('data fetcher', () => {
describe(dataFetcherLoopModule.runDataFetcher.name, () => {
beforeEach(() => {
initializeState();
updateState((draft) => {
draft.signedApiUrls = {
'31337': { hardhat: ['http://127.0.0.1:8090/0xC04575A2773Da9Cd23853A69694e02111b2c4182'] },
};
draft.activeDataFeedBeaconIds = {
'31337': {
hardhat: [
'0x91be0acf2d58a15c7cf687edabe4e255fdb27fbb77eba2a52f3bb3b46c99ec04',
'0xddc6ca9cc6f5768d9bfa8cc59f79bde8cf97a6521d0b95835255951ce06f19e6',
],
},
};
});
});

it('retrieves signed data from urls', async () => {
it('saves signed data for active data feeds', async () => {
const saveSignedDataSpy = jest.spyOn(signedDataStateModule, 'saveSignedData');

jest.spyOn(commonsModule, 'executeRequest').mockResolvedValue({
Expand Down Expand Up @@ -53,22 +64,35 @@ describe('data fetcher', () => {
},
},
});

jest.spyOn(dataFetcherLoopModule, 'callSignedApi');
jest.spyOn(signedDataStateModule, 'isSignedDataFresh').mockReturnValue(true);
jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier());
jest.spyOn(logger, 'info');

const dataFetcherPromise = dataFetcherLoopModule.runDataFetcher();
await expect(dataFetcherPromise).resolves.toBeDefined();
await expect(dataFetcherLoopModule.runDataFetcher()).resolves.toBeDefined();

expect(commonsModule.executeRequest).toHaveBeenCalledTimes(1);
expect(saveSignedDataSpy).toHaveBeenCalledTimes(3);
expect(saveSignedDataSpy).toHaveBeenCalledTimes(1);
expect(dataFetcherLoopModule.callSignedApi).toHaveBeenNthCalledWith(
1,
'http://127.0.0.1:8090/0xC04575A2773Da9Cd23853A69694e02111b2c4182',
10_000
);
expect(logger.info).toHaveBeenCalledTimes(3);
expect(logger.info).toHaveBeenNthCalledWith(1, 'Fetching signed data.', { urlCount: 1, staggerTimeMs: 10_000 });
expect(logger.info).toHaveBeenNthCalledWith(2, 'Fetched signed data from Signed API.', expect.any(Object));
expect(logger.info).toHaveBeenNthCalledWith(
3,
'Saved signed data from Signed API using a worker.',
expect.objectContaining({
url: 'http://127.0.0.1:8090/0xC04575A2773Da9Cd23853A69694e02111b2c4182',
signedDataCount: 2,
})
);
});
});

describe(dataFetcherLoopModule.callSignedApi.name, () => {
it('handles parsing error from Signed API', async () => {
jest.spyOn(commonsModule, 'executeRequest').mockResolvedValue({
success: true,
Expand All @@ -85,7 +109,11 @@ describe('data fetcher', () => {
},
},
});
jest.spyOn(logger, 'warn');

await expect(dataFetcherLoopModule.callSignedApi('some-url', 10_000)).resolves.toBeNull();

expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith('Failed to parse signed API response.', { url: 'some-url' });
});
});
35 changes: 26 additions & 9 deletions src/data-fetcher-loop/data-fetcher-loop.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { executeRequest } from '@api3/commons';
import { type Hex, executeRequest } from '@api3/commons';
import { uniq } from 'lodash';

import { logger } from '../logger';
import { getState } from '../state';
import { signedApiResponseSchema, type SignedData } from '../types';
import { type SignedDataRecord, signedApiResponseSchema, type SignedDataRecordEntry } from '../types';
import { sleep } from '../utils';

import { purgeOldSignedData, saveSignedData } from './signed-data-state';
Expand All @@ -27,7 +27,7 @@ export const startDataFetcherLoop = () => {
* - Actual handler fn:
* https://github.com/api3dao/signed-api/blob/b6e0d0700dd9e7547b37eaa65e98b50120220105/packages/api/src/handlers.ts#L81
*/
export const callSignedApi = async (url: string, timeout: number): Promise<SignedData[] | null> => {
export const callSignedApi = async (url: string, timeout: number): Promise<SignedDataRecord | null> => {
const executionResult = await executeRequest({
method: 'get',
timeout,
Expand All @@ -52,7 +52,7 @@ export const callSignedApi = async (url: string, timeout: number): Promise<Signe
return null;
}

return Object.values(parseResult.data.data);
return parseResult.data.data;
};

export const runDataFetcher = async () => {
Expand All @@ -61,9 +61,18 @@ export const runDataFetcher = async () => {
const {
config: { signedDataFetchInterval },
signedApiUrls,
activeDataFeedBeaconIds,
} = state;
const signedDataFetchIntervalMs = signedDataFetchInterval * 1000;

// Compute all the unique active beacon IDs reported across all data providers. Only signed data for these beacons
// will be saved by Airseeker.
const activeBeaconIds = new Set(
Object.values(activeDataFeedBeaconIds)
.map((beaconIdsPerProvider) => Object.values(beaconIdsPerProvider))
.flat(2)
);

// Better to log the non-decomposed object to see which URL comes from which chain-provider group.
logger.debug('Signed API URLs.', { signedApiUrls });
const urls = uniq(
Expand All @@ -74,7 +83,7 @@ export const runDataFetcher = async () => {

const urlCount = urls.length;
const staggerTimeMs = signedDataFetchIntervalMs / urlCount;
logger.info('Fetching signed data', { urlCount, staggerTimeMs });
logger.info('Fetching signed data.', { urlCount, staggerTimeMs });
const fetchResults = await Promise.all(
urls.map(async (url, index) => {
await sleep(staggerTimeMs * index);
Expand All @@ -83,12 +92,20 @@ export const runDataFetcher = async () => {
// NOTE: We allow each Signed API call to take full signedDataFetchIntervalMs. Because these calls are
// staggered, it means that there can be pending requests from different data fetcher loops happening at the
// same time. This does not matter much, because we only save the freshest signed data.
const signedDataApiResponse = await callSignedApi(url, signedDataFetchIntervalMs);
if (!signedDataApiResponse) return;
const signedDataBatch = await callSignedApi(url, signedDataFetchIntervalMs);
if (!signedDataBatch) return;
logger.info('Fetched signed data from Signed API.', { url, duration: Date.now() - now });

for (const signedData of signedDataApiResponse) saveSignedData(signedData);
logger.info('Saved all signed data from Signed API.', { url, duration: Date.now() - now });
// Save only the signed data that is relevant to the active data feeds.
const signedDataForActiveBeacons = Object.entries(signedDataBatch).filter(([beaconId]) =>
activeBeaconIds.has(beaconId as Hex)
);
const signedDataCount = await saveSignedData(signedDataForActiveBeacons as SignedDataRecordEntry[]);
logger.info('Saved signed data from Signed API using a worker.', {
url,
duration: Date.now() - now,
signedDataCount,
});
})
);

Expand Down
114 changes: 95 additions & 19 deletions src/data-fetcher-loop/signed-data-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ import { deriveBeaconId, type Hex } from '@api3/commons';
import { ethers } from 'ethers';

import { initializeState } from '../../test/fixtures/mock-config';
import { allowPartial, generateRandomBytes, signData } from '../../test/utils';
import { allowPartial, createMockSignedDataVerifier, generateRandomBytes, signData } from '../../test/utils';
import { logger } from '../logger';
import { getState, updateState } from '../state';
import type { SignedData } from '../types';

import * as signedDataStateModule from './signed-data-state';
import * as signedDataVerifierPoolModule from './signed-data-verifier-pool';

describe('signed data state', () => {
let testDataPoint: SignedData;
describe(signedDataStateModule.saveSignedData.name, () => {
let validSignedData: SignedData;
const signer = ethers.Wallet.fromPhrase('test test test test test test test test test test test junk');

beforeAll(async () => {
Expand All @@ -19,60 +21,134 @@ describe('signed data state', () => {
const airnode = signer.address as Hex;
const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]);

testDataPoint = {
validSignedData = {
airnode,
encodedValue,
signature: await signData(signer, templateId, timestamp, encodedValue),
templateId,
timestamp,
signature: await signData(signer, templateId, timestamp, encodedValue),
};
});

it('stores and gets a data point', () => {
it('stores signed data', async () => {
jest.spyOn(signedDataStateModule, 'isSignedDataFresh').mockReturnValue(true);
signedDataStateModule.saveSignedData(testDataPoint);
const dataFeedId = deriveBeaconId(testDataPoint.airnode, testDataPoint.templateId) as Hex;
jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier());
const beaconId = deriveBeaconId(validSignedData.airnode, validSignedData.templateId) as Hex;

const datapoint = signedDataStateModule.getSignedData(dataFeedId);
await signedDataStateModule.saveSignedData([[beaconId, validSignedData]]);

expect(datapoint).toStrictEqual(testDataPoint);
const signedData = signedDataStateModule.getSignedData(beaconId);
expect(signedData).toStrictEqual(validSignedData);
});

it('checks that the timestamp on signed data is not in the future', async () => {
it('does not store signed data that is older than already stored one', async () => {
const beaconId = deriveBeaconId(validSignedData.airnode, validSignedData.templateId) as Hex;
const timestamp = String(Number(validSignedData.timestamp) + 10); // 10s newer.
const storedSignedData = {
...validSignedData,
timestamp,
signature: await signData(signer, validSignedData.templateId, timestamp, validSignedData.encodedValue),
};
updateState((draft) => {
draft.signedDatas[beaconId] = storedSignedData;
});
jest.spyOn(logger, 'debug');

await signedDataStateModule.saveSignedData([[beaconId, validSignedData]]);

expect(signedDataStateModule.getSignedData(beaconId)).toStrictEqual(storedSignedData);
expect(logger.debug).toHaveBeenCalledTimes(1);
expect(logger.debug).toHaveBeenCalledWith(
'Skipping state update. The signed data value is not fresher than the stored value.'
);
});

it('does not accept signed data that is too far in the future', async () => {
const templateId = generateRandomBytes(32);
const timestamp = Math.floor((Date.now() + 61 * 60 * 1000) / 1000).toString();
const airnode = signer.address as Hex;
const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]);
const futureTestDataPoint = {
const futureSignedData = {
airnode,
encodedValue,
signature: await signData(signer, templateId, timestamp, encodedValue),
templateId,
timestamp,
};
const beaconId = deriveBeaconId(airnode, templateId) as Hex;
jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier());
jest.spyOn(logger, 'warn');
jest.spyOn(logger, 'error');

await signedDataStateModule.saveSignedData([[beaconId, futureSignedData]]);

expect(signedDataStateModule.getSignedData(beaconId)).toBeUndefined();
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
'Refusing to store sample as timestamp is more than one hour in the future.',
expect.any(Object)
);
expect(logger.warn).toHaveBeenCalledTimes(0);
});

expect(signedDataStateModule.verifySignedDataIntegrity(testDataPoint)).toBeTruthy();
expect(signedDataStateModule.verifySignedDataIntegrity(futureTestDataPoint)).toBeFalsy();
it('accepts signed data that is less then 1h in the future', async () => {
const templateId = generateRandomBytes(32);
const timestamp = Math.floor((Date.now() + 30 * 60 * 1000) / 1000).toString();
const airnode = signer.address as Hex;
const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]);
const futureSignedData = {
airnode,
encodedValue,
signature: await signData(signer, templateId, timestamp, encodedValue),
templateId,
timestamp,
};
const beaconId = deriveBeaconId(airnode, templateId) as Hex;
jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier());
jest.spyOn(logger, 'warn');
jest.spyOn(logger, 'error');

await signedDataStateModule.saveSignedData([[beaconId, futureSignedData]]);

expect(signedDataStateModule.getSignedData(deriveBeaconId(airnode, templateId) as Hex)).toStrictEqual(
futureSignedData
);
expect(logger.error).toHaveBeenCalledTimes(0);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
'Sample is in the future, but by less than an hour, therefore storing anyway.',
expect.any(Object)
);
});

// eslint-disable-next-line jest/no-disabled-tests
it.skip('checks the signature on signed data', async () => {
it('checks the signature on signed data', async () => {
const templateId = generateRandomBytes(32);
const timestamp = Math.floor((Date.now() + 60 * 60 * 1000) / 1000).toString();
const timestamp = Math.floor((Date.now() - 0.5 * 1000) / 1000).toString();
const airnode = ethers.Wallet.createRandom().address as Hex;
const encodedValue = ethers.AbiCoder.defaultAbiCoder().encode(['int256'], [1n]);
jest.spyOn(signedDataVerifierPoolModule, 'getVerifier').mockResolvedValue(createMockSignedDataVerifier());
jest.spyOn(logger, 'warn');
jest.spyOn(logger, 'error');

const badTestDataPoint = {
const badSignedData = {
airnode,
encodedValue,
signature: await signData(signer, templateId, timestamp, encodedValue),
templateId,
timestamp,
};
const beaconId = deriveBeaconId(airnode, templateId) as Hex;

expect(signedDataStateModule.verifySignedDataIntegrity(badTestDataPoint)).toBeFalsy();
await signedDataStateModule.saveSignedData([[beaconId, badSignedData]]);

expect(signedDataStateModule.getSignedData(deriveBeaconId(airnode, templateId) as Hex)).toBeUndefined();
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith('Failed to verify signed data.', badSignedData);
expect(logger.warn).toHaveBeenCalledTimes(0);
});
});

describe(signedDataStateModule.purgeOldSignedData.name, () => {
it('purges old data from the state', () => {
const baseTime = 1_700_126_230_000;
jest.useFakeTimers().setSystemTime(baseTime);
Expand Down
Loading

0 comments on commit 9327be4

Please sign in to comment.