From dd93ffb9fb84113c7fc89af80710bf87c3571b91 Mon Sep 17 00:00:00 2001 From: piotrkosecki Date: Thu, 14 Oct 2021 10:45:01 +0200 Subject: [PATCH 1/6] wip --- conseil-lorre/src/main/resources/application.conf | 7 +++++++ .../conseil/indexer/config/LorreConfiguration.scala | 6 ++++++ .../conseil/indexer/forks/ForkDetector.scala | 8 ++++++++ .../conseil/indexer/forks/ForkHandler.scala | 12 ++++++++++++ .../indexer/tezos/TezosDatabaseOperations.scala | 5 +++++ .../conseil/indexer/tezos/TezosIndexer.scala | 11 +++++++++++ 6 files changed, 49 insertions(+) diff --git a/conseil-lorre/src/main/resources/application.conf b/conseil-lorre/src/main/resources/application.conf index 5eed60d1e..1f407d48c 100644 --- a/conseil-lorre/src/main/resources/application.conf +++ b/conseil-lorre/src/main/resources/application.conf @@ -99,4 +99,11 @@ lorre { interval: 360 minutes # interval between fetches } + fork-handling { + backtrack-levels: 100 //how many levels back we should check for the forks + backtrack-levels: ${?CONSEIL_LORRE_FORK_HANDLING_BACKTRACK_LEVELS} + backtrack-interval: 120 // every how many iterations we should check for forks + backtrack-interval: ${?CONSEIL_LORRE_FORK_HANDLING_BACKTRACK_INTERVAL} + } + } \ No newline at end of file diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala index ce601325a..3b576816b 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/config/LorreConfiguration.scala @@ -24,6 +24,7 @@ final case class LorreConfiguration( blockRightsFetching: BakingAndEndorsingRights, tokenContracts: TokenContracts, metadataFetching: TzipMetadata, + forkHandling: ForkHandling, enabledFeatures: Features ) @@ -67,6 +68,11 @@ final case class TokenContracts( interval: FiniteDuration ) +final
case class ForkHandling( + backtrackLevels: Int, + backtrackInterval: Int +) + /** sodium library references */ final case class SodiumConfiguration(libraryPath: String) extends AnyVal with Product with Serializable diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala index b89225e04..de5e3bf18 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala @@ -68,6 +68,14 @@ class ForkDetector[Eff[_]: Monad, BlockId: Eq]( else ForkedId ) + def checkDepthLevel(level: Long, depth: Long): Eff[List[Long]] = { + val sth = for { + x <- level to (level - depth) by -1 + } yield checkOnLevel(x).map(x -> _) + sth.toList.sequence.map(_.filter(x => x._2 == ForkedId).map(_._1)) + } + + /** Assuming a fork was detected for a specific level (i.e. high), * the algorithm backtracks to find the exact lowest level where the * locally indexed blocks start diverging from the forked blockchain. 
diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala index a9052c864..4e80fb794 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala @@ -37,4 +37,16 @@ abstract class ForkHandler[Eff[_]: Monad, BlockId: Eq]( amendment <- amender.amendFork(forkLevel, forkBlockId, currentHeadLevel, Instant.now()) } yield amendment.some } + + def handleForkFrom(currentHeadLevel: Long, depth: Long) = { + detector.checkDepthLevel(currentHeadLevel, depth).flatMap { + case Nil => Option.empty.pure[Eff] + case xs => + val forkLevel = xs.min + for { + forkBlockId <- indexerSearch.searchForLevel(forkLevel) + amendment <- amender.amendFork(forkLevel, forkBlockId, currentHeadLevel, Instant.now()) + } yield amendment.some + } + } } diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala index 78bd5298d..dce6e0454 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala @@ -422,6 +422,11 @@ object TezosDatabaseOperations extends ConseilLogSupport { } } + + def getLastBlocks(amount: Int): DBIO[Seq[Tables.BlocksRow]] = { + Tables.Blocks.sortBy(b => b.level.desc).take(amount).result + } + /** * Upserts baking rights to the database * @param bakingRightsMap mapping of hash to bakingRights list diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala index d75a6403e..562402670 100644 --- 
a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala @@ -113,6 +113,15 @@ class TezosIndexer private ( iteration: Int, accountResetEvents: AccountResetEvents ): Unit = { + /* +take x blocks +check for + */ + val backtrackLevels = lorreConf.forkHandling.backtrackLevels + val backtrackInterval = lorreConf.forkHandling.backtrackInterval + +// def processForks + val processing = for { maxLevel <- indexedData.fetchMaxLevel reloadedAccountEvents <- processFork(maxLevel) @@ -157,6 +166,8 @@ class TezosIndexer private ( } } + + /** Search for any possible forks happened between the last sync cycle and now. * If a fork is detected, corrections will be applied. * From b93020fca502ec910374fcb24d2a22f9311fd6e5 Mon Sep 17 00:00:00 2001 From: piotrkosecki Date: Fri, 15 Oct 2021 16:44:38 +0200 Subject: [PATCH 2/6] wip --- .../conseil/indexer/tezos/TezosIndexer.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala index 562402670..9bde4380e 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala @@ -194,6 +194,25 @@ check for } else emptyOutcome } + private def processLastForks(maxIndexedLevel: BlockLevel, depth: Long): Future[Option[AccountResetEvents]] = { + lazy val emptyOutcome = Future.successful(Option.empty) + if (featureFlags.forkHandlingIsOn && maxIndexedLevel != indexedData.defaultBlockLevel) + forkHandler.handleForkFrom(maxIndexedLevel, depth).flatMap { + case None => + logger.debug(s"No fork detected up to $maxIndexedLevel") + emptyOutcome + case Some((forkId, invalidations)) => + logger.warn( + s"A fork was 
detected somewhere before the currently indexed level $maxIndexedLevel. $invalidations entries were invalidated and connected to fork $forkId" + ) + /* locally processed events were invalidated on db, we need to reload them afresh */ + accountsResetHandler + .unprocessedResetRequestLevels(lorreConf.chainEvents) + .map(Some(_)) + + } else emptyOutcome + } + /** * Fetches all blocks not in the database from the Tezos network and adds them to the database. * Additionally stores account references that needs updating, too From d2c9ac3fa1de70d0b2f7095d3ced7911130347cf Mon Sep 17 00:00:00 2001 From: piotrkosecki Date: Tue, 19 Oct 2021 15:59:10 +0200 Subject: [PATCH 3/6] wip --- .../cryptonomic/conseil/indexer/tezos/TezosIndexer.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala index 9bde4380e..6610dafc1 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala @@ -113,18 +113,13 @@ class TezosIndexer private ( iteration: Int, accountResetEvents: AccountResetEvents ): Unit = { - /* -take x blocks -check for - */ val backtrackLevels = lorreConf.forkHandling.backtrackLevels val backtrackInterval = lorreConf.forkHandling.backtrackInterval -// def processForks - val processing = for { maxLevel <- indexedData.fetchMaxLevel reloadedAccountEvents <- processFork(maxLevel) + unhandled <- accountsResetHandler.applyUnhandledAccountsResets( reloadedAccountEvents.getOrElse(accountResetEvents) ) From c8a8d87f9b683267ee95aff3985d06b17d36452a Mon Sep 17 00:00:00 2001 From: piotrkosecki Date: Thu, 21 Oct 2021 12:16:48 +0200 Subject: [PATCH 4/6] added handling of fork backtracking mechanism, turned off logs for testing --- 
conseil-lorre/src/main/resources/logging.conf | 2 +- .../conseil/indexer/forks/ForkDetector.scala | 10 ++++++++-- .../conseil/indexer/forks/ForkHandler.scala | 8 +++++++- .../conseil/indexer/tezos/TezosIndexer.scala | 15 +++++++++------ 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/conseil-lorre/src/main/resources/logging.conf b/conseil-lorre/src/main/resources/logging.conf index ed93a9e6e..ec866a8dd 100644 --- a/conseil-lorre/src/main/resources/logging.conf +++ b/conseil-lorre/src/main/resources/logging.conf @@ -1,5 +1,5 @@ logging: { - muted: false + muted: true enable-json-output: false enable-json-output: ${?CONSEIL_ENABLE_JSON_OUTPUT} diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala index de5e3bf18..e7cf07e74 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala @@ -68,11 +68,17 @@ class ForkDetector[Eff[_]: Monad, BlockId: Eq]( else ForkedId ) + /** + * Checks if there is a fork from given level down to (level - depth) + * @param level - from which level we start check for fork + * @param depth - how deep we check for forks + * @return a list of failed fork checks of levels + */ def checkDepthLevel(level: Long, depth: Long): Eff[List[Long]] = { - val sth = for { + val result = for { x <- level to (level - depth) by -1 } yield checkOnLevel(x).map(x -> _) - sth.toList.sequence.map(_.filter(x => x._2 == ForkedId).map(_._1)) + result.toList.sequence.map(_.filter(x => x._2 == ForkedId).map(_._1)) } diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala index 4e80fb794..58c5eaa57 100644 --- 
a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala @@ -38,7 +38,13 @@ abstract class ForkHandler[Eff[_]: Monad, BlockId: Eq]( } yield amendment.some } - def handleForkFrom(currentHeadLevel: Long, depth: Long) = { + /** + * Looks for forks in from head down to (head - depth), finds mn level when fork happened and amends forks + * @param currentHeadLevel current level of the latest block + * @param depth how deep we want to search for the forks + * @return None if no fork happened, or the result of data amendment + */ + def handleForkFrom(currentHeadLevel: Long, depth: Long): Eff[Option[ForkAmender.Results]] = { detector.checkDepthLevel(currentHeadLevel, depth).flatMap { case Nil => Option.empty.pure[Eff] case xs => diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala index 6610dafc1..236dc244f 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala @@ -119,9 +119,10 @@ class TezosIndexer private ( val processing = for { maxLevel <- indexedData.fetchMaxLevel reloadedAccountEvents <- processFork(maxLevel) + lastReloadedAccountEvents <- processLastForks(maxLevel, backtrackLevels, backtrackInterval, iteration) unhandled <- accountsResetHandler.applyUnhandledAccountsResets( - reloadedAccountEvents.getOrElse(accountResetEvents) + reloadedAccountEvents.orElse(lastReloadedAccountEvents).getOrElse(accountResetEvents) ) _ <- processTezosBlocks(maxLevel) _ <- feeOperations @@ -189,16 +190,18 @@ class TezosIndexer private ( } else emptyOutcome } - private def processLastForks(maxIndexedLevel: BlockLevel, depth: Long): Future[Option[AccountResetEvents]] = { + private def 
processLastForks(maxIndexedLevel: BlockLevel, depth: Long, interval: Long, iteration: Long): Future[Option[AccountResetEvents]] = { lazy val emptyOutcome = Future.successful(Option.empty) - if (featureFlags.forkHandlingIsOn && maxIndexedLevel != indexedData.defaultBlockLevel) + println(s"checking iteration $iteration") + println(s"$featureFlags") + if (featureFlags.forkHandlingIsOn && maxIndexedLevel != indexedData.defaultBlockLevel && iteration % interval == 0) forkHandler.handleForkFrom(maxIndexedLevel, depth).flatMap { case None => - logger.debug(s"No fork detected up to $maxIndexedLevel") + println(s"AFH: No fork detected up to $maxIndexedLevel") emptyOutcome case Some((forkId, invalidations)) => - logger.warn( - s"A fork was detected somewhere before the currently indexed level $maxIndexedLevel. $invalidations entries were invalidated and connected to fork $forkId" + println( + s"AFH: A fork was detected somewhere before the currently indexed level $maxIndexedLevel. $invalidations entries were invalidated and connected to fork $forkId" ) /* locally processed events were invalidated on db, we need to reload them afresh */ accountsResetHandler From 585ec59e62dda7d391ad562fbf093de7cbb4382f Mon Sep 17 00:00:00 2001 From: piotrkosecki Date: Fri, 29 Oct 2021 18:18:31 +0200 Subject: [PATCH 5/6] implemented fork fixing using batch fetching, added big maps fork invalidation, more small fixes --- ...nericPlatformDiscoveryOperationsTest.scala | 9 +- .../conseil/common/tezos/Tables.scala | 143 ++++++++++++++---- .../src/main/resources/application.conf | 4 +- conseil-lorre/src/main/resources/logging.conf | 2 +- .../conseil/indexer/forks/ForkDetector.scala | 14 -- .../conseil/indexer/forks/ForkHandler.scala | 18 --- .../tezos/TezosDatabaseOperations.scala | 8 +- .../conseil/indexer/tezos/TezosIndexer.scala | 29 +++- .../indexer/tezos/TezosNodeFetchers.scala | 38 +++++ .../tezos/bigmaps/BigMapsConversions.scala | 9 +- .../tezos/bigmaps/BigMapsOperations.scala | 4 +- 
.../forks/BacktracingForkProcessor.scala | 71 +++++++++ .../forks/TezosForkInvalidatingAmender.scala | 3 + .../michelson/contracts/TNSContract.scala | 2 +- .../tezos/JsonDecodersTestFixtures.scala | 12 +- .../tezos/bigmaps/BigMapsOperationsTest.scala | 58 ++++--- sql/conseil.sql | 16 +- 17 files changed, 329 insertions(+), 111 deletions(-) create mode 100644 conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala diff --git a/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/discovery/GenericPlatformDiscoveryOperationsTest.scala b/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/discovery/GenericPlatformDiscoveryOperationsTest.scala index 00ff088a6..eb7abcd29 100644 --- a/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/discovery/GenericPlatformDiscoveryOperationsTest.scala +++ b/conseil-api/src/test/scala/tech/cryptonomic/conseil/api/routes/platform/discovery/GenericPlatformDiscoveryOperationsTest.scala @@ -447,7 +447,10 @@ class GenericPlatformDiscoveryOperationsTest Set( Attribute("big_map_id", "Big map id", DataType.Decimal, None, KeyType.UniqueKey, "big_maps"), Attribute("key_type", "Key type", DataType.String, None, KeyType.NonKey, "big_maps"), - Attribute("value_type", "Value type", DataType.String, None, KeyType.NonKey, "big_maps") + Attribute("value_type", "Value type", DataType.String, None, KeyType.NonKey, "big_maps"), + Attribute("fork_id", "Fork id", DataType.String, None, KeyType.UniqueKey, "big_maps"), + Attribute("block_level", "Block level", DataType.LargeInt, None, KeyType.NonKey, "big_maps"), + Attribute("invalidated_asof", "Invalidated asof", DataType.DateTime, None, KeyType.NonKey, "big_maps") ) ) } @@ -472,7 +475,9 @@ class GenericPlatformDiscoveryOperationsTest Attribute("block_level", "Block level", DataType.LargeInt, None, KeyType.NonKey, "big_map_contents"), Attribute("timestamp", "Timestamp", DataType.DateTime, None, 
KeyType.NonKey, "big_map_contents"), Attribute("cycle", "Cycle", DataType.Int, None, KeyType.NonKey, "big_map_contents"), - Attribute("period", "Period", DataType.Int, None, KeyType.NonKey, "big_map_contents") + Attribute("period", "Period", DataType.Int, None, KeyType.NonKey, "big_map_contents"), + Attribute("fork_id", "Fork id", DataType.String, None, KeyType.UniqueKey, "big_map_contents"), + Attribute("invalidated_asof", "Invalidated asof", DataType.DateTime, None, KeyType.NonKey, "big_map_contents") ) ) } diff --git a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/tezos/Tables.scala b/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/tezos/Tables.scala index 06ac7a982..04076f6e3 100644 --- a/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/tezos/Tables.scala +++ b/conseil-common/src/main/scala/tech/cryptonomic/conseil/common/tezos/Tables.scala @@ -1486,7 +1486,9 @@ trait Tables { * @param blockLevel Database column block_level SqlType(int8), Default(None) * @param timestamp Database column timestamp SqlType(timestamp), Default(None) * @param cycle Database column cycle SqlType(int4), Default(None) - * @param period Database column period SqlType(int4), Default(None) */ + * @param period Database column period SqlType(int4), Default(None) + * @param forkId Database column fork_id SqlType(varchar) + * @param invalidatedAsof Database column invalidated_asof SqlType(timestamp), Default(None) */ case class BigMapContentsRow( bigMapId: scala.math.BigDecimal, key: String, @@ -1497,7 +1499,9 @@ trait Tables { blockLevel: Option[Long] = None, timestamp: Option[java.sql.Timestamp] = None, cycle: Option[Int] = None, - period: Option[Int] = None + period: Option[Int] = None, + forkId: String, + invalidatedAsof: Option[java.sql.Timestamp] = None ) /** GetResult implicit for fetching BigMapContentsRow objects using plain SQL queries */ @@ -1521,7 +1525,9 @@ trait Tables { < (BigMapContentsRow.tupled, BigMapContentsRow.unapply) + ( + 
bigMapId, + key, + keyHash, + operationGroupId, + value, + valueMicheline, + blockLevel, + timestamp, + cycle, + period, + forkId, + invalidatedAsof + ) <> (BigMapContentsRow.tupled, BigMapContentsRow.unapply) /** Maps whole row to an option. Useful for outer joins. */ def ? = @@ -1545,11 +1564,14 @@ trait Tables { blockLevel, timestamp, cycle, - period + period, + Rep.Some(forkId), + invalidatedAsof ) ).shaped.<>( { r => - import r._; _1.map(_ => BigMapContentsRow.tupled((_1.get, _2.get, _3, _4, _5, _6, _7, _8, _9, _10))) + import r._; + _1.map(_ => BigMapContentsRow.tupled((_1.get, _2.get, _3, _4, _5, _6, _7, _8, _9, _10, _11.get, _12))) }, (_: Any) => throw new Exception("Inserting into ? projection not supported.") ) @@ -1584,8 +1606,15 @@ trait Tables { /** Database column period SqlType(int4), Default(None) */ val period: Rep[Option[Int]] = column[Option[Int]]("period", O.Default(None)) + /** Database column fork_id SqlType(varchar) */ + val forkId: Rep[String] = column[String]("fork_id") + + /** Database column invalidated_asof SqlType(timestamp), Default(None) */ + val invalidatedAsof: Rep[Option[java.sql.Timestamp]] = + column[Option[java.sql.Timestamp]]("invalidated_asof", O.Default(None)) + /** Primary key of BigMapContents (database name big_map_contents_pkey) */ - val pk = primaryKey("big_map_contents_pkey", (bigMapId, key)) + val pk = primaryKey("big_map_contents_pkey", (bigMapId, key, forkId)) /** Index over (bigMapId) (database name big_map_id_idx) */ val index1 = index("big_map_id_idx", bigMapId) @@ -1609,7 +1638,9 @@ trait Tables { * @param blockLevel Database column block_level SqlType(int8), Default(None) * @param timestamp Database column timestamp SqlType(timestamp), Default(None) * @param cycle Database column cycle SqlType(int4), Default(None) - * @param period Database column period SqlType(int4), Default(None) */ + * @param period Database column period SqlType(int4), Default(None) + * @param forkId Database column fork_id SqlType(varchar) 
+ * @param invalidatedAsof Database column invalidated_asof SqlType(timestamp), Default(None) */ case class BigMapContentsHistoryRow( bigMapId: scala.math.BigDecimal, key: String, @@ -1619,7 +1650,9 @@ trait Tables { blockLevel: Option[Long] = None, timestamp: Option[java.sql.Timestamp] = None, cycle: Option[Int] = None, - period: Option[Int] = None + period: Option[Int] = None, + forkId: String, + invalidatedAsof: Option[java.sql.Timestamp] = None ) /** GetResult implicit for fetching BigMapContentsHistoryRow objects using plain SQL queries */ @@ -1642,7 +1675,9 @@ trait Tables { < (BigMapContentsHistoryRow.tupled, BigMapContentsHistoryRow.unapply) + (bigMapId, key, keyHash, operationGroupId, value, blockLevel, timestamp, cycle, period, forkId, invalidatedAsof) <> (BigMapContentsHistoryRow.tupled, BigMapContentsHistoryRow.unapply) /** Maps whole row to an option. Useful for outer joins. */ def ? = - ((Rep.Some(bigMapId), Rep.Some(key), keyHash, operationGroupId, value, blockLevel, timestamp, cycle, period)).shaped - .<>( - { r => - import r._; _1.map(_ => BigMapContentsHistoryRow.tupled((_1.get, _2.get, _3, _4, _5, _6, _7, _8, _9))) - }, - (_: Any) => throw new Exception("Inserting into ? projection not supported.") + ( + ( + Rep.Some(bigMapId), + Rep.Some(key), + keyHash, + operationGroupId, + value, + blockLevel, + timestamp, + cycle, + period, + Rep.Some(forkId), + invalidatedAsof ) + ).shaped.<>( + { r => + import r._; + _1.map(_ => BigMapContentsHistoryRow.tupled((_1.get, _2.get, _3, _4, _5, _6, _7, _8, _9, _10.get, _11))) + }, + (_: Any) => throw new Exception("Inserting into ? 
projection not supported.") + ) /** Database column big_map_id SqlType(numeric) */ val bigMapId: Rep[scala.math.BigDecimal] = column[scala.math.BigDecimal]("big_map_id") @@ -1689,46 +1738,80 @@ trait Tables { /** Database column period SqlType(int4), Default(None) */ val period: Rep[Option[Int]] = column[Option[Int]]("period", O.Default(None)) + + /** Database column fork_id SqlType(varchar) */ + val forkId: Rep[String] = column[String]("fork_id") + + /** Database column invalidated_asof SqlType(timestamp), Default(None) */ + val invalidatedAsof: Rep[Option[java.sql.Timestamp]] = + column[Option[java.sql.Timestamp]]("invalidated_asof", O.Default(None)) } /** Collection-like TableQuery object for table BigMapContentsHistory */ lazy val BigMapContentsHistory = new TableQuery(tag => new BigMapContentsHistory(tag)) /** Entity class storing rows of table BigMaps - * @param bigMapId Database column big_map_id SqlType(numeric), PrimaryKey + * @param bigMapId Database column big_map_id SqlType(numeric) * @param keyType Database column key_type SqlType(varchar), Default(None) - * @param valueType Database column value_type SqlType(varchar), Default(None) */ + * @param valueType Database column value_type SqlType(varchar), Default(None) + * @param forkId Database column fork_id SqlType(varchar) + * @param blockLevel Database column block_level SqlType(int8), Default(None) + * @param invalidatedAsof Database column invalidated_asof SqlType(timestamp), Default(None) */ case class BigMapsRow( bigMapId: scala.math.BigDecimal, keyType: Option[String] = None, - valueType: Option[String] = None + valueType: Option[String] = None, + forkId: String, + blockLevel: Option[Long] = None, + invalidatedAsof: Option[java.sql.Timestamp] = None ) /** GetResult implicit for fetching BigMapsRow objects using plain SQL queries */ - implicit def GetResultBigMapsRow(implicit e0: GR[scala.math.BigDecimal], e1: GR[Option[String]]): GR[BigMapsRow] = - GR { prs => - import prs._ - 
BigMapsRow.tupled((<<[scala.math.BigDecimal], < + import prs._ + BigMapsRow.tupled( + (<<[scala.math.BigDecimal], < (BigMapsRow.tupled, BigMapsRow.unapply) + def * = + (bigMapId, keyType, valueType, forkId, blockLevel, invalidatedAsof) <> (BigMapsRow.tupled, BigMapsRow.unapply) /** Maps whole row to an option. Useful for outer joins. */ def ? = - ((Rep.Some(bigMapId), keyType, valueType)).shaped.<>({ r => - import r._; _1.map(_ => BigMapsRow.tupled((_1.get, _2, _3))) + ((Rep.Some(bigMapId), keyType, valueType, Rep.Some(forkId), blockLevel, invalidatedAsof)).shaped.<>({ r => + import r._; _1.map(_ => BigMapsRow.tupled((_1.get, _2, _3, _4.get, _5, _6))) }, (_: Any) => throw new Exception("Inserting into ? projection not supported.")) - /** Database column big_map_id SqlType(numeric), PrimaryKey */ - val bigMapId: Rep[scala.math.BigDecimal] = column[scala.math.BigDecimal]("big_map_id", O.PrimaryKey) + /** Database column big_map_id SqlType(numeric) */ + val bigMapId: Rep[scala.math.BigDecimal] = column[scala.math.BigDecimal]("big_map_id") /** Database column key_type SqlType(varchar), Default(None) */ val keyType: Rep[Option[String]] = column[Option[String]]("key_type", O.Default(None)) /** Database column value_type SqlType(varchar), Default(None) */ val valueType: Rep[Option[String]] = column[Option[String]]("value_type", O.Default(None)) + + /** Database column fork_id SqlType(varchar) */ + val forkId: Rep[String] = column[String]("fork_id") + + /** Database column block_level SqlType(int8), Default(None) */ + val blockLevel: Rep[Option[Long]] = column[Option[Long]]("block_level", O.Default(None)) + + /** Database column invalidated_asof SqlType(timestamp), Default(None) */ + val invalidatedAsof: Rep[Option[java.sql.Timestamp]] = + column[Option[java.sql.Timestamp]]("invalidated_asof", O.Default(None)) + + /** Primary key of BigMaps (database name big_maps_pkey) */ + val pk = primaryKey("big_maps_pkey", (bigMapId, forkId)) } /** Collection-like TableQuery object 
for table BigMaps */ diff --git a/conseil-lorre/src/main/resources/application.conf b/conseil-lorre/src/main/resources/application.conf index 1f407d48c..302f1aa77 100644 --- a/conseil-lorre/src/main/resources/application.conf +++ b/conseil-lorre/src/main/resources/application.conf @@ -37,10 +37,10 @@ lorre { block-rights-fetching-is-on: true block-rights-fetching-is-on: ${?CONSEIL_LORRE_BLOCK_RIGHTS_FETCHING_ENABLED} metadata-fetching-is-on: false - metadata-fetching-is-on: ${?CONSEIL_LORRE_BLOCK_RIGHTS_FETCHING_ENABLED} + metadata-fetching-is-on: ${?CONSEIL_LORRE_METADATA_FETCHING_ENABLED} fork-handling-is-on: false fork-handling-is-on: ${?CONSEIL_LORRE_FORK_DETECTION_ENABLED} - registered-tokens-is-on: true + registered-tokens-is-on: false registered-tokens-is-on: ${?CONSEIL_LORRE_REGISTERED_TOKENS_FETCHING_ENABLED} } diff --git a/conseil-lorre/src/main/resources/logging.conf b/conseil-lorre/src/main/resources/logging.conf index ec866a8dd..ed93a9e6e 100644 --- a/conseil-lorre/src/main/resources/logging.conf +++ b/conseil-lorre/src/main/resources/logging.conf @@ -1,5 +1,5 @@ logging: { - muted: true + muted: false enable-json-output: false enable-json-output: ${?CONSEIL_ENABLE_JSON_OUTPUT} diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala index e7cf07e74..b89225e04 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkDetector.scala @@ -68,20 +68,6 @@ class ForkDetector[Eff[_]: Monad, BlockId: Eq]( else ForkedId ) - /** - * Checks if there is a fork from given level down to (level - depth) - * @param level - from which level we start check for fork - * @param depth - how deep we check for forks - * @return a list of failed fork checks of levels - */ - def checkDepthLevel(level: Long, depth: Long): 
Eff[List[Long]] = { - val result = for { - x <- level to (level - depth) by -1 - } yield checkOnLevel(x).map(x -> _) - result.toList.sequence.map(_.filter(x => x._2 == ForkedId).map(_._1)) - } - - /** Assuming a fork was detected for a specific level (i.e. high), * the algorithm backtracks to find the exact lowest level where the * locally indexed blocks start diverging from the forked blockchain. diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala index 58c5eaa57..a9052c864 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/forks/ForkHandler.scala @@ -37,22 +37,4 @@ abstract class ForkHandler[Eff[_]: Monad, BlockId: Eq]( amendment <- amender.amendFork(forkLevel, forkBlockId, currentHeadLevel, Instant.now()) } yield amendment.some } - - /** - * Looks for forks in from head down to (head - depth), finds mn level when fork happened and amends forks - * @param currentHeadLevel current level of the latest block - * @param depth how deep we want to search for the forks - * @return None if no fork happened, or the result of data amendment - */ - def handleForkFrom(currentHeadLevel: Long, depth: Long): Eff[Option[ForkAmender.Results]] = { - detector.checkDepthLevel(currentHeadLevel, depth).flatMap { - case Nil => Option.empty.pure[Eff] - case xs => - val forkLevel = xs.min - for { - forkBlockId <- indexerSearch.searchForLevel(forkLevel) - amendment <- amender.amendFork(forkLevel, forkBlockId, currentHeadLevel, Instant.now()) - } yield amendment.some - } - } } diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala index dce6e0454..7b59d7263 100644 --- 
a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosDatabaseOperations.scala @@ -422,11 +422,6 @@ object TezosDatabaseOperations extends ConseilLogSupport { } } - - def getLastBlocks(amount: Int): DBIO[Seq[Tables.BlocksRow]] = { - Tables.Blocks.sortBy(b => b.level.desc).take(amount).result - } - /** * Upserts baking rights to the database * @param bakingRightsMap mapping of hash to bakingRights list @@ -1030,6 +1025,9 @@ object TezosDatabaseOperations extends ConseilLogSupport { lazy val tokenBalances = EntityTableInvalidator(TokenBalances)(_.blockLevel, _.invalidatedAsof, _.forkId) lazy val governance = EntityTableInvalidator(Governance)(_.level.ifNull(-1L), _.invalidatedAsof, _.forkId) lazy val fees = EntityTableInvalidator(Fees)(_.level.ifNull(-1L), _.invalidatedAsof, _.forkId) + lazy val bigMaps = EntityTableInvalidator(BigMaps)(_.blockLevel.ifNull(-1L), _.invalidatedAsof, _.forkId) + lazy val bigMapContents = EntityTableInvalidator(BigMapContents)(_.blockLevel.ifNull(-1L), _.invalidatedAsof, _.forkId) + lazy val bigMapContentsHistory = EntityTableInvalidator(BigMapContentsHistory)(_.blockLevel.ifNull(-1L), _.invalidatedAsof, _.forkId) /** Deletes entries for the registry of processed chain events. 
* Due to a fork, those events will need be processed again over the new fork diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala index 236dc244f..626dcdac7 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala @@ -23,7 +23,11 @@ import tech.cryptonomic.conseil.indexer.config._ import tech.cryptonomic.conseil.indexer.forks.ForkHandler import tech.cryptonomic.conseil.indexer.logging.LorreProgressLogging import tech.cryptonomic.conseil.indexer.tezos.TezosErrors._ -import tech.cryptonomic.conseil.indexer.tezos.forks.{TezosForkInvalidatingAmender, TezosForkSearchEngine} +import tech.cryptonomic.conseil.indexer.tezos.forks.{ + BacktracingForkProcessor, + TezosForkInvalidatingAmender, + TezosForkSearchEngine +} import tech.cryptonomic.conseil.indexer.tezos.processing._ import tech.cryptonomic.conseil.indexer.tezos.processing.AccountsResetHandler.{AccountResetEvents, UnhandledResetEvents} @@ -58,6 +62,7 @@ class TezosIndexer private ( accountsResetHandler: AccountsResetHandler, registeredTokensFetcher: RegisteredTokensFetcher, forkHandler: ForkHandler[Future, TezosBlockHash], + backtrackingForkProcessor: BacktracingForkProcessor, feeOperations: TezosFeeOperations, terminationSequence: () => Future[ShutdownComplete] )( @@ -162,8 +167,6 @@ class TezosIndexer private ( } } - - /** Search for any possible forks happened between the last sync cycle and now. * If a fork is detected, corrections will be applied. 
* @@ -190,12 +193,15 @@ class TezosIndexer private ( } else emptyOutcome } - private def processLastForks(maxIndexedLevel: BlockLevel, depth: Long, interval: Long, iteration: Long): Future[Option[AccountResetEvents]] = { + private def processLastForks( + maxIndexedLevel: BlockLevel, + depth: Long, + interval: Long, + iteration: Long + ): Future[Option[AccountResetEvents]] = { lazy val emptyOutcome = Future.successful(Option.empty) - println(s"checking iteration $iteration") - println(s"$featureFlags") if (featureFlags.forkHandlingIsOn && maxIndexedLevel != indexedData.defaultBlockLevel && iteration % interval == 0) - forkHandler.handleForkFrom(maxIndexedLevel, depth).flatMap { + backtrackingForkProcessor.handleForkFrom(maxIndexedLevel, depth).flatMap { case None => println(s"AFH: No fork detected up to $maxIndexedLevel") emptyOutcome @@ -404,6 +410,14 @@ object TezosIndexer extends ConseilLogSupport { amender = TezosForkInvalidatingAmender(db) ) + val backtracingForkProcessor = new BacktracingForkProcessor( + network = selectedNetwork, + node = new TezosNodeInterface(conf, callsConf, streamingClientConf), + tezosIndexedDataOperations = indexedData, + indexerSearch = forkSearchEngine.idsIndexerSearch, + amender = TezosForkInvalidatingAmender(db) + )(dispatcher) + val feeOperations = new TezosFeeOperations(db) /* the shutdown sequence to free resources */ @@ -479,6 +493,7 @@ object TezosIndexer extends ConseilLogSupport { accountsResetHandler, registeredTokensFetcher, forkHandler, + backtracingForkProcessor, feeOperations, gracefulTermination ) diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosNodeFetchers.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosNodeFetchers.scala index 8c5ab1097..59992207e 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosNodeFetchers.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosNodeFetchers.scala 
@@ -100,6 +100,44 @@ private[tezos] trait TezosBlocksDataFetchers { } + /** a fetcher of blocks range */ + implicit def blocksRangeFetcher(headLevel: Long) = new FutureFetcher { + import TezosJsonDecoders.Circe.Blocks._ + + type Encoded = String + type In = Long + type Out = BlockData + + private def makeUrl = (offset: Long) => s"blocks/${headLevel - offset}" + + //fetch a future stream of values + override val fetchData = { + Kleisli( + offsets => { + logger.info(s"""Fetching blocks for range ${offsets.min} to ${offsets.max}""") + node.runBatchedGetQuery(network, offsets, makeUrl, fetchConcurrency).onError { + case err => + val showBounds = offsets.onBounds((first, last) => s"$first to $last").getOrElse("unspecified") + logger + .error( + s"I encountered problems while fetching blocks data from $network, for offsets $showBounds from the $headLevel. The error says ${err.getMessage}" + ) + .pure[Future] + } + } + ) + } + + // decode with `JsonDecoders` + override val decodeData = Kleisli { json => + decodeLiftingTo[Future, Out](json) + .onError( + logErrorOnJsonDecoding(s"I fetched a block definition from tezos node that I'm unable to decode: $json") + ) + } + + } + /** decode account ids from operation json results with the `cats.Id` effect, i.e. 
a total function with no effect */ val accountIdsJsonDecode: Kleisli[Id, String, List[AccountId]] = Kleisli[Id, String, List[AccountId]] { diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsConversions.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsConversions.scala index 7c878342b..bacdf2e5d 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsConversions.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsConversions.scala @@ -4,7 +4,7 @@ import java.sql.Timestamp import cats.implicits._ import tech.cryptonomic.conseil.common.io.Logging.ConseilLogSupport -import tech.cryptonomic.conseil.common.tezos.Tables +import tech.cryptonomic.conseil.common.tezos.{Fork, Tables} import tech.cryptonomic.conseil.common.tezos.TezosTypes._ import tech.cryptonomic.conseil.indexer.tezos.michelson import tech.cryptonomic.conseil.indexer.tezos.michelson.contracts.TokenContracts @@ -45,7 +45,9 @@ object BigMapsConversions extends ConseilLogSupport { Tables.BigMapsRow( bigMapId = id, keyType = Some(toMichelsonScript[MichelsonExpression](key_type.expression)), - valueType = Some(toMichelsonScript[MichelsonExpression](value_type.expression)) + valueType = Some(toMichelsonScript[MichelsonExpression](value_type.expression)), + forkId = Fork.mainForkId, + blockLevel = Some(ref.level) ) ) case BlockTagged(ref, (hash, _, BigMapAlloc(_, InvalidDecimal(json), _, _))) => @@ -86,7 +88,8 @@ object BigMapsConversions extends ConseilLogSupport { blockLevel = Some(ref.level), timestamp = ref.timestamp.map(Timestamp.from), cycle = ref.cycle, - period = ref.period + period = ref.period, + forkId = Fork.mainForkId ) ) case BlockTagged(ref, (hash, _, BigMapUpdate(_, _, _, InvalidDecimal(json), _))) => diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperations.scala 
b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperations.scala index cc574b938..ab0f10aec 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperations.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperations.scala @@ -71,7 +71,9 @@ case class BigMapsOperations[Profile <: ExPostgresProfile](profile: Profile) ext it.blockLevel, it.timestamp, it.cycle, - it.period + it.period, + it.forkId, + it.invalidatedAsof ) ) .result diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala new file mode 100644 index 000000000..a05110d49 --- /dev/null +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala @@ -0,0 +1,71 @@ +package tech.cryptonomic.conseil.indexer.tezos.forks + +import java.time.Instant + +import tech.cryptonomic.conseil.common.io.Logging.ConseilLogSupport +import tech.cryptonomic.conseil.common.tezos.TezosTypes.{BlockData, TezosBlockHash} +import tech.cryptonomic.conseil.indexer.tezos.{TezosBlocksDataFetchers, TezosIndexedDataOperations, TezosRPCInterface} +import cats._ +import cats.implicits._ +import tech.cryptonomic.conseil.indexer.forks.ForkAmender +import tech.cryptonomic.conseil.indexer.forks.ForkDetector.SearchBlockId + +import scala.concurrent.{ExecutionContext, Future} + +class BacktracingForkProcessor( + val network: String, + val node: TezosRPCInterface, + tezosIndexedDataOperations: TezosIndexedDataOperations, + indexerSearch: SearchBlockId[Future, TezosBlockHash], + amender: ForkAmender[Future, TezosBlockHash] +)(ec: ExecutionContext) + extends TezosBlocksDataFetchers + with ConseilLogSupport { + + /** parallelism in the multiple requests decoding on the RPC interface */ + override def 
fetchConcurrency: Int = 50 + + implicit override def fetchFutureContext: ExecutionContext = ec + + /** + * Checks if there is a fork from given level down to (level - depth) + * @param level - from which level we start check for fork + * @param depth - how deep we check for forks + * @return a list of failed fork checks of levels + */ + def checkDepthLevel(level: Long, depth: Long): Future[List[Offset]] = { + import cats.instances.future._ + import cats.instances.list._ + import tech.cryptonomic.conseil.common.generic.chain.DataFetcher.{fetch, fetchMerge} + import tech.cryptonomic.conseil.common.tezos.TezosOptics.Blocks._ + implicit val fetcher = blocksRangeFetcher(level) + val res = fetch[Long, BlockData, Future, List, Throwable].run((0L to depth).toList) + res.flatMap { lst => + lst.map { + case (l, chainBlock) => + tezosIndexedDataOperations.fetchBlockAtLevel(chainBlock.header.level).map { indexedBlock => + if (indexedBlock.forall(_.hash == chainBlock.hash.value)) { + -1 + } else { + logger.debug(s"Hashes don't match: ${chainBlock.header.level} ${indexedBlock.map(_.hash)} && ${chainBlock.hash.value}") + l + } + } + }.sequence + }.map(_.filter(_ > 0)) + } + + def handleForkFrom(currentHeadLevel: Long, depth: Long): Future[Option[ForkAmender.Results]] = { + checkDepthLevel(currentHeadLevel, depth).flatMap { + case Nil => Option.empty.pure[Future] + case xs => + logger.info(s"Found forked blocks ${xs.sorted.mkString(",")}") + val forkLevel = xs.min + for { + forkBlockId <- indexerSearch.searchForLevel(forkLevel) + amendment <- amender.amendFork(forkLevel, forkBlockId, currentHeadLevel, Instant.now()) + } yield amendment.some + } + } + +} diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/TezosForkInvalidatingAmender.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/TezosForkInvalidatingAmender.scala index 5ad004e1b..a029739b7 100644 --- 
a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/TezosForkInvalidatingAmender.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/TezosForkInvalidatingAmender.scala @@ -80,6 +80,9 @@ class TezosForkInvalidatingAmender(db: Database)(implicit ec: ExecutionContext) DBOps.ForkInvalidation.fees.invalidate(forkLevel, asOf, forkId), DBOps.ForkInvalidation.governance.invalidate(forkLevel, asOf, forkId), DBOps.ForkInvalidation.tokenBalances.invalidate(forkLevel, asOf, forkId), + DBOps.ForkInvalidation.bigMaps.invalidate(forkLevel, asOf, forkId), + DBOps.ForkInvalidation.bigMapContents.invalidate(forkLevel, asOf, forkId), + DBOps.ForkInvalidation.bigMapContentsHistory.invalidate(forkLevel, asOf, forkId), DBOps.ForkInvalidation.deleteProcessedEvents(forkLevel) ).foldA } diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/michelson/contracts/TNSContract.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/michelson/contracts/TNSContract.scala index c7d763d10..fb494ca78 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/michelson/contracts/TNSContract.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/michelson/contracts/TNSContract.scala @@ -219,7 +219,7 @@ object TNSContract extends ConseilLogSupport { * Any other map (there should be exactly two) must then be the lookup, by exclusion. 
*/ val (reverseMaps, lookupMaps) = maps.partition { - case BigMapsRow(id, keyType, valueType) => + case BigMapsRow(id, keyType, valueType, _, _, _) => (keyType, valueType) match { case (Some("address"), Some("string")) => true case _ => false diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/JsonDecodersTestFixtures.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/JsonDecodersTestFixtures.scala index 5cdd849ba..f2aaf0996 100644 --- a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/JsonDecodersTestFixtures.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/JsonDecodersTestFixtures.scala @@ -128,25 +128,29 @@ trait DelegatesJsonData { frozen_balance_by_cycle = List( CycleBalance( cycle = 174, - deposit = PositiveDecimal(1321664000000L), + deposit = Some(PositiveDecimal(1321664000000L)), + deposits = None, fees = PositiveDecimal(572501), rewards = PositiveDecimal(37062032929L) ), CycleBalance( cycle = 175, - deposit = PositiveDecimal(1510400000000L), + deposit = Some(PositiveDecimal(1510400000000L)), + deposits = None, fees = PositiveDecimal(5341649990L), rewards = PositiveDecimal(45404449892L) ), CycleBalance( cycle = 176, - deposit = PositiveDecimal(1550400000000L), + deposit = Some(PositiveDecimal(1550400000000L)), + deposits = None, fees = PositiveDecimal(301772470L), rewards = PositiveDecimal(44143066264L) ), CycleBalance( cycle = 177, - deposit = PositiveDecimal(62848000000L), + deposit = Some(PositiveDecimal(62848000000L)), + deposits = None, fees = PositiveDecimal(550000L), rewards = PositiveDecimal(1817833324L) ) diff --git a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperationsTest.scala b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperationsTest.scala index c98a76ca7..02d108c17 100644 --- 
a/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperationsTest.scala +++ b/conseil-lorre/src/test/scala/tech/cryptonomic/conseil/indexer/tezos/bigmaps/BigMapsOperationsTest.scala @@ -70,7 +70,9 @@ class BigMapsOperationsTest BigMapsRow( bigMapId = BigDecimal(1), keyType = Some("address"), - valueType = Some("nat") + valueType = Some("nat"), + forkId = Fork.mainForkId, + blockLevel = Some(1) ) ) } @@ -95,7 +97,8 @@ class BigMapsOperationsTest val initialBigMap = BigMapsRow( bigMapId = BigDecimal(1), keyType = Some("address"), - valueType = Some("nat") + valueType = Some("nat"), + forkId = Fork.mainForkId ) val block = generateSingleBlock(1, testReferenceDateTime) @@ -130,7 +133,8 @@ class BigMapsOperationsTest blockLevel = Some(block.data.header.level), timestamp = Some(Timestamp.from(block.data.header.timestamp.toInstant)), cycle = TezosOptics.Blocks.extractCycle(block), - period = TezosOptics.Blocks.extractPeriod(block.data.metadata) + period = TezosOptics.Blocks.extractPeriod(block.data.metadata), + forkId = Fork.mainForkId ) ) @@ -217,7 +221,8 @@ class BigMapsOperationsTest val initialBigMap = BigMapsRow( bigMapId = BigDecimal(tokenMap), keyType = Some("address"), - valueType = Some("pair (nat :balance) (map :approvals (address :spender) (nat :value))") + valueType = Some("pair (nat :balance) (map :approvals (address :spender) (nat :value))"), + forkId = Fork.mainForkId ) val block = generateSingleBlock(1, testReferenceDateTime) @@ -287,13 +292,15 @@ class BigMapsOperationsTest val initialBigMap = BigMapsRow( bigMapId = BigDecimal(1), keyType = Some("address"), - valueType = Some("nat") + valueType = Some("nat"), + forkId = Fork.mainForkId ) val initialBigMapContent = BigMapContentsRow( bigMapId = BigDecimal(1), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 20 {}") + value = Some("Pair 20 {}"), + forkId = Fork.mainForkId ) 
val populate = for { @@ -342,7 +349,8 @@ class BigMapsOperationsTest blockLevel = Some(block.data.header.level), timestamp = Some(Timestamp.from(block.data.header.timestamp.toInstant)), cycle = TezosOptics.Blocks.extractCycle(block), - period = TezosOptics.Blocks.extractPeriod(block.data.metadata) + period = TezosOptics.Blocks.extractPeriod(block.data.metadata), + forkId = Fork.mainForkId ) ) } @@ -354,13 +362,15 @@ class BigMapsOperationsTest val initialBigMap = BigMapsRow( bigMapId = BigDecimal(1), keyType = Some("address"), - valueType = Some("nat") + valueType = Some("nat"), + forkId = Fork.mainForkId ) val initialBigMapContent = BigMapContentsRow( bigMapId = BigDecimal(1), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 20 {}") + value = Some("Pair 20 {}"), + forkId = Fork.mainForkId ) val populate = for { @@ -431,7 +441,8 @@ class BigMapsOperationsTest blockLevel = Some(reverted.head.data.header.level), timestamp = Some(Timestamp.from(reverted.head.data.header.timestamp.toInstant)), cycle = TezosOptics.Blocks.extractCycle(reverted.head), - period = TezosOptics.Blocks.extractPeriod(reverted.head.data.metadata) + period = TezosOptics.Blocks.extractPeriod(reverted.head.data.metadata), + forkId = Fork.mainForkId ) ) @@ -448,7 +459,8 @@ class BigMapsOperationsTest BigMapsRow( bigMapId = BigDecimal(i), keyType = None, - valueType = None + valueType = None, + forkId = Fork.mainForkId ) ) @@ -458,7 +470,8 @@ class BigMapsOperationsTest bigMapId = BigDecimal(1), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 20 {}") + value = Some("Pair 20 {}"), + forkId = Fork.mainForkId ) //store the data @@ -506,7 +519,8 @@ class BigMapsOperationsTest bigMapId = BigDecimal(2), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = 
Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 20 {}") + value = Some("Pair 20 {}"), + forkId = Fork.mainForkId ) ) @@ -523,7 +537,8 @@ class BigMapsOperationsTest BigMapsRow( bigMapId = BigDecimal(i), keyType = None, - valueType = None + valueType = None, + forkId = Fork.mainForkId ) ) @@ -533,13 +548,15 @@ class BigMapsOperationsTest bigMapId = BigDecimal(1), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 10 {}") + value = Some("Pair 10 {}"), + forkId = Fork.mainForkId ), BigMapContentsRow( bigMapId = BigDecimal(2), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 20 {}") + value = Some("Pair 20 {}"), + forkId = Fork.mainForkId ) ) @@ -609,7 +626,8 @@ class BigMapsOperationsTest bigMapId = BigDecimal(3), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 20 {}") + value = Some("Pair 20 {}"), + forkId = Fork.mainForkId ) ) } @@ -625,7 +643,8 @@ class BigMapsOperationsTest BigMapsRow( bigMapId = BigDecimal(i), keyType = None, - valueType = None + valueType = None, + forkId = Fork.mainForkId ) ) @@ -635,7 +654,8 @@ class BigMapsOperationsTest bigMapId = BigDecimal(1), key = "0x0000b2e19a9e74440d86c59f13dab8a18ff873e889ea", keyHash = Some("exprv6UsC1sN3Fk2XfgcJCL8NCerP5rCGy1PRESZAqr7L2JdzX55EN"), - value = Some("Pair 20 {}") + value = Some("Pair 20 {}"), + forkId = Fork.mainForkId ) //the origination used for the generated sample is used to create the test big map diff --git a/sql/conseil.sql b/sql/conseil.sql index b0237a3f2..4fc1da48a 100644 --- a/sql/conseil.sql +++ b/sql/conseil.sql @@ -533,9 +533,13 @@ CREATE SEQUENCE tezos.operations_operation_id_seq ALTER SEQUENCE tezos.operations_operation_id_seq OWNED BY 
tezos.operations.operation_id; CREATE TABLE tezos.big_maps ( - big_map_id numeric PRIMARY KEY, + big_map_id numeric, key_type character varying, - value_type character varying + value_type character varying, + fork_id character varying NOT NULL, + block_level bigint, + invalidated_asof timestamp, + PRIMARY KEY (big_map_id, fork_id) ); CREATE TABLE tezos.big_map_contents ( @@ -549,7 +553,9 @@ CREATE TABLE tezos.big_map_contents ( "timestamp" timestamp without time zone, cycle integer, period integer, - PRIMARY KEY (big_map_id, key) + fork_id character varying NOT NULL, + invalidated_asof timestamp, + PRIMARY KEY (big_map_id, key, fork_id) ); CREATE TABLE tezos.big_map_contents_history ( @@ -561,7 +567,9 @@ CREATE TABLE tezos.big_map_contents_history ( block_level bigint, "timestamp" timestamp without time zone, cycle integer, - period integer + period integer, + fork_id character varying NOT NULL, + invalidated_asof timestamp ); CREATE INDEX big_map_id_idx ON tezos.big_map_contents USING btree (big_map_id); From 3505b30da297a28bc635b7687a0cdc11990019d8 Mon Sep 17 00:00:00 2001 From: piotrkosecki Date: Wed, 3 Nov 2021 14:23:49 +0100 Subject: [PATCH 6/6] added comments --- .../conseil/indexer/tezos/TezosIndexer.scala | 14 +++++++++++--- .../tezos/forks/BacktracingForkProcessor.scala | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala index 626dcdac7..c1eccd2d6 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/TezosIndexer.scala @@ -193,6 +193,14 @@ class TezosIndexer private ( } else emptyOutcome } + /** + * Searches for fork between indexed head down to (head - depth) every [[interval]] of [[iteration]] + * @param maxIndexedLevel level of 
the currently indexed head + * @param depth how deep we look for forks from the current head + * @param interval check for forks once every this many iterations + * @param iteration the current iteration of the main indexing loop + * @return optionally the account reset events to reprocess after a fork amendment + */ private def processLastForks( maxIndexedLevel: BlockLevel, depth: Long, @@ -203,11 +211,11 @@ class TezosIndexer private ( if (featureFlags.forkHandlingIsOn && maxIndexedLevel != indexedData.defaultBlockLevel && iteration % interval == 0) backtrackingForkProcessor.handleForkFrom(maxIndexedLevel, depth).flatMap { case None => - println(s"AFH: No fork detected up to $maxIndexedLevel") + logger.info(s"No local fork detected up to $maxIndexedLevel") emptyOutcome case Some((forkId, invalidations)) => - println( - s"AFH: A fork was detected somewhere before the currently indexed level $maxIndexedLevel. $invalidations entries were invalidated and connected to fork $forkId" + logger.info( + s" A local fork was detected somewhere before the currently indexed level $maxIndexedLevel.
$invalidations entries were invalidated and connected to fork $forkId" ) /* locally processed events were invalidated on db, we need to reload them afresh */ accountsResetHandler diff --git a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala index a05110d49..f66b991ed 100644 --- a/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala +++ b/conseil-lorre/src/main/scala/tech/cryptonomic/conseil/indexer/tezos/forks/BacktracingForkProcessor.scala @@ -12,6 +12,16 @@ import tech.cryptonomic.conseil.indexer.forks.ForkDetector.SearchBlockId import scala.concurrent.{ExecutionContext, Future} +/** + * Detects and amends chain forks by backtracking over recently indexed levels + * + * @param network tezos network + * @param node tezos RPC interface + * @param tezosIndexedDataOperations operations over the locally indexed data + * @param indexerSearch provides search through indexed data + * @param amender fork amender + * @param ec execution context + */ class BacktracingForkProcessor( val network: String, val node: TezosRPCInterface, @@ -55,6 +65,12 @@ class BacktracingForkProcessor( }.map(_.filter(_ > 0)) } + /** + * Searches from the head down to (head - depth) for the lowest level where the indexed block differs from the chain, and amends the fork + * @param currentHeadLevel level of the currently stored head + * @param depth depth for the search + * @return the fork amendment results, or None when no fork was detected + */ def handleForkFrom(currentHeadLevel: Long, depth: Long): Future[Option[ForkAmender.Results]] = { checkDepthLevel(currentHeadLevel, depth).flatMap { case Nil => Option.empty.pure[Future]