Implement tx error handling when ibcService.saveIbcChannelData fails (#537)

* rename classes for clarification of their function

* remove unneeded pass through function

* add new database create script, entity, and insertOrUpdate, add TODO for issue 538

* fix lint imports

* fix lint

* format sql

* update entity, start test

* update tests

* refactor name of block processing service

* minor refactors

* test refactor

* fix lints

* remove test file

* refactor table name

* fix lint

* fix import lint
nullpointer0x00 authored Aug 26, 2024
1 parent 8028c92 commit fdcfed9
Showing 9 changed files with 212 additions and 60 deletions.
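
In outline, the commit wraps the IBC channel-data step in a try/catch, records any failure in the new tx_processing_failure table via TxProcessingFailureRecord.insertOrUpdate, and lets the rest of transaction processing continue. A minimal sketch of that pattern follows; the runAndRecordFailure helper is hypothetical and not part of this commit, while the entity method, its import path, and the "ibc_channel_data" process type come from the diff below.

import io.provenance.explorer.domain.entities.TxProcessingFailureRecord

// Hypothetical helper illustrating the failure-recording pattern applied to saveIbcChannelData:
// persist a row keyed by (block_height, tx_hash, process_type) and keep processing the tx.
fun <T> runAndRecordFailure(
    blockHeight: Int,
    txHash: String,
    processType: String,
    step: () -> T
): T? = try {
    step()
} catch (e: Exception) {
    TxProcessingFailureRecord.insertOrUpdate(
        blockHeight,
        txHash,
        processType,
        e.stackTraceToString(),
        false // success = false; a later retry pass can flip retried/success
    )
    null
}

// Usage, mirroring the real call site in BlockAndTxProcessor.processAndSaveTransactionData:
// runAndRecordFailure(txInfo.blockHeight, txInfo.txHash, "ibc_channel_data") {
//     saveIbcChannelData(res, txInfo, txUpdate)
// }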
@@ -0,0 +1,13 @@
SELECT 'Create tx_processing_failure table' AS comment;

CREATE TABLE IF NOT EXISTS tx_processing_failure (
id SERIAL PRIMARY KEY,
block_height INT NOT NULL,
tx_hash VARCHAR(128) NOT NULL,
process_type VARCHAR(64) NOT NULL,
failure_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
error_message TEXT DEFAULT NULL,
retried BOOLEAN NOT NULL DEFAULT FALSE,
success BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE (block_height, tx_hash, process_type)
);
@@ -212,8 +212,7 @@ class BlockProposerRecord(id: EntityID<Int>) : IntEntity(id) {

fun getRecordsForProposer(address: String, limit: Int) = transaction {
BlockProposerRecord.find {
(BlockProposerTable.proposerOperatorAddress eq address) and
(BlockProposerTable.blockLatency.isNotNull())
(BlockProposerTable.proposerOperatorAddress eq address) and (BlockProposerTable.blockLatency.isNotNull())
}.orderBy(Pair(BlockProposerTable.blockHeight, SortOrder.DESC))
.limit(limit)
.toList()
@@ -353,13 +352,14 @@ class BlockCacheHourlyTxCountsRecord(id: EntityID<DateTime>) : Entity<DateTime>(
BlockCacheHourlyTxCountsTable.slice(txSum).selectAll().map { it[txSum] }.first()!!
}

fun getTxCountsForParams(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) = transaction {
when (granularity) {
DAY, MONTH -> getGranularityCounts(fromDate, toDate, granularity)
HOUR -> getHourlyCounts(fromDate, toDate)
MINUTE -> emptyList()
fun getTxCountsForParams(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) =
transaction {
when (granularity) {
DAY, MONTH -> getGranularityCounts(fromDate, toDate, granularity)
HOUR -> getHourlyCounts(fromDate, toDate)
MINUTE -> emptyList()
}
}
}

fun getTxHeatmap(fromDate: DateTime? = null, toDate: DateTime? = null) = transaction {
val blockTimestamp = BlockCacheHourlyTxCountsTable.blockTimestamp
@@ -399,22 +399,23 @@ class BlockCacheHourlyTxCountsRecord(id: EntityID<DateTime>) : Entity<DateTime>(
TxHeatmapRes(result, dayTotals, hourTotals)
}

private fun getGranularityCounts(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) = transaction {
val dateTrunc = DateTrunc(granularity.name, BlockCacheHourlyTxCountsTable.blockTimestamp)
val txSum = BlockCacheHourlyTxCountsTable.txCount.sum()
BlockCacheHourlyTxCountsTable.slice(dateTrunc, txSum)
.select {
dateTrunc.between(fromDate.startOfDay(), toDate.startOfDay())
}
.groupBy(dateTrunc)
.orderBy(dateTrunc, SortOrder.DESC)
.map {
TxHistory(
it[dateTrunc]!!.withZone(DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss"),
it[txSum]!!
)
}
}
private fun getGranularityCounts(fromDate: DateTime, toDate: DateTime, granularity: DateTruncGranularity) =
transaction {
val dateTrunc = DateTrunc(granularity.name, BlockCacheHourlyTxCountsTable.blockTimestamp)
val txSum = BlockCacheHourlyTxCountsTable.txCount.sum()
BlockCacheHourlyTxCountsTable.slice(dateTrunc, txSum)
.select {
dateTrunc.between(fromDate.startOfDay(), toDate.startOfDay())
}
.groupBy(dateTrunc)
.orderBy(dateTrunc, SortOrder.DESC)
.map {
TxHistory(
it[dateTrunc]!!.withZone(DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss"),
it[txSum]!!
)
}
}

private fun getHourlyCounts(fromDate: DateTime, toDate: DateTime) = transaction {
BlockCacheHourlyTxCountsRecord.find {
@@ -534,18 +535,14 @@ class BlockTxRetryRecord(id: EntityID<Int>) : IntEntity(id) {
it[this.height] = height
it[this.retried] = true
it[this.success] = false
it[this.errorBlock] =
"NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " +
e.stackTraceToString()
it[this.errorBlock] = "NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " + e.stackTraceToString()
}
}

fun insertNonBlockingRetry(height: Int, e: Exception) = transaction {
BlockTxRetryTable.insertIgnore {
it[this.height] = height
it[this.errorBlock] =
"NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " +
e.stackTraceToString()
it[this.errorBlock] = "NON BLOCKING ERROR: Logged to know what happened, but didnt stop processing.\n " + e.stackTraceToString()
}
}

@@ -582,3 +579,70 @@ class BlockTxRetryRecord(id: EntityID<Int>) : IntEntity(id) {
var success by BlockTxRetryTable.success
var errorBlock by BlockTxRetryTable.errorBlock
}

object TxProcessingFailureTable : IdTable<Int>(name = "tx_processing_failure") {
val blockHeight = integer("block_height")
val txHash = varchar("tx_hash", 128)
val processType = varchar("process_type", 64)
val failureTime = datetime("failure_time")
val errorMessage = text("error_message").nullable()
val retried = bool("retried").default(false)
val success = bool("success").default(false)

override val id = integer("id").entityId()

init {
index(true, blockHeight, txHash, processType)
}
}

class TxProcessingFailureRecord(id: EntityID<Int>) : IntEntity(id) {
companion object : IntEntityClass<TxProcessingFailureRecord>(TxProcessingFailureTable) {

fun insertOrUpdate(
blockHeight: Int,
txHash: String,
processType: String,
errorMessage: String?,
success: Boolean
) = transaction {
val existingRecord = TxProcessingFailureRecord.find {
(TxProcessingFailureTable.blockHeight eq blockHeight) and
(TxProcessingFailureTable.txHash eq txHash) and
(TxProcessingFailureTable.processType eq processType)
}.firstOrNull()

if (existingRecord == null) {
TxProcessingFailureTable.insertIgnore {
it[this.blockHeight] = blockHeight
it[this.txHash] = txHash
it[this.processType] = processType
it[this.errorMessage] = errorMessage
it[this.success] = success
}
} else {
existingRecord.apply {
this.errorMessage = errorMessage
this.success = success
this.retried = true
this.failureTime = DateTime.now()
}
}
}

fun deleteProcessedRecords() = transaction {
TxProcessingFailureTable.deleteWhere {
(TxProcessingFailureTable.retried eq true) and
(TxProcessingFailureTable.success eq true)
}
}
}

var blockHeight by TxProcessingFailureTable.blockHeight
var txHash by TxProcessingFailureTable.txHash
var processType by TxProcessingFailureTable.processType
var failureTime by TxProcessingFailureTable.failureTime
var errorMessage by TxProcessingFailureTable.errorMessage
var retried by TxProcessingFailureTable.retried
var success by TxProcessingFailureTable.success
}
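
For context only: the retried/success flags and deleteProcessedRecords above suggest a periodic retry pass, whose actual wiring is deferred (issue 538 is referenced in a TODO later in the diff). A sketch under that assumption; retryProcess is a hypothetical stand-in for re-running the failed step and is not part of the commit:

// Hypothetical retry pass (not part of this commit): re-run each recorded failure,
// record the outcome via insertOrUpdate (which marks retried = true on the existing row),
// then prune rows that have been retried successfully.
fun retryFailedTxProcessing() = transaction {
    TxProcessingFailureRecord.find { TxProcessingFailureTable.success eq false }
        .forEach { failure ->
            val succeeded = runCatching {
                retryProcess(failure.blockHeight, failure.txHash, failure.processType) // hypothetical
            }.isSuccess
            TxProcessingFailureRecord.insertOrUpdate(
                failure.blockHeight,
                failure.txHash,
                failure.processType,
                if (succeeded) null else "retry failed",
                succeeded
            )
        }
    TxProcessingFailureRecord.deleteProcessedRecords()
}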
@@ -69,7 +69,7 @@ import io.provenance.explorer.model.base.DateTruncGranularity
import io.provenance.explorer.model.base.PREFIX_SCOPE
import io.provenance.explorer.model.base.PagedResults
import io.provenance.explorer.model.base.USD_UPPER
import io.provenance.explorer.service.async.AsyncCachingV2
import io.provenance.explorer.service.async.BlockAndTxProcessor
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
@@ -88,7 +88,7 @@ class ExplorerService(
private val blockService: BlockService,
private val validatorService: ValidatorService,
private val assetService: AssetService,
private val asyncV2: AsyncCachingV2,
private val asyncV2: BlockAndTxProcessor,
private val govClient: GovGrpcClient,
private val accountClient: AccountGrpcClient,
private val ibcClient: IbcGrpcClient,
@@ -54,7 +54,7 @@ import io.provenance.explorer.model.base.isMAddress
import io.provenance.explorer.model.base.toMAddress
import io.provenance.explorer.model.base.toMAddressScope
import io.provenance.explorer.model.download.TxHistoryChartData
import io.provenance.explorer.service.async.AsyncCachingV2
import io.provenance.explorer.service.async.BlockAndTxProcessor
import io.provenance.explorer.service.async.getAddressType
import org.jetbrains.exposed.dao.id.EntityID
import org.jetbrains.exposed.sql.SizedIterable
@@ -68,7 +68,7 @@ import javax.servlet.ServletOutputStream
@Service
class TransactionService(
private val protoPrinter: JsonFormat.Printer,
private val asyncV2: AsyncCachingV2,
private val asyncV2: BlockAndTxProcessor,
private val nftService: NftService,
private val valService: ValidatorService,
private val ibcService: IbcService
@@ -35,6 +35,7 @@ import io.provenance.explorer.domain.entities.TxMessageRecord
import io.provenance.explorer.domain.entities.TxMsgTypeSubtypeRecord
import io.provenance.explorer.domain.entities.TxMsgTypeSubtypeTable
import io.provenance.explorer.domain.entities.TxNftJoinRecord
import io.provenance.explorer.domain.entities.TxProcessingFailureRecord
import io.provenance.explorer.domain.entities.TxSingleMessageCacheRecord
import io.provenance.explorer.domain.entities.TxSmCodeRecord
import io.provenance.explorer.domain.entities.TxSmContractRecord
@@ -124,7 +125,7 @@ import org.joda.time.DateTime
import org.springframework.stereotype.Service

@Service
class AsyncCachingV2(
class BlockAndTxProcessor(
private val txClient: TransactionGrpcClient,
private val blockService: BlockService,
private val validatorService: ValidatorService,
@@ -139,7 +140,7 @@ class AsyncCachingV2(
private val groupService: GroupService
) {

protected val logger = logger(AsyncCachingV2::class)
protected val logger = logger(BlockAndTxProcessor::class)

protected var chainId: String = ""

@@ -254,45 +255,51 @@ class AsyncCachingV2(
if (pullFromDb) {
transaction {
TxCacheRecord.findByHeight(blockHeight)
.map { addTxToCacheWithTimestamp(it.txV2, blockTime, proposerRec) }
.map { processAndSaveTransactionData(it.txV2, blockTime.toDateTime(), proposerRec) }
}
} else {
runBlocking { txClient.getTxsByHeight(blockHeight, txCount) }
.map { addTxToCacheWithTimestamp(it, blockTime, proposerRec) }
.map { processAndSaveTransactionData(it, blockTime.toDateTime(), proposerRec) }
}
} catch (e: Exception) {
logger.error("Failed to retrieve transactions at block: $blockHeight error: ${e.message}", e)
BlockTxRetryRecord.insertOrUpdate(blockHeight, e)
listOf()
}

fun addTxToCacheWithTimestamp(
res: ServiceOuterClass.GetTxResponse,
blockTime: Timestamp,
proposerRec: BlockProposer
) =
addTxToCache(res, blockTime.toDateTime(), proposerRec)

// Function that saves all the things under a transaction
fun addTxToCache(
fun processAndSaveTransactionData(
res: ServiceOuterClass.GetTxResponse,
blockTime: DateTime,
proposerRec: BlockProposer
): TxUpdatedItems {
val tx = TxCacheRecord.buildInsert(res, blockTime)
val txUpdate = TxUpdate(tx)
val txInfo = TxData(proposerRec.blockHeight, null, res.txResponse.txhash, blockTime)

// TODO: See: https://github.com/provenance-io/explorer-service/issues/538
saveMessages(txInfo, res, txUpdate)
saveTxFees(res, txInfo, txUpdate, proposerRec)
val addrs = saveAddresses(txInfo, res, txUpdate)
val markers = saveMarkers(txInfo, res, txUpdate)
saveNftData(txInfo, res, txUpdate)
saveGovData(res, txInfo, txUpdate)
saveIbcChannelData(res, txInfo, txUpdate)
try {
saveIbcChannelData(res, txInfo, txUpdate)
} catch (e: Exception) {
logger.error("Failed to process IBC channel data for tx ${txInfo.txHash} at height ${txInfo.blockHeight}. Error: ${e.message}")
TxProcessingFailureRecord.insertOrUpdate(
txInfo.blockHeight,
txInfo.txHash,
"ibc_channel_data",
e.stackTraceToString(),
false
)
}
saveSmartContractData(res, txInfo, txUpdate)
saveNameData(res, txInfo)
groupService.saveGroups(res, txInfo, txUpdate)
saveSignaturesTx(res, txInfo, txUpdate)

return TxUpdatedItems(addrs, markers, txUpdate)
}

@@ -88,12 +88,12 @@ import java.time.ZoneOffset
import javax.annotation.PostConstruct

@Service
class AsyncService(
class ScheduledTaskService(
private val props: ExplorerProperties,
private val blockService: BlockService,
private val assetService: AssetService,
private val govService: GovService,
private val asyncCache: AsyncCachingV2,
private val blockAndTxProcessor: BlockAndTxProcessor,
private val explorerService: ExplorerService,
private val cacheService: CacheService,
private val tokenService: TokenService,
@@ -103,7 +103,7 @@ class AsyncService(
private val metricsService: MetricsService
) {

protected val logger = logger(AsyncService::class)
protected val logger = logger(ScheduledTaskService::class)
protected var collectHistorical = true

@PostConstruct
@@ -135,7 +135,7 @@ class AsyncService(
shouldContinue = false
return
}
asyncCache.saveBlockEtc(it)
blockAndTxProcessor.saveBlockEtc(it)
indexHeight = it.block.height() - 1
}
blockService.updateBlockMinHeightIndex(indexHeight + 1)
@@ -144,7 +144,7 @@ class AsyncService(
} else {
while (indexHeight > index.first!!) {
blockService.getBlockAtHeightFromChain(indexHeight)?.let {
asyncCache.saveBlockEtc(it)
blockAndTxProcessor.saveBlockEtc(it)
indexHeight = it.block.height() - 1
}
}
@@ -246,7 +246,7 @@ class AsyncService(
logger.info("Retrying block/tx record at $height.")
var retryException: Exception? = null
val block = try {
asyncCache.saveBlockEtc(blockService.getBlockAtHeightFromChain(height), Pair(true, false))!!
blockAndTxProcessor.saveBlockEtc(blockService.getBlockAtHeightFromChain(height), Pair(true, false))!!
} catch (e: Exception) {
retryException = e
logger.error("Error saving block $height on retry.", e)
@@ -409,7 +409,7 @@ class AsyncService(
(startBlock.toInt()..minOf(props.oneElevenBugRange()!!.last, startBlock.toInt().plus(100))).toList()
.let { BlockCacheRecord.getBlocksForRange(it.first(), it.last()) }
.forEach { block ->
if (block.txCount > 0) asyncCache.saveBlockEtc(block.block, Pair(true, false))
if (block.txCount > 0) blockAndTxProcessor.saveBlockEtc(block.block, Pair(true, false))
// Check if the last processed block equals the end of the fee bug range
if (block.height == props.oneElevenBugRange()!!.last) {
cacheService.updateCacheValue(CacheKeys.FEE_BUG_ONE_ELEVEN_START_BLOCK.key, done)
@@ -6,14 +6,14 @@ import io.provenance.explorer.domain.entities.BlockCacheTable
import io.provenance.explorer.service.AccountService
import io.provenance.explorer.service.BlockService
import io.provenance.explorer.service.ValidatorService
import io.provenance.explorer.service.async.AsyncCachingV2
import io.provenance.explorer.service.async.BlockAndTxProcessor
import org.jetbrains.exposed.sql.SortOrder
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.stereotype.Service

@Service
class MigrationService(
private val asyncCaching: AsyncCachingV2,
private val asyncCaching: BlockAndTxProcessor,
private val validatorService: ValidatorService,
private val accountService: AccountService,
private val blockService: BlockService
(Diffs for the remaining 2 of the 9 changed files are not shown.)