-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6428091
commit e506f7d
Showing
4 changed files
with
176 additions
and
0 deletions.
There are no files selected for viewing
144 changes: 144 additions & 0 deletions
144
server/app/com/xsn/explorer/migrations/MigrationRunner.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
package com.xsn.explorer.migrations | ||
|
||
import anorm.SQL | ||
import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps | ||
import com.xsn.explorer.data.anorm.AnormPostgresDataHandler | ||
import com.xsn.explorer.data.anorm.dao.{BlockPostgresDAO, BlockRewardPostgresDAO} | ||
import com.xsn.explorer.data.async.BlockFutureDataHandler | ||
import com.xsn.explorer.executors.DatabaseExecutionContext | ||
import com.xsn.explorer.models.{BlockExtractionMethod, BlockRewards, rpc} | ||
import com.xsn.explorer.models.persisted.Block | ||
import com.xsn.explorer.models.values.{Blockhash, Height} | ||
import com.xsn.explorer.services.{BlockService, XSNService} | ||
import javax.inject.Inject | ||
import org.scalactic.{Bad, Good} | ||
import org.slf4j.LoggerFactory | ||
import play.api.db.Database | ||
|
||
import scala.concurrent.{ExecutionContext, Future} | ||
import scala.util.{Failure, Success} | ||
|
||
class MigrationRunner @Inject()( | ||
xsnService: XSNService, | ||
blockService: BlockService, | ||
blockDataHandler: BlockFutureDataHandler, | ||
db: MigrationRunner.DatabaseOperations | ||
)(implicit ec: ExecutionContext) { | ||
|
||
private val logger = LoggerFactory.getLogger(this.getClass) | ||
|
||
def run() = { | ||
val targetBlock = blockDataHandler.getLatestBlock().toFutureOr | ||
|
||
targetBlock.map { targetBlock => | ||
logger.info(s"Migrating Block Rewards from block 0 to block ${targetBlock.height}") | ||
|
||
val startingState = Future.successful(Good(())).toFutureOr | ||
val finalState = (0 to targetBlock.height.int).foldLeft(startingState) { | ||
case (state, height) => | ||
for { | ||
_ <- state | ||
block <- blockDataHandler.getBy(Height(height)).toFutureOr | ||
rpcBlock <- xsnService.getFullBlock(block.hash).toFutureOr | ||
_ <- block.extractionMethod match { | ||
case BlockExtractionMethod.TrustlessProofOfStake => | ||
recalculateExtractionMethod(block, rpcBlock).toFutureOr | ||
case _ => Future.successful(Good(())).toFutureOr | ||
} | ||
_ <- db.getReward(block.hash).toFutureOr.flatMap { | ||
case None => storeBlockReward(rpcBlock).toFutureOr | ||
case _ => Future.successful(Good(())).toFutureOr | ||
} | ||
_ = logProgress(height, targetBlock.height.int) | ||
} yield () | ||
} | ||
|
||
finalState.toFuture.onComplete { | ||
case Success(Good(_)) => logger.info("Block rewards successfully migrated") | ||
case Success(Bad(error)) => logger.info(s"Block reward migration failed due to ${error.toString}") | ||
case Failure(error) => logger.info(s"Block reward migration failed due to ${error.toString}") | ||
} | ||
} | ||
} | ||
|
||
private def logProgress(migratedHeight: Int, targetHeight: Int) = { | ||
val percentage: Int = 100 * migratedHeight / targetHeight | ||
val previousPercentage: Int = 100 * (migratedHeight - 1) / targetHeight | ||
|
||
if (percentage != 0 && percentage != previousPercentage) { | ||
logger.info(s"migrated block at height ${migratedHeight}, ${percentage}% done") | ||
} | ||
} | ||
|
||
private def recalculateExtractionMethod(block: Block, rpcBlock: rpc.Block[_]) = { | ||
val result = for { | ||
extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr | ||
_ <- db.updateExtractionMethod(block.hash, extractionMethod).toFutureOr | ||
} yield Good(()) | ||
|
||
result.future | ||
} | ||
|
||
private def storeBlockReward(rpcBlock: rpc.Block[_]) = { | ||
val result = for { | ||
extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr | ||
reward <- blockService | ||
.getBlockRewards(rpcBlock, extractionMethod) | ||
.map { | ||
case Good(reward) => Good(Some(reward)) | ||
case Bad(_) => Good(None) | ||
} | ||
.toFutureOr | ||
_ = reward.foreach(r => db.insertBlockRewards(rpcBlock.hash, r)) | ||
} yield Good(()) | ||
|
||
result.future | ||
} | ||
} | ||
|
||
object MigrationRunner { | ||
private class DatabaseOperations @Inject()( | ||
override val database: Database, | ||
blockPostgresDAO: BlockPostgresDAO, | ||
blockRewardPostgresDAO: BlockRewardPostgresDAO | ||
)( | ||
implicit dbEC: DatabaseExecutionContext | ||
) extends AnormPostgresDataHandler { | ||
|
||
def updateExtractionMethod(blockhash: Blockhash, extractionMethod: BlockExtractionMethod) = { | ||
Future { | ||
withConnection { implicit conn => | ||
SQL( | ||
""" | ||
|UPDATE blocks | ||
|SET extraction_method = {extraction_method}::BLOCK_EXTRACTION_METHOD_TYPE | ||
|WHERE blockhash = {blockhash} | ||
""".stripMargin | ||
).on( | ||
'blockhash -> blockhash.toBytesBE.toArray, | ||
'extraction_method -> extractionMethod.entryName | ||
) | ||
.execute | ||
|
||
Good(()) | ||
} | ||
} | ||
} | ||
|
||
def getReward(blockhash: Blockhash) = { | ||
Future { | ||
withConnection { implicit conn => | ||
Good(blockRewardPostgresDAO.getBy(blockhash)) | ||
} | ||
} | ||
} | ||
|
||
def insertBlockRewards(blockhash: Blockhash, reward: BlockRewards) = { | ||
Future { | ||
withConnection { implicit conn => | ||
Good(blockRewardPostgresDAO.upsert(blockhash, reward)) | ||
} | ||
} | ||
} | ||
} | ||
} |
6 changes: 6 additions & 0 deletions
6
server/app/com/xsn/explorer/modules/DatabaseMigrationsModule.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package com.xsn.explorer.modules | ||
|
||
import com.xsn.explorer.tasks.DatabaseMigrationsTask | ||
import play.api.inject.{SimpleModule, bind} | ||
|
||
class DatabaseMigrationsModule extends SimpleModule(bind[DatabaseMigrationsTask].toSelf.eagerly()) |
25 changes: 25 additions & 0 deletions
25
server/app/com/xsn/explorer/tasks/DatabaseMigrationsTask.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package com.xsn.explorer.tasks | ||
|
||
import akka.actor.ActorSystem | ||
import com.xsn.explorer.migrations.MigrationRunner | ||
import javax.inject.Inject | ||
import org.slf4j.LoggerFactory | ||
|
||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.duration._ | ||
|
||
class DatabaseMigrationsTask @Inject()( | ||
actorSystem: ActorSystem, | ||
migration: MigrationRunner | ||
)(implicit ec: ExecutionContext) { | ||
private val logger = LoggerFactory.getLogger(this.getClass) | ||
|
||
start() | ||
|
||
def start() = { | ||
logger.info("Starting database migrations task") | ||
actorSystem.scheduler.scheduleOnce(10 seconds) { | ||
migration.run | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters