Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pubsub responder window #1180

Merged
merged 1 commit into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"requireAuth": false,
"schedulerIntervalMS": 300000,
"schedulerStopAfterNoOp": false,
"pubsubResponderWindowMs": 8035200000,
"carStorage": {
"mode": "inmemory",
"s3BucketName": "myS3Bucket",
Expand Down
1 change: 1 addition & 0 deletions config/env/dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"requireAuth": "@@REQUIRE_AUTH",
"schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS",
"schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP",
"pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS",
"carStorage": {
"mode": "@@MERKLE_CAR_STORAGE_MODE",
"s3BucketName": "@@S3_BUCKET_NAME",
Expand Down
1 change: 1 addition & 0 deletions config/env/prod.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"requireAuth": "@@REQUIRE_AUTH",
"schedulerIntervalMS": "@@SCHEDULER_INTERVAL_MS",
"schedulerStopAfterNoOp": "@@SCHEDULER_STOP_AFTER_NO_OP",
"pubsubResponderWindowMs": "@@PUBSUB_RESPONDER_WINDOW_MS",
"carStorage": {
"mode": "@@MERKLE_CAR_STORAGE_MODE",
"s3BucketName": "@@S3_BUCKET_NAME",
Expand Down
22 changes: 22 additions & 0 deletions src/repositories/__tests__/request-repository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,28 @@ describe('request repository test', () => {
const received = await requestRepository.findCompletedForStream(myStreamId)
expect(expectedRequest.map(({ id }) => id)).toEqual(received.map(({ id }) => id))
})

test('If the completed request for a given stream is too old, return an empty array', async () => {
const myStreamId = randomStreamID().toString()
const after = new Date(Date.now() - 1000 * 60 * 60)
const requests = generateRequests(
{
streamId: myStreamId,
status: RequestStatus.COMPLETED,
updatedAt: new Date(after.getTime() - 1),
},
1
)

await requestRepository.createRequests(requests)

const createdRequests = await requestRepository.allRequests()
expect(requests.length).toEqual(createdRequests.length)

const received = await requestRepository.findCompletedForStream(myStreamId, 1, after)
expect(received.length).toEqual(0)
})

test('If there is no completed request for a given stream, return an empty array', async () => {
const myStreamId = randomStreamID().toString()
const requests = [
Expand Down
16 changes: 13 additions & 3 deletions src/repositories/request-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,23 @@ export class RequestRepository {
return returned.map((r) => new Request(r))
}

async findCompletedForStream(streamId: string | StreamID, limit = 1): Promise<Array<Request>> {
const found = await this.table
async findCompletedForStream(
streamId: string | StreamID,
limit = 1,
after?: Date
): Promise<Array<Request>> {
const query = this.table
.where({ streamId: streamId.toString() })
.where({ status: RequestStatus.COMPLETED })
.andWhere({ status: RequestStatus.COMPLETED })
.orderBy('updatedAt', 'desc')
.limit(limit)

if (after) {
query.andWhere('updatedAt', '>=', date.encode(after))
}

const found = await query

return found.map((r) => new Request(r))
}
}
47 changes: 47 additions & 0 deletions src/services/__tests__/ipfs-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,53 @@ describe('pubsub', () => {
expect(receivedTip).toEqual(anchorCid.toString())
})

test('Will not respond to query message about stream with an anchor if it is too old', async () => {
const pubsubMessage = { typ: 1, id: '1', stream: randomStreamID() }

const ipfsQueueService = injector.resolve('ipfsQueueService')
const queueSendMessageSpy = jest.spyOn(ipfsQueueService, 'sendMessage')
const handleMessageSpy = jest.spyOn(service, 'handleMessage')
const pubsubSubscribeSpy = jest.spyOn(mockIpfsClient.pubsub, 'subscribe')
pubsubSubscribeSpy.mockImplementationOnce(
(topic: string, onMessage: (message: Message) => void) => {
expect(topic).toEqual('/faux')
onMessage(asIpfsMessage(serialize(pubsubMessage)))
return Promise.resolve()
}
)
// @ts-ignore
const beforeWindow = new Date(Date.now() - service.pubsubResponderWindowMs - 1000)
3benbox marked this conversation as resolved.
Show resolved Hide resolved
const requestRepository = injector.resolve('requestRepository')
const createdRequest = await requestRepository.createOrUpdate(
generateRequest({
streamId: pubsubMessage.stream.toString(),
status: RequestStatus.COMPLETED,
// @ts-ignore
createdAt: beforeWindow,
updatedAt: beforeWindow,
})
)
const anchorRepository = injector.resolve('anchorRepository')
const anchorCid = randomCID()
await anchorRepository.createAnchors([
{
requestId: createdRequest.id,
proofCid: randomCID(),
path: '0',
cid: anchorCid,
},
])

// @ts-ignore
service.respondToPubsubQueries = true
await service.init()

await Utils.delay(1000)
expect(handleMessageSpy).toBeCalledTimes(1)
expect(handleMessageSpy).toBeCalledWith(pubsubMessage)
expect(queueSendMessageSpy).toBeCalledTimes(0)
})

test('Will ignore non pusub messages', async () => {
const pubsubMessage = { typ: 1, id: '1', stream: randomStreamID() }

Expand Down
15 changes: 14 additions & 1 deletion src/services/ipfs-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const MAX_CACHE_ENTRIES = 100
export const IPFS_PUT_TIMEOUT = 30 * 1000 // 30 seconds
const PUBSUB_DELAY = 100
const DEFAULT_CONCURRENT_GET_LIMIT = 100
export const DEFAULT_PUBSUB_RESPONDER_WINDOW = 1_000 * 60 * 60 * 24 * 31 * 3 // 3 months

function buildHttpAgent(endpoint: string | undefined): HttpAgent {
const agentOptions = {
Expand Down Expand Up @@ -67,6 +68,7 @@ export class IpfsService implements IIpfsService {
private pubsub$?: Subscription
private readonly respondToPubsubQueries: boolean
private readonly resubscribeAfterErrorDelay: number
private readonly pubsubResponderWindowMs: number

static inject = ['config', 'ipfsQueueService', 'requestRepository', 'anchorRepository'] as const

Expand All @@ -86,6 +88,7 @@ export class IpfsService implements IIpfsService {
this.codecNames = new Map()
this.respondToPubsubQueries = config.mode === AppMode.PUBSUB_RESPONDER ? true : false
this.resubscribeAfterErrorDelay = IPFS_RESUBSCRIBE_AFTER_ERROR_DELAY
this.pubsubResponderWindowMs = config.pubsubResponderWindowMs || DEFAULT_PUBSUB_RESPONDER_WINDOW
}

/**
Expand Down Expand Up @@ -253,7 +256,11 @@ export class IpfsService implements IIpfsService {

const { stream: streamId, id } = message

const completedRequest = await this.requestRepository.findCompletedForStream(streamId, 1)
const completedRequest = await this.requestRepository.findCompletedForStream(
streamId,
1,
new Date(Date.now() - this.pubsubResponderWindowMs)
)
if (completedRequest.length === 0) {
return
}
Expand All @@ -263,6 +270,12 @@ export class IpfsService implements IIpfsService {
return
}

logger.debug(
`Pubsub responder responding with anchor commit ${
anchor.cid
} for stream ${streamId.toString()}`
)

const tipMap = new Map().set(streamId.toString(), anchor.cid)

const serializedMessage = serialize({ typ: MsgType.RESPONSE, id, tips: tipMap })
Expand Down