From 6961c764f797f71a24483983a211b530485f87e3 Mon Sep 17 00:00:00 2001 From: Alexis Hernandez Date: Fri, 26 Apr 2019 13:01:08 -0600 Subject: [PATCH] server: Refactor the TransactionCollectorService --- .../TransactionCollectorService.scala | 135 ++++++------ .../TransactionCollectorServiceSpec.scala | 197 +++++++++++++++++- 2 files changed, 261 insertions(+), 71 deletions(-) diff --git a/server/app/com/xsn/explorer/services/TransactionCollectorService.scala b/server/app/com/xsn/explorer/services/TransactionCollectorService.scala index 5a049a27..ecd5bfaa 100644 --- a/server/app/com/xsn/explorer/services/TransactionCollectorService.scala +++ b/server/app/com/xsn/explorer/services/TransactionCollectorService.scala @@ -1,12 +1,11 @@ package com.xsn.explorer.services -import com.alexitc.playsonify.core.FutureApplicationResult import com.alexitc.playsonify.core.FutureOr.Implicits.FutureOps +import com.alexitc.playsonify.core.{ApplicationResult, FutureApplicationResult} import com.xsn.explorer.data.async.TransactionFutureDataHandler import com.xsn.explorer.errors.TransactionNotFoundError import com.xsn.explorer.models._ import com.xsn.explorer.models.values._ -import com.xsn.explorer.util.Extensions.FutureOrExt import javax.inject.Inject import org.scalactic.{Bad, Good, One, Or} @@ -39,7 +38,7 @@ class TransactionCollectorService @Inject() ( def collect(txidList: List[TransactionId], excludedTransactions: List[TransactionId]): FutureApplicationResult[Result] = { val futureOr = for { rpcTransactions <- getRPCTransactions(txidList).toFutureOr - completeTransactions <- getRPCTransactionsWithValues(rpcTransactions, excludedTransactions).toFutureOr + completeTransactions <- completeValues(rpcTransactions, excludedTransactions).toFutureOr } yield { val result = completeTransactions.map(persisted.Transaction.fromRPC) val contracts = result.flatMap(_._2) @@ -50,7 +49,7 @@ class TransactionCollectorService @Inject() ( futureOr.toFuture } - private[services] def getRPCTransactionsWithValues(rpcTransactions: RPCTransactionList, excludedTransactions: List[TransactionId]): FutureApplicationResult[RPCTransactionListWithValues] = { + private[services] def completeValues(rpcTransactions: List[RPCTransaction], excludedTransactions: List[TransactionId]): FutureApplicationResult[List[RPCCompleteTransaction]] = { val neutral: FutureApplicationResult[List[rpc.Transaction[rpc.TransactionVIN.HasValues]]] = Future.successful(Good(List.empty)) val future = rpcTransactions.foldLeft(neutral) { case (acc, tx) => val result = for { @@ -76,46 +75,46 @@ class TransactionCollectorService @Inject() ( * - The ones that weren't retrieved are retried sequentially using the RPC API. */ private[services] def getRPCTransactionVIN(vinList: List[rpc.TransactionVIN], excludedTransactions: List[TransactionId]): FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = { - val futureList = Future.sequence { - vinList - .filterNot(excludedTransactions contains _.txid) - .map { vin => - transactionDataHandler - .getOutput(vin.txid, vin.voutIndex) - .toFutureOr - .map { output => - vin.withValues(value = output.value, address = output.address) - } - .recoverWith(TransactionNotFoundError) { - getRPCTransactionVINWithValues(vin).toFutureOr - } - .toFuture - .map(vin -> _) + val filtered = vinList.filterNot(excludedTransactions contains _.txid) + getDBPartialVINList(filtered) + .flatMap(completeRPCVINSequentially) + } + + private[services] def getDBPartialVINList(vinList: List[rpc.TransactionVIN]): Future[List[PartialTransactionVIN]] = { + val futures = for (vin <- vinList) yield { + transactionDataHandler + .getOutput(vin.txid, vin.voutIndex) + .toFutureOr + .map { output => + vin.withValues(value = output.value, address = output.address) } + .toFuture + .map(vin -> _) } - val future = futureList - .flatMap { resultList => - val neutral: FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = Future.successful(Good(List.empty)) - resultList.foldLeft(neutral) { - case (acc, (vin, Bad(_))) => - val result = for { - ready <- acc.toFutureOr - newVIN <- getRPCTransactionVINWithValues(vin).toFutureOr - } yield newVIN :: ready + Future.sequence(futures) + } - result.toFuture + private[services] def completeRPCVINSequentially(partial: List[PartialTransactionVIN]): FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = { + val neutral: FutureApplicationResult[List[rpc.TransactionVIN.HasValues]] = Future.successful(Good(List.empty)) + val result = partial.foldLeft(neutral) { + case (acc, (vin, Bad(_))) => + val result = for { + ready <- acc.toFutureOr + newVIN <- getRPCTransactionVINWithValues(vin).toFutureOr + } yield newVIN :: ready - case (acc, (_, Good(newVIN))) => - val result = for { - ready <- acc.toFutureOr - } yield newVIN :: ready + result.toFuture - result.toFuture - } - } + case (acc, (_, Good(newVIN))) => + val result = for { + ready <- acc.toFutureOr + } yield newVIN :: ready - future + result.toFuture + } + + result .toFutureOr .map(_.reverse) .toFuture @@ -126,37 +125,41 @@ class TransactionCollectorService @Inject() ( * - Try to get them all from the RPC API in parallel * - Retry the ones that weren't retrieved in parallel by retrieving them sequentially. */ - private[services] def getRPCTransactions(txidList: List[TransactionId]): FutureApplicationResult[RPCTransactionList] = { - val futureList = Future.sequence { - txidList.map { txid => - for { - r <- xsnService.getTransaction(txid) - } yield txid -> r - } + private[services] def getRPCTransactions(txidList: List[TransactionId]): FutureApplicationResult[List[RPCTransaction]] = { + getPartialRPCTransactions(txidList) + .flatMap(completeRPCTransactionsSequentially) + } + + private[services] def getPartialRPCTransactions(txidList: List[TransactionId]): Future[List[PartialRPCTransaction]] = { + val futures = for (txid <- txidList) yield { + for { + r <- xsnService.getTransaction(txid) + } yield txid -> r } - val future = futureList - .flatMap { resultList => - val neutral: FutureApplicationResult[RPCTransactionList] = Future.successful(Good(List.empty)) - resultList.foldLeft(neutral) { - case (acc, (txid, Bad(_))) => - val result = for { - ready <- acc.toFutureOr - tx <- xsnService.getTransaction(txid).toFutureOr - } yield tx :: ready + Future.sequence(futures) + } - result.toFuture + private[services] def completeRPCTransactionsSequentially(partial: List[PartialRPCTransaction]): FutureApplicationResult[List[RPCTransaction]] = { + val neutral: FutureApplicationResult[List[RPCTransaction]] = Future.successful(Good(List.empty)) + val result = partial.foldLeft(neutral) { + case (acc, (txid, Bad(_))) => + val result = for { + ready <- acc.toFutureOr + tx <- xsnService.getTransaction(txid).toFutureOr + } yield tx :: ready - case (acc, (_, Good(tx))) => - val result = for { - ready <- acc.toFutureOr - } yield tx :: ready + result.toFuture - result.toFuture - } - } + case (acc, (_, Good(tx))) => + val result = for { + ready <- acc.toFutureOr + } yield tx :: ready - future + result.toFuture + } + + result .toFutureOr .map(_.reverse) .toFuture @@ -191,6 +194,10 @@ object TransactionCollectorService { type Result = (List[persisted.Transaction.HasIO], List[TPoSContract]) - private type RPCTransactionList = List[rpc.Transaction[rpc.TransactionVIN]] - private type RPCTransactionListWithValues = List[rpc.Transaction[rpc.TransactionVIN.HasValues]] + private type RPCTransaction = rpc.Transaction[rpc.TransactionVIN] + private type RPCCompleteTransaction = rpc.Transaction[rpc.TransactionVIN.HasValues] + + private type PartialTransactionVIN = (rpc.TransactionVIN, ApplicationResult[rpc.TransactionVIN.HasValues]) + private type PartialRPCTransaction = (TransactionId, ApplicationResult[RPCTransaction]) + } diff --git a/server/test/com/xsn/explorer/services/TransactionCollectorServiceSpec.scala b/server/test/com/xsn/explorer/services/TransactionCollectorServiceSpec.scala index dc978594..830d3d07 100644 --- a/server/test/com/xsn/explorer/services/TransactionCollectorServiceSpec.scala +++ b/server/test/com/xsn/explorer/services/TransactionCollectorServiceSpec.scala @@ -1,28 +1,194 @@ package com.xsn.explorer.services +import com.alexitc.playsonify.core.FutureApplicationResult +import com.xsn.explorer.data.TransactionBlockingDataHandler +import com.xsn.explorer.data.async.TransactionFutureDataHandler +import com.xsn.explorer.errors.TransactionNotFoundError +import com.xsn.explorer.helpers.{DataGenerator, DummyXSNService, Executors} +import com.xsn.explorer.models._ +import com.xsn.explorer.models.rpc.{ScriptPubKey, Transaction, TransactionVIN} +import com.xsn.explorer.models.values._ +import org.scalactic.{Bad, Good, One} +import org.scalatest.EitherValues._ +import org.scalatest.MustMatchers._ import org.scalatest.WordSpec +import org.scalatest.concurrent.ScalaFutures._ + +import scala.concurrent.Future class TransactionCollectorServiceSpec extends WordSpec { - lazy val service: TransactionCollectorService = ??? + import Executors.globalEC + + def create( + xsnService: XSNService, + transactionDataHandler: TransactionBlockingDataHandler): TransactionCollectorService = { + + val futureDataHandler = new TransactionFutureDataHandler(transactionDataHandler)(Executors.databaseEC) + new TransactionCollectorService(xsnService, futureDataHandler) + } "getRPCTransactionVINWithValues" should { + val txid = DataGenerator.randomTransactionId + val outputIndex = 1 + val vin = rpc.TransactionVIN.Raw(txid, outputIndex) + val address = DataGenerator.randomAddress + "return the values" in { - pending + val expected = vin.withValues(100, address) + val xsnService = new DummyXSNService { + override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = { + val output = rpc.TransactionVOUT(100, outputIndex, Some(createScript(address))) + val tx = createTransaction(txid, List(output)) + Future.successful(Good(tx)) + } + } + + val service = create(xsnService, null) + whenReady(service.getRPCTransactionVINWithValues(vin)) { result => + result.toEither.right.value must be(expected) + } } "fail when the transaction doesn't have the referenced output" in { - pending + val xsnService = new DummyXSNService { + override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = { + val output = rpc.TransactionVOUT(100, 1 + outputIndex, Some(createScript(address))) + val tx = createTransaction(txid, List(output)) + Future.successful(Good(tx)) + } + } + + val service = create(xsnService, null) + whenReady(service.getRPCTransactionVINWithValues(vin)) { result => + result.toEither.left.value must be(One(TransactionNotFoundError)) + } } "fail when the transaction doesn't exists" in { - pending + val xsnService = new DummyXSNService { + override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = { + Future.successful(Bad(TransactionNotFoundError).accumulating) + } + } + + val service = create(xsnService, null) + whenReady(service.getRPCTransactionVINWithValues(vin)) { result => + result.toEither.left.value must be(One(TransactionNotFoundError)) + } + } + } + + "completeRPCTransactionsSequentially" should { + "do nothing on empty list" in { + val service = create(null, null) + whenReady(service.completeRPCTransactionsSequentially(List.empty)) { result => + result.toEither.right.value must be(empty) + } + } + + "do nothing when all transactions are loaded" in { + val input = for (_ <- 1 to 10) yield { + val txid = DataGenerator.randomTransactionId + val tx = createTransaction(txid, List.empty) + txid -> Good(tx) + } + + val service = create(null, null) + whenReady(service.completeRPCTransactionsSequentially(input.toList)) { result => + result.toEither.right.value must be(input.flatMap(_._2.toOption)) + } + } + + "fail when a single tx can't be completed" in { + val completed = for (_ <- 1 to 10) yield { + val txid = DataGenerator.randomTransactionId + val tx = createTransaction(txid, List.empty) + txid -> Good(tx) + } + + val pending = DataGenerator.randomTransactionId -> Bad(TransactionNotFoundError).accumulating + val input = (completed.take(5).toList ::: pending :: completed.drop(5).toList).reverse + + val xsnService = new DummyXSNService { + override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = { + Future.successful(Bad(TransactionNotFoundError).accumulating) + } + } + + val service = create(xsnService, null) + whenReady(service.completeRPCTransactionsSequentially(input)) { result => + result.toEither.left.value must be(One(TransactionNotFoundError)) + } + } + + "complete the missing transactions" in { + val completed = for (_ <- 1 to 10) yield { + val txid = DataGenerator.randomTransactionId + val tx = createTransaction(txid, List.empty) + txid -> Good(tx) + } + val firstHalf = completed.take(5).toList + val secondHalf = completed.drop(5).toList + + val pending1 = DataGenerator.randomTransactionId -> Bad(TransactionNotFoundError).accumulating + val pending1Tx = createTransaction(pending1._1, List.empty) + + val pending2 = DataGenerator.randomTransactionId -> Bad(TransactionNotFoundError).accumulating + val pending2Tx = createTransaction(pending2._1, List.empty) + + val input = firstHalf ::: List(pending1) ::: secondHalf ::: List(pending2) + val xsnService = new DummyXSNService { + override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = { + if (txid == pending1._1) { + Future.successful(Good(pending1Tx)) + } else if (txid == pending2._1) { + Future.successful(Good(pending2Tx)) + } else { + Future.successful(Bad(TransactionNotFoundError).accumulating) + } + } + } + + val service = create(xsnService, null) + whenReady(service.completeRPCTransactionsSequentially(input)) { result => + val expected = firstHalf.flatMap(_._2.toOption) ::: List(pending1Tx) ::: secondHalf.flatMap(_._2.toOption) ::: List(pending2Tx) + result.toEither.right.value must be(expected) + } } } "getRPCTransactions" should { - "work" in { - pending + "fallback to retrieving transactions sequentally" in { + val tx = createTransaction(DataGenerator.randomTransactionId, List.empty) + val pending = createTransaction(DataGenerator.randomTransactionId, List.empty) + + val xsnService: XSNService = new DummyXSNService { + + var ready = Set.empty[TransactionId] + + override def getTransaction(txid: TransactionId): FutureApplicationResult[Transaction[TransactionVIN]] = { + if (txid == tx.id) { + Future.successful(Good(tx)) + } else if (txid == pending.id) { + if (ready contains txid) { + Future.successful(Good(pending)) + } else { + ready = ready + txid + Future.successful(Bad(TransactionNotFoundError).accumulating) + } + } else { + Future.successful(Bad(TransactionNotFoundError).accumulating) + } + } + } + + val input = List(tx.id, pending.id) + val service = create(xsnService, null) + whenReady(service.getRPCTransactions(input)) { result => + val expected = List(tx, pending) + result.toEither.right.value must be(expected) + } } } @@ -32,7 +198,7 @@ class TransactionCollectorServiceSpec extends WordSpec { } } - "getRPCTransactionsWithValues" should { + "completeValues" should { "work" in { pending } @@ -43,4 +209,21 @@ class TransactionCollectorServiceSpec extends WordSpec { pending } } + + def createScript(address: Address) = { + ScriptPubKey("nulldata", "", HexString.from("00").get, List(address)) + } + + def createTransaction(txid: TransactionId, outputs: List[rpc.TransactionVOUT]) = { + rpc.Transaction( + id = txid, + size = Size(100), + blockhash = DataGenerator.randomBlockhash, + time = 0L, + blocktime = 0L, + confirmations = Confirmations(0), + vin = List.empty[rpc.TransactionVIN], + vout = outputs + ) + } }