Skip to content

Commit

Permalink
feat: StorageReader: Publish ClaimsNotDownloaded (#613)
Browse files Browse the repository at this point in the history
  • Loading branch information
jwicks31 authored Oct 30, 2018
1 parent d1ee925 commit 6d41162
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/Configuration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ describe('loadConfigurationWithDefaults', async (assert: any) => {
exchangeIpfsHashTxId: 'myPrefix.IPFS_HASH_TX_ID',
exchangePoetAnchorDownloaded: 'myPrefix.POET_ANCHOR_DOWNLOADED',
exchangeClaimsDownloaded: 'myPrefix.CLAIMS_DOWNLOADED',
exchangeClaimsNotDownloaded: 'myPrefix.CLAIMS_NOT_DOWNLOADED',
exchangeStorageWriterStoreNextClaim: 'myPrefix.STORAGE_WRITER::STORE_NEXT_CLAIM',
exchangeGetHealth: 'myPrefix.HEALTH::GET_HEALTH',
}
Expand Down
3 changes: 3 additions & 0 deletions src/Configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export interface ExchangeConfiguration {
readonly exchangeIpfsHashTxId: string
readonly exchangePoetAnchorDownloaded: string
readonly exchangeClaimsDownloaded: string
readonly exchangeClaimsNotDownloaded: string
readonly exchangeStorageWriterStoreNextClaim: string
readonly exchangeGetHealth: string
}
Expand Down Expand Up @@ -130,6 +131,7 @@ const defaultConfiguration: Configuration = {
exchangeIpfsHashTxId: 'IPFS_HASH_TX_ID',
exchangePoetAnchorDownloaded: 'POET_ANCHOR_DOWNLOADED',
exchangeClaimsDownloaded: 'CLAIMS_DOWNLOADED',
exchangeClaimsNotDownloaded: 'CLAIMS_NOT_DOWNLOADED',
exchangeStorageWriterStoreNextClaim: 'STORAGE_WRITER::STORE_NEXT_CLAIM',
exchangeGetHealth: 'HEALTH::GET_HEALTH',
}
Expand Down Expand Up @@ -172,6 +174,7 @@ const applyExchangePrefix = (configVars: any) => {
'exchangeIpfsHashTxId',
'exchangePoetAnchorDownloaded',
'exchangeClaimsDownloaded',
'exchangeClaimsNotDownloaded',
'exchangeStorageWriterStoreNextClaim',
'exchangeGetHealth',
]
Expand Down
11 changes: 11 additions & 0 deletions src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,14 @@ export interface ClaimIdIPFSHashPair {
readonly claimId: string
readonly ipfsFileHash: string
}

export interface IPFSHashFailure {
readonly ipfsFileHash: string
readonly failureType: string
readonly failureReason: string
readonly failureTime: number
}
const hasFailureType = has('failureType')
const hasFailureReason = has('failureReason')
const hasFailureTime = has('failureTime')
export const isIPFSHashFailure = allPass([hasIPFSFileHash, hasFailureReason, hasFailureType, hasFailureTime])
1 change: 1 addition & 0 deletions src/Messaging/ExchangeConfiguration.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export interface ExchangeConfiguration {
readonly poetAnchorDownloaded?: string
readonly claimsDownloaded?: string
readonly claimsNotDownloaded?: string
}
6 changes: 5 additions & 1 deletion src/Messaging/Messaging.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* tslint:disable:no-console */
import { Connection, connect, Channel } from 'amqplib'

import { ClaimIPFSHashPair, isClaimIPFSHashPair } from 'Interfaces'
import { ClaimIPFSHashPair, isClaimIPFSHashPair, IPFSHashFailure } from 'Interfaces'

import { ExchangeConfiguration } from './ExchangeConfiguration'
import { BlockDownloaded, isBlockDownloaded } from './Messages'
Expand Down Expand Up @@ -101,4 +101,8 @@ export class Messaging {
consume(claimIPFSHashPairs)
})
}

publishClaimsNotDownloaded = async (ipfsHashFailure: ReadonlyArray<IPFSHashFailure>) => {
return this.publish(this.exchanges.claimsNotDownloaded, ipfsHashFailure)
}
}
39 changes: 36 additions & 3 deletions src/StorageReader/ClaimController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ export class ClaimController {
},
)

const publishEntryFailureReason = async (
ipfsFileHash: string,
failureType: FailureType,
failureReason: FailureReason,
failureTime: number
) => {
const logger = this.logger.child({ method: 'publishEntryFailureReason' })
logger.trace('started publishing')

await this.messaging.publishClaimsNotDownloaded([
{
ipfsFileHash,
failureType,
failureReason,
failureTime,
},
])
logger.trace('finished publishing')
}

const pipe = pipeP(
this.findEntryToDownload,
this.updateEntryAttempts,
Expand All @@ -107,13 +127,26 @@ export class ClaimController {

const handleErrors = async (error: Error) => {
if (error instanceof NoMoreEntriesException) logger.trace(error.message)
else if (error instanceof InvalidClaim)
else if (error instanceof InvalidClaim) {
await updateEntryFailureReason(error.ipfsFileHash, FailureType.Hard, error.failureReason)
else if (error instanceof IPFSTimeoutError)
await publishEntryFailureReason(error.ipfsFileHash, FailureType.Hard, error.failureReason, error.failureTime)
} else if (error instanceof IPFSTimeoutError) {
await updateEntryFailureReason(error.ipfsFileHash, FailureType.Soft, FailureReason.IPFSTimeout)
else if (error instanceof IPFSGenericError) {
await publishEntryFailureReason(
error.ipfsFileHash,
FailureType.Soft,
FailureReason.IPFSTimeout,
error.failureTime
)
} else if (error instanceof IPFSGenericError) {
logger.warn({ error })
await updateEntryFailureReason(error.ipfsFileHash, FailureType.Soft, FailureReason.IPFSGeneric)
await publishEntryFailureReason(
error.ipfsFileHash,
FailureType.Soft,
FailureReason.IPFSGeneric,
error.failureTime
)
} else throw error
}

Expand Down
12 changes: 9 additions & 3 deletions src/StorageReader/Exceptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,37 @@ export class NoMoreEntriesException extends Error {
export class InvalidClaim extends Error {
readonly ipfsFileHash: string
readonly failureReason: FailureReason
readonly failureTime?: number

constructor(ipfsFileHash: string, failureReason: FailureReason) {
constructor(ipfsFileHash: string, failureReason: FailureReason, failureTime = new Date().getTime()) {
super()
this.ipfsFileHash = ipfsFileHash
this.failureReason = failureReason
this.failureTime = failureTime
}
}

export class IPFSGenericError extends Error {
readonly ipfsFileHash: string
readonly underlyingError: Error
readonly failureTime: number

constructor(ipfsFileHash: string, underlyingError: Error) {
constructor(ipfsFileHash: string, underlyingError: Error, failureTime = new Date().getTime()) {
super()
this.ipfsFileHash = ipfsFileHash
this.underlyingError = underlyingError
this.failureTime = failureTime
}
}

export class IPFSTimeoutError extends Error {
readonly ipfsFileHash: string
readonly failureTime: number

constructor(ipfsFileHash: string) {
constructor(ipfsFileHash: string, failureTime = new Date().getTime()) {
super()
this.ipfsFileHash = ipfsFileHash
this.failureTime = failureTime
}
}

Expand Down
1 change: 1 addition & 0 deletions src/StorageReader/ExchangeConfiguration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export interface ExchangeConfiguration {
readonly batchReaderReadNextDirectorySuccess?: string
readonly poetAnchorDownloaded?: string
readonly claimsDownloaded?: string
readonly claimsNotDownloaded?: string
}
5 changes: 4 additions & 1 deletion src/StorageReader/StorageReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ export class StorageReader {
this.mongoClient = await MongoClient.connect(this.configuration.dbUrl)
this.dbConnection = await this.mongoClient.db()

const exchangesMessaging = pick(['poetAnchorDownloaded', 'claimsDownloaded'], this.configuration.exchanges)
const exchangesMessaging = pick(
['poetAnchorDownloaded', 'claimsDownloaded', 'claimsNotDownloaded'],
this.configuration.exchanges,
)
this.messaging = new Messaging(this.configuration.rabbitmqUrl, exchangesMessaging)
await this.messaging.start()

Expand Down
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export async function app(localVars: any = {}) {
batchReaderReadNextDirectorySuccess: configuration.exchangeBatchReaderReadNextDirectorySuccess,
poetAnchorDownloaded: configuration.exchangePoetAnchorDownloaded,
claimsDownloaded: configuration.exchangeClaimsDownloaded,
claimsNotDownloaded: configuration.exchangeClaimsNotDownloaded,
},
})
try {
Expand Down

0 comments on commit 6d41162

Please sign in to comment.