server: Synchronize the TPoS contracts
- A TPoS contract is created when the transaction is synchronized.
- A TPoS contract is deleted when the transaction is rolled back.
- A TPoS contract is closed when the collateral output is spent.
- A TPoS contract is enabled again when its collateral output becomes unspent (the spending transaction is rolled back).
AlexITC committed Mar 31, 2019
1 parent 87793c7 commit 887af0f
Showing 10 changed files with 152 additions and 43 deletions.
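
The four bullets above correspond to the DAO calls introduced in this commit (create, deleteBy, close, open). What follows is a minimal, in-memory sketch of that lifecycle, not the real persistence layer: the Id shape mirrors the TPoSContract.Id(fromTxid, fromOutputIndex) used below, while the state names and the Map-based storage are illustrative assumptions.

// Sketch only: models the contract lifecycle described in the commit message.
object TPoSContractLifecycleSketch {

  final case class Id(txid: String, outputIndex: Int)

  sealed trait State
  case object Active extends State
  case object Closed extends State

  final case class Contract(id: Id, state: State)

  // assumed storage: contracts keyed by the collateral output that backs them
  private var contracts: Map[Id, Contract] = Map.empty

  // created when its transaction is synchronized
  def create(id: Id): Unit =
    contracts += id -> Contract(id, Active)

  // deleted when its transaction is rolled back
  def deleteBy(txid: String): Unit =
    contracts = contracts.filterNot { case (contractId, _) => contractId.txid == txid }

  // closed when the collateral output is spent
  def close(id: Id): Unit =
    contracts.get(id).foreach(c => contracts += id -> c.copy(state = Closed))

  // enabled again when the spend of the collateral output is rolled back
  def open(id: Id): Unit =
    contracts.get(id).foreach(c => contracts += id -> c.copy(state = Active))
}

As the LedgerPostgresDataHandler diff below shows, the real transitions run inside the same database transaction that stores or rolls back the block.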
3 changes: 2 additions & 1 deletion server/app/com/xsn/explorer/data/LedgerDataHandler.scala
@@ -1,6 +1,7 @@
package com.xsn.explorer.data

import com.alexitc.playsonify.core.ApplicationResult
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block

import scala.language.higherKinds
@@ -16,7 +17,7 @@ trait LedgerDataHandler[F[_]] {
* - The ledger is empty and the block is the genesis one.
* - The ledger has some blocks and the block goes just after the latest one.
*/
def push(block: Block.HasTransactions): F[Unit]
def push(block: Block.HasTransactions, tposContracts: List[TPoSContract]): F[Unit]

/**
* Remove the latest block from the ledger; it will succeed only if the ledger is not empty.
@@ -8,6 +8,7 @@ import com.xsn.explorer.data.LedgerBlockingDataHandler
import com.xsn.explorer.data.anorm.dao._
import com.xsn.explorer.errors.{PostgresForeignKeyViolationError, PreviousBlockMissingError, RepeatedBlockHeightError}
import com.xsn.explorer.gcs.{GolombCodedSet, GolombEncoding}
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.{Balance, Block, Transaction}
import com.xsn.explorer.models.values.Address
import com.xsn.explorer.util.Extensions.ListOptionExt
@@ -33,13 +34,14 @@ class LedgerPostgresDataHandler @Inject() (
* to have a next block, we remove the link because that block is not stored yet.
*/
override def push(
block: Block.HasTransactions): ApplicationResult[Unit] = {
block: Block.HasTransactions,
tposContracts: List[TPoSContract]): ApplicationResult[Unit] = {

// the filter is computed outside the transaction to avoid unnecessary locking
val filter = GolombEncoding.encode(block)
val result = withTransaction { implicit conn =>
val result = for {
_ <- upsertBlockCascade(block.asTip, filter)
_ <- upsertBlockCascade(block.asTip, filter, tposContracts)
} yield ()

result
@@ -72,7 +74,8 @@ class LedgerPostgresDataHandler @Inject() (

private def upsertBlockCascade(
block: Block.HasTransactions,
filter: Option[GolombCodedSet])(
filter: Option[GolombCodedSet],
tposContracts: List[TPoSContract])(
implicit conn: Connection): Option[Unit] = {

val result = for {
Expand All @@ -82,7 +85,7 @@ class LedgerPostgresDataHandler @Inject() (
_ = filter.foreach { f => blockFilterPostgresDAO.insert(block.hash, f) }

// batch insert
_ <- transactionPostgresDAO.insert(block.transactions)
_ <- transactionPostgresDAO.insert(block.transactions, tposContracts)

// balances
balanceList = balances(block.transactions)
@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory
class TransactionPostgresDAO @Inject() (
transactionInputDAO: TransactionInputPostgresDAO,
transactionOutputDAO: TransactionOutputPostgresDAO,
tposContractDAO: TPoSContractDAO,
addressTransactionDetailsDAO: AddressTransactionDetailsPostgresDAO,
fieldOrderingSQLInterpreter: FieldOrderingSQLInterpreter) {

@@ -31,11 +32,13 @@ class TransactionPostgresDAO @Inject() (
_ <- transactionOutputDAO.batchInsertOutputs(transaction.outputs)
_ <- transactionInputDAO.batchInsertInputs(transaction.inputs.map(transaction.id -> _))
_ <- transactionOutputDAO.batchSpend(transaction.id, transaction.inputs)
_ = closeContracts(List(transaction))
_ = transactionOutputDAO.batchSpend(transaction.id, transaction.inputs)
_ <- addressTransactionDetailsDAO.batchInsertDetails(transaction)
} yield Transaction.HasIO(partialTx, inputs = transaction.inputs, outputs = transaction.outputs)
}

def insert(transactions: List[Transaction.HasIO])(implicit conn: Connection): Option[List[Transaction]] = {
def insert(transactions: List[Transaction.HasIO], tposContracts: List[TPoSContract])(implicit conn: Connection): Option[List[Transaction]] = {
for {
r <- batchInsert(transactions.map(_.transaction))

@@ -47,6 +50,8 @@
} yield {
insertDetails(transactions)
spend(transactions)
closeContracts(transactions)
tposContracts.foreach { contract => tposContractDAO.create(contract) }
r
}
}
@@ -63,6 +68,17 @@
assert(spendResult.forall(_.isDefined), "Spending inputs batch failed")
}

private def closeContracts(transactions: List[Transaction.HasIO])(implicit conn: Connection): Unit = {
for {
tx <- transactions
// a contract's collateral output holds exactly 1 XSN, so spending a 1 XSN output may close a contract
input <- tx.inputs if input.value == 1
} {
val id = TPoSContract.Id(input.fromTxid, input.fromOutputIndex)
tposContractDAO.close(id, tx.id)
}
}

private def batchInsert(transactions: List[Transaction])(implicit conn: Connection): Option[List[Transaction]] = {
transactions match {
case Nil => Some(transactions)
@@ -112,9 +128,16 @@ class TransactionPostgresDAO @Inject() (
).as(parseTransaction.*)

val result = expectedTransactions.map { tx =>
val _ = (
tposContractDAO.deleteBy(tx.id),
addressTransactionDetailsDAO.deleteDetails(tx.id)
)
val inputs = transactionInputDAO.deleteInputs(tx.id)
val outputs = transactionOutputDAO.deleteOutputs(tx.id)
val _ = addressTransactionDetailsDAO.deleteDetails(tx.id)

inputs
.map { input => TPoSContract.Id(input.fromTxid, input.fromOutputIndex) }
.foreach(tposContractDAO.open(_))

Transaction.HasIO(tx, inputs = inputs, outputs = outputs)
}
@@ -3,6 +3,7 @@ package com.xsn.explorer.data.async
import com.alexitc.playsonify.core.FutureApplicationResult
import com.xsn.explorer.data.{LedgerBlockingDataHandler, LedgerDataHandler}
import com.xsn.explorer.executors.DatabaseExecutionContext
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block
import javax.inject.Inject

@@ -13,8 +14,8 @@ class LedgerFutureDataHandler @Inject() (
implicit ec: DatabaseExecutionContext)
extends LedgerDataHandler[FutureApplicationResult] {

override def push(block: Block.HasTransactions): FutureApplicationResult[Unit] = Future {
blockingDataHandler.push(block)
override def push(block: Block.HasTransactions, tposContracts: List[TPoSContract]): FutureApplicationResult[Unit] = Future {
blockingDataHandler.push(block, tposContracts)
}

override def pop(): FutureApplicationResult[Block] = Future {
@@ -1,15 +1,16 @@
package com.xsn.explorer.services

import com.alexitc.playsonify.core.FutureApplicationResult
import com.alexitc.playsonify.core.FutureOr.Implicits.{FutureOps, OptionOps}
import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult}
import com.xsn.explorer.data.async.{BlockFutureDataHandler, LedgerFutureDataHandler}
import com.xsn.explorer.errors.BlockNotFoundError
import com.xsn.explorer.models.TPoSContract
import com.xsn.explorer.models.persisted.Block
import com.xsn.explorer.models.transformers._
import com.xsn.explorer.models.values.{Blockhash, Height}
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.Good
import org.scalactic.{Bad, Good}
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}
@@ -23,6 +24,8 @@ class LedgerSynchronizerService @Inject() (
blockDataHandler: BlockFutureDataHandler)(
implicit ec: ExecutionContext) {

import LedgerSynchronizerService._

private val logger = LoggerFactory.getLogger(this.getClass)

/**
@@ -34,14 +37,15 @@
*/
def synchronize(blockhash: Blockhash): FutureApplicationResult[Unit] = {
val result = for {
block <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
data <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()

result.toFuture
}

private def synchronize(block: Block.HasTransactions): FutureApplicationResult[Unit] = {
private def synchronize(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
logger.info(s"Synchronize block ${block.height}, hash = ${block.hash}")

val result = for {
@@ -52,8 +56,8 @@
.recoverFrom(BlockNotFoundError)(None)

_ <- latestBlockMaybe
.map { latestBlock => onLatestBlock(latestBlock, block) }
.getOrElse { onEmptyLedger(block) }
.map { latestBlock => onLatestBlock(latestBlock, data) }
.getOrElse { onEmptyLedger(data) }
.toFutureOr
} yield ()

@@ -65,15 +69,16 @@
* 1.1. the given block is the genesis block, it is added.
* 1.2. the given block is not the genesis block, sync everything until the given block.
*/
private def onEmptyLedger(block: Block.HasTransactions): FutureApplicationResult[Unit] = {
private def onEmptyLedger(data: BlockData): FutureApplicationResult[Unit] = {
val block = data._1
if (block.height.int == 0) {
logger.info(s"Synchronize genesis block on empty ledger, hash = ${block.hash}")
ledgerDataHandler.push(block)
ledgerDataHandler.push(block, data._2)
} else {
logger.info(s"Synchronize block ${block.height} on empty ledger, hash = ${block.hash}")
val result = for {
_ <- sync(0 until block.height.int).toFutureOr
_ <- synchronize(block).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()

result.toFuture
@@ -88,27 +93,28 @@
* 2.4. if H <= N, if the hash already exists, it is ignored.
* 2.5. if H <= N, if the hash doesn't exist, remove blocks from N to H (inclusive), then add the new H.
*/
private def onLatestBlock(ledgerBlock: Block, newBlock: Block.HasTransactions): FutureApplicationResult[Unit] = {
private def onLatestBlock(ledgerBlock: Block, newData: BlockData): FutureApplicationResult[Unit] = {
val newBlock = newData._1
if (ledgerBlock.height.int + 1 == newBlock.height.int &&
newBlock.previousBlockhash.contains(ledgerBlock.hash)) {

logger.info(s"Appending block ${newBlock.height}, hash = ${newBlock.hash}")
ledgerDataHandler.push(newBlock)
ledgerDataHandler.push(newBlock, newData._2)
} else if (ledgerBlock.height.int + 1 == newBlock.height.int) {
logger.info(s"Reorganization to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
blockhash <- newBlock.previousBlockhash.toFutureOr(BlockNotFoundError)
previousBlock <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(previousBlock).toFutureOr
_ <- synchronize(newBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
} yield ()

result.toFuture
} else if (newBlock.height.int > ledgerBlock.height.int) {
logger.info(s"Filling holes to push block ${newBlock.height}, hash = ${newBlock.hash}")
val result = for {
_ <- sync(ledgerBlock.height.int + 1 until newBlock.height.int).toFutureOr
_ <- synchronize(newBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
} yield ()

result.toFuture
@@ -126,7 +132,7 @@
.getOrElse {
val x = for {
_ <- trimTo(newBlock.height).toFutureOr
_ <- synchronize(newBlock).toFutureOr
_ <- synchronize(newData).toFutureOr
} yield ()
x.toFuture
}
@@ -148,24 +154,59 @@
val result = for {
_ <- previous.toFutureOr
blockhash <- xsnService.getBlockhash(Height(height)).toFutureOr
block <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(block).toFutureOr
data <- getRPCBlock(blockhash).toFutureOr
_ <- synchronize(data).toFutureOr
} yield ()

result.toFuture
}
}

private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[Block.HasTransactions] = {
private def getRPCBlock(blockhash: Blockhash): FutureApplicationResult[BlockData] = {
val result = for {
rpcBlock <- xsnService.getBlock(blockhash).toFutureOr
extractionMethod <- blockService.extractionMethod(rpcBlock).toFutureOr
transactions <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr
} yield toPersistedBlock(rpcBlock, extractionMethod).withTransactions(transactions)
data <- transactionRPCService.getTransactions(rpcBlock.transactions).toFutureOr
(transactions, contracts) = data
validContracts <- getValidContracts(contracts).toFutureOr
} yield {
val block = toPersistedBlock(rpcBlock, extractionMethod).withTransactions(transactions)
(block, validContracts)
}

result.toFuture
}

private def getValidContracts(contracts: List[TPoSContract]): FutureApplicationResult[List[TPoSContract]] = {
val listF = contracts
.map { contract =>
xsnService
.isTPoSContract(contract.txid)
.toFutureOr
.map { valid =>
if (valid) Some(contract)
else None
}
.toFuture
}

val futureList = Future.sequence(listF)
futureList.map { list =>
val x = list.flatMap {
case Good(a) => a.map(Good(_))
case Bad(e) => Some(Bad(e))
}

val initial: ApplicationResult[List[TPoSContract]] = Good(List.empty)
x.foldLeft(initial) { case (acc, cur) =>
cur match {
case Good(contract) => acc.map(contract :: _)
case Bad(e) => acc.badMap(prev => prev ++ e)
}
}
}
}

/**
* Trim the ledger until the given block height; if the height is 4,
* the last stored block will be 3.
@@ -188,3 +229,8 @@
result.toFuture
}
}

object LedgerSynchronizerService {

type BlockData = (Block.HasTransactions, List[TPoSContract])
}
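
The doc comments in onEmptyLedger and onLatestBlock above enumerate the synchronization cases (1.1, 1.2 and 2.1 through 2.5, with N the latest stored height and H the incoming block's height). The snippet below is a small, side-effect-free sketch of that decision logic; the SyncAction names and the decide signature are assumptions made for illustration, while the real service carries out each branch through xsnService and ledgerDataHandler as shown in the diff.

// Sketch only: mirrors the branching of LedgerSynchronizerService.
object SyncDecisionSketch {

  sealed trait SyncAction
  case object AppendBlock extends SyncAction                    // N + 1 == H and the previous hash matches
  case object ReorganizeFromPreviousBlock extends SyncAction    // N + 1 == H but the previous hash differs
  final case class FillHoles(fromHeight: Int, untilHeight: Int) extends SyncAction // H > N + 1
  case object IgnoreExistingBlock extends SyncAction            // H <= N and the hash is already stored (2.4)
  final case class TrimAndReplace(height: Int) extends SyncAction // H <= N and the hash is unknown (2.5)

  def decide(
      ledgerHeight: Int,                 // N, height of the latest stored block
      ledgerHash: String,                // hash of the latest stored block
      newHeight: Int,                    // H, height of the incoming block
      newPreviousHash: Option[String],
      hashAlreadyStored: Boolean): SyncAction = {

    if (ledgerHeight + 1 == newHeight && newPreviousHash.contains(ledgerHash)) AppendBlock
    else if (ledgerHeight + 1 == newHeight) ReorganizeFromPreviousBlock
    else if (newHeight > ledgerHeight) FillHoles(ledgerHeight + 1, newHeight)
    else if (hashAlreadyStored) IgnoreExistingBlock
    else TrimAndReplace(newHeight)
  }
}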
17 changes: 12 additions & 5 deletions server/app/com/xsn/explorer/services/TransactionRPCService.scala
@@ -6,7 +6,7 @@ import com.xsn.explorer.errors.{InvalidRawTransactionError, TransactionFormatErr
import com.xsn.explorer.models.persisted.Transaction
import com.xsn.explorer.models.rpc.TransactionVIN
import com.xsn.explorer.models.values._
import com.xsn.explorer.models.{TransactionDetails, TransactionValue}
import com.xsn.explorer.models.{TPoSContract, TransactionDetails, TransactionValue}
import com.xsn.explorer.util.Extensions.FutureOrExt
import javax.inject.Inject
import org.scalactic.{Bad, Good, One, Or}
@@ -49,12 +49,12 @@ class TransactionRPCService @Inject() (
result.toFuture
}

def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction.HasIO] = {
def getTransaction(txid: TransactionId): FutureApplicationResult[(Transaction.HasIO, Option[TPoSContract])] = {
val result = for {
tx <- xsnService.getTransaction(txid).toFutureOr
transactionVIN <- getTransactionVIN(tx.vin).toFutureOr
rpcTransaction = tx.copy(vin = transactionVIN)
} yield Transaction.fromRPC(rpcTransaction)._1
} yield Transaction.fromRPC(rpcTransaction)

result.toFuture
}
@@ -92,8 +92,8 @@
}
}

def getTransactions(ids: List[TransactionId]): FutureApplicationResult[List[Transaction.HasIO]] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[Transaction.HasIO]] = pending match {
def getTransactions(ids: List[TransactionId]): FutureApplicationResult[(List[Transaction.HasIO], List[TPoSContract])] = {
def loadTransactionsSlowly(pending: List[TransactionId]): FutureOr[List[(Transaction.HasIO, Option[TPoSContract])]] = pending match {
case x :: xs =>
for {
tx <- getTransaction(x).toFutureOr
@@ -116,6 +113,13 @@
logger.warn(s"Unable to load transactions due to server error, loading them sequentially, error = ${ex.getMessage}")
loadTransactionsSlowly(ids).toFuture
}
.toFutureOr
.map { result =>
val contracts = result.flatMap(_._2)
val txs = result.map(_._1)
(txs, contracts)
}
.toFuture
}

def sendRawTransaction(hexString: String): FutureApplicationResult[JsValue] = {