Skip to content

Commit

Permalink
Add pubsub responder window
Browse files Browse the repository at this point in the history
  • Loading branch information
stephhuynh18 committed Dec 8, 2023
1 parent fd425f6 commit c328dd4
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 4 deletions.
1 change: 1 addition & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"requireAuth": false,
"schedulerIntervalMS": 300000,
"schedulerStopAfterNoOp": false,
"pubsubResponderWindowMs": 8035200000,
"carStorage": {
"mode": "inmemory",
"s3BucketName": "myS3Bucket",
Expand Down
1 change: 1 addition & 0 deletions config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"requireAuth": "@@REQUIRE_AUTH",
"schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS",
"schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP",
"pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS",
"carStorage": {
"mode": "@@MERKLE_CAR_STORAGE_MODE",
"s3BucketName": "@@S3_BUCKET_NAME",
Expand Down
1 change: 1 addition & 0 deletions config/env/prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"requireAuth": "@@REQUIRE_AUTH",
"schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS",
"schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP",
"pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS",
"carStorage": {
"mode": "@@MERKLE_CAR_STORAGE_MODE",
"s3BucketName": "@@S3_BUCKET_NAME",
Expand Down
22 changes: 22 additions & 0 deletions src/repositories/__tests__/request-repository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,28 @@ describe('request repository test', () => {
const received = await requestRepository.findCompletedForStream(myStreamId)
expect(expectedRequest.map(({ id }) => id)).toEqual(received.map(({ id }) => id))
})

test('If the completed request for a given stream is too old, return an empty array', async () => {
const myStreamId = randomStreamID().toString()
const after = new Date(Date.now() - 1000 * 60 * 60)
const requests = generateRequests(
{
streamId: myStreamId,
status: RequestStatus.COMPLETED,
updatedAt: new Date(after.getTime() - 1),
},
1
)

await requestRepository.createRequests(requests)

const createdRequests = await requestRepository.allRequests()
expect(requests.length).toEqual(createdRequests.length)

const received = await requestRepository.findCompletedForStream(myStreamId, 1, after)
expect(received.length).toEqual(0)
})

test('If there is no completed request for a given stream, return an empty array', async () => {
const myStreamId = randomStreamID().toString()
const requests = [
Expand Down
16 changes: 13 additions & 3 deletions src/repositories/request-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,23 @@ export class RequestRepository {
return returned.map((r) => new Request(r))
}

async findCompletedForStream(streamId: string | StreamID, limit = 1): Promise<Array<Request>> {
const found = await this.table
async findCompletedForStream(
streamId: string | StreamID,
limit = 1,
after?: Date
): Promise<Array<Request>> {
const query = this.table
.where({ streamId: streamId.toString() })
.where({ status: RequestStatus.COMPLETED })
.andWhere({ status: RequestStatus.COMPLETED })
.orderBy('updatedAt', 'desc')
.limit(limit)

if (after) {
query.andWhere('updatedAt', '>=', date.encode(after))
}

const found = await query

return found.map((r) => new Request(r))
}
}
47 changes: 47 additions & 0 deletions src/services/__tests__/ipfs-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,53 @@ describe('pubsub', () => {
expect(receivedTip).toEqual(anchorCid.toString())
})

test('Will not respond to query message about stream with an anchor if it is too old', async () => {
const pubsubMessage = { typ: 1, id: '1', stream: randomStreamID() }

const ipfsQueueService = injector.resolve('ipfsQueueService')
const queueSendMessageSpy = jest.spyOn(ipfsQueueService, 'sendMessage')
const handleMessageSpy = jest.spyOn(service, 'handleMessage')
const pubsubSubscribeSpy = jest.spyOn(mockIpfsClient.pubsub, 'subscribe')
pubsubSubscribeSpy.mockImplementationOnce(
(topic: string, onMessage: (message: Message) => void) => {
expect(topic).toEqual('/faux')
onMessage(asIpfsMessage(serialize(pubsubMessage)))
return Promise.resolve()
}
)
// @ts-ignore
const beforeWindow = new Date(Date.now() - service.pubsubResponderWindowMs - 1000)
const requestRepository = injector.resolve('requestRepository')
const createdRequest = await requestRepository.createOrUpdate(
generateRequest({
streamId: pubsubMessage.stream.toString(),
status: RequestStatus.COMPLETED,
// @ts-ignore
createdAt: beforeWindow,
updatedAt: beforeWindow,
})
)
const anchorRepository = injector.resolve('anchorRepository')
const anchorCid = randomCID()
await anchorRepository.createAnchors([
{
requestId: createdRequest.id,
proofCid: randomCID(),
path: '0',
cid: anchorCid,
},
])

// @ts-ignore
service.respondToPubsubQueries = true
await service.init()

await Utils.delay(1000)
expect(handleMessageSpy).toBeCalledTimes(1)
expect(handleMessageSpy).toBeCalledWith(pubsubMessage)
expect(queueSendMessageSpy).toBeCalledTimes(0)
})

test('Will ignore non pusub messages', async () => {
const pubsubMessage = { typ: 1, id: '1', stream: randomStreamID() }

Expand Down
15 changes: 14 additions & 1 deletion src/services/ipfs-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const MAX_CACHE_ENTRIES = 100
export const IPFS_PUT_TIMEOUT = 30 * 1000 // 30 seconds
const PUBSUB_DELAY = 100
const DEFAULT_CONCURRENT_GET_LIMIT = 100
export const DEFAULT_PUBSUB_RESPONDER_WINDOW = 1_000 * 60 * 60 * 24 * 31 * 3 // 3 months

function buildHttpAgent(endpoint: string | undefined): HttpAgent {
const agentOptions = {
Expand Down Expand Up @@ -67,6 +68,7 @@ export class IpfsService implements IIpfsService {
private pubsub$?: Subscription
private readonly respondToPubsubQueries: boolean
private readonly resubscribeAfterErrorDelay: number
private readonly pubsubResponderWindowMs: number

static inject = ['config', 'ipfsQueueService', 'requestRepository', 'anchorRepository'] as const

Expand All @@ -86,6 +88,7 @@ export class IpfsService implements IIpfsService {
this.codecNames = new Map()
this.respondToPubsubQueries = config.mode === AppMode.PUBSUB_RESPONDER ? true : false
this.resubscribeAfterErrorDelay = IPFS_RESUBSCRIBE_AFTER_ERROR_DELAY
this.pubsubResponderWindowMs = config.pubsubResponderWindowMs || DEFAULT_PUBSUB_RESPONDER_WINDOW
}

/**
Expand Down Expand Up @@ -253,7 +256,11 @@ export class IpfsService implements IIpfsService {

const { stream: streamId, id } = message

const completedRequest = await this.requestRepository.findCompletedForStream(streamId, 1)
const completedRequest = await this.requestRepository.findCompletedForStream(
streamId,
1,
new Date(Date.now() - this.pubsubResponderWindowMs)
)
if (completedRequest.length === 0) {
return
}
Expand All @@ -263,6 +270,12 @@ export class IpfsService implements IIpfsService {
return
}

logger.debug(
`Pubsub responder responding with anchor commit ${
anchor.cid
} for stream ${streamId.toString()}`
)

const tipMap = new Map().set(streamId.toString(), anchor.cid)

const serializedMessage = serialize({ typ: MsgType.RESPONSE, id, tips: tipMap })
Expand Down

0 comments on commit c328dd4

Please sign in to comment.