diff --git a/config/default.json b/config/default.json index 73b8a1b2c..a052671d0 100644 --- a/config/default.json +++ b/config/default.json @@ -14,6 +14,7 @@ "requireAuth": false, "schedulerIntervalMS": 300000, "schedulerStopAfterNoOp": false, + "pubsubResponderWindowMs": 8035200000, "carStorage": { "mode": "inmemory", "s3BucketName": "myS3Bucket", diff --git a/config/env/dev.json b/config/env/dev.json index 473e87c0f..a20965638 100644 --- a/config/env/dev.json +++ b/config/env/dev.json @@ -14,6 +14,7 @@ "requireAuth": "@@REQUIRE_AUTH", "schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS", "schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP", + "pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS", "carStorage": { "mode": "@@MERKLE_CAR_STORAGE_MODE", "s3BucketName": "@@S3_BUCKET_NAME", diff --git a/config/env/prod.json b/config/env/prod.json index 473e87c0f..a20965638 100644 --- a/config/env/prod.json +++ b/config/env/prod.json @@ -14,6 +14,7 @@ "requireAuth": "@@REQUIRE_AUTH", "schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS", "schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP", + "pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS", "carStorage": { "mode": "@@MERKLE_CAR_STORAGE_MODE", "s3BucketName": "@@S3_BUCKET_NAME", diff --git a/src/repositories/__tests__/request-repository.test.ts b/src/repositories/__tests__/request-repository.test.ts index 5f30235c3..f2076eef3 100644 --- a/src/repositories/__tests__/request-repository.test.ts +++ b/src/repositories/__tests__/request-repository.test.ts @@ -1038,6 +1038,28 @@ describe('request repository test', () => { const received = await requestRepository.findCompletedForStream(myStreamId) expect(expectedRequest.map(({ id }) => id)).toEqual(received.map(({ id }) => id)) }) + + test('If the completed request for a given stream is too old, return an empty array', async () => { + const myStreamId = randomStreamID().toString() + const after = new Date(Date.now() - 1000 * 60 * 60) + const requests = generateRequests( + { + streamId: myStreamId, + status: RequestStatus.COMPLETED, + updatedAt: new Date(after.getTime() - 1), + }, + 1 + ) + + await requestRepository.createRequests(requests) + + const createdRequests = await requestRepository.allRequests() + expect(requests.length).toEqual(createdRequests.length) + + const received = await requestRepository.findCompletedForStream(myStreamId, 1, after) + expect(received.length).toEqual(0) + }) + test('If there is no completed request for a given stream, return an empty array', async () => { const myStreamId = randomStreamID().toString() const requests = [ diff --git a/src/repositories/request-repository.ts b/src/repositories/request-repository.ts index 8471fd740..5a48cc89e 100644 --- a/src/repositories/request-repository.ts +++ b/src/repositories/request-repository.ts @@ -609,13 +609,23 @@ export class RequestRepository { return returned.map((r) => new Request(r)) } - async findCompletedForStream(streamId: string | StreamID, limit = 1): Promise> { - const found = await this.table + async findCompletedForStream( + streamId: string | StreamID, + limit = 1, + after?: Date + ): Promise> { + const query = this.table .where({ streamId: streamId.toString() }) - .where({ status: RequestStatus.COMPLETED }) + .andWhere({ status: RequestStatus.COMPLETED }) .orderBy('updatedAt', 'desc') .limit(limit) + if (after) { + query.andWhere('updatedAt', '>=', date.encode(after)) + } + + const found = await query + return found.map((r) => new Request(r)) } } diff --git a/src/services/__tests__/ipfs-service.test.ts b/src/services/__tests__/ipfs-service.test.ts index 636d22300..be5ff3748 100644 --- a/src/services/__tests__/ipfs-service.test.ts +++ b/src/services/__tests__/ipfs-service.test.ts @@ -307,6 +307,53 @@ describe('pubsub', () => { expect(receivedTip).toEqual(anchorCid.toString()) }) + test('Will not respond to query message about stream with an anchor if it is too old', async () => { + const pubsubMessage = { typ: 1, id: '1', stream: randomStreamID() } + + const ipfsQueueService = injector.resolve('ipfsQueueService') + const queueSendMessageSpy = jest.spyOn(ipfsQueueService, 'sendMessage') + const handleMessageSpy = jest.spyOn(service, 'handleMessage') + const pubsubSubscribeSpy = jest.spyOn(mockIpfsClient.pubsub, 'subscribe') + pubsubSubscribeSpy.mockImplementationOnce( + (topic: string, onMessage: (message: Message) => void) => { + expect(topic).toEqual('/faux') + onMessage(asIpfsMessage(serialize(pubsubMessage))) + return Promise.resolve() + } + ) + // @ts-ignore + const beforeWindow = new Date(Date.now() - service.pubsubResponderWindowMs - 1000) + const requestRepository = injector.resolve('requestRepository') + const createdRequest = await requestRepository.createOrUpdate( + generateRequest({ + streamId: pubsubMessage.stream.toString(), + status: RequestStatus.COMPLETED, + // @ts-ignore + createdAt: beforeWindow, + updatedAt: beforeWindow, + }) + ) + const anchorRepository = injector.resolve('anchorRepository') + const anchorCid = randomCID() + await anchorRepository.createAnchors([ + { + requestId: createdRequest.id, + proofCid: randomCID(), + path: '0', + cid: anchorCid, + }, + ]) + + // @ts-ignore + service.respondToPubsubQueries = true + await service.init() + + await Utils.delay(1000) + expect(handleMessageSpy).toBeCalledTimes(1) + expect(handleMessageSpy).toBeCalledWith(pubsubMessage) + expect(queueSendMessageSpy).toBeCalledTimes(0) + }) + test('Will ignore non pusub messages', async () => { const pubsubMessage = { typ: 1, id: '1', stream: randomStreamID() } diff --git a/src/services/ipfs-service.ts b/src/services/ipfs-service.ts index 95d71d27f..b75341466 100644 --- a/src/services/ipfs-service.ts +++ b/src/services/ipfs-service.ts @@ -36,6 +36,7 @@ const MAX_CACHE_ENTRIES = 100 export const IPFS_PUT_TIMEOUT = 30 * 1000 // 30 seconds const PUBSUB_DELAY = 100 const DEFAULT_CONCURRENT_GET_LIMIT = 100 +export const DEFAULT_PUBSUB_RESPONDER_WINDOW = 1_000 * 60 * 60 * 24 * 31 * 3 // 3 months function buildHttpAgent(endpoint: string | undefined): HttpAgent { const agentOptions = { @@ -67,6 +68,7 @@ export class IpfsService implements IIpfsService { private pubsub$?: Subscription private readonly respondToPubsubQueries: boolean private readonly resubscribeAfterErrorDelay: number + private readonly pubsubResponderWindowMs: number static inject = ['config', 'ipfsQueueService', 'requestRepository', 'anchorRepository'] as const @@ -86,6 +88,7 @@ export class IpfsService implements IIpfsService { this.codecNames = new Map() this.respondToPubsubQueries = config.mode === AppMode.PUBSUB_RESPONDER ? true : false this.resubscribeAfterErrorDelay = IPFS_RESUBSCRIBE_AFTER_ERROR_DELAY + this.pubsubResponderWindowMs = config.pubsubResponderWindowMs || DEFAULT_PUBSUB_RESPONDER_WINDOW } /** @@ -253,7 +256,11 @@ export class IpfsService implements IIpfsService { const { stream: streamId, id } = message - const completedRequest = await this.requestRepository.findCompletedForStream(streamId, 1) + const completedRequest = await this.requestRepository.findCompletedForStream( + streamId, + 1, + new Date(Date.now() - this.pubsubResponderWindowMs) + ) if (completedRequest.length === 0) { return } @@ -263,6 +270,12 @@ export class IpfsService implements IIpfsService { return } + logger.debug( + `Pubsub responder responding with anchor commit ${ + anchor.cid + } for stream ${streamId.toString()}` + ) + const tipMap = new Map().set(streamId.toString(), anchor.cid) const serializedMessage = serialize({ typ: MsgType.RESPONSE, id, tips: tipMap })