diff --git a/irohad/network/impl/peer_communication_service_impl.cpp b/irohad/network/impl/peer_communication_service_impl.cpp index 58a029a000..3dddd7bb66 100644 --- a/irohad/network/impl/peer_communication_service_impl.cpp +++ b/irohad/network/impl/peer_communication_service_impl.cpp @@ -16,6 +16,8 @@ limitations under the License. #include "network/impl/peer_communication_service_impl.hpp" +#include "interfaces/iroha_internal/transaction_batch.hpp" + namespace iroha { namespace network { PeerCommunicationServiceImpl::PeerCommunicationServiceImpl( @@ -30,12 +32,20 @@ namespace iroha { } void PeerCommunicationServiceImpl::propagate_transaction( - std::shared_ptr - transaction) { + std::shared_ptr transaction) + const { log_->info("propagate tx"); ordering_gate_->propagateTransaction(transaction); } + void PeerCommunicationServiceImpl::propagate_batch( + const shared_model::interface::TransactionBatch &batch) const { + log_->info("propagate batch"); + for (const auto tx : batch.transactions()) { + ordering_gate_->propagateTransaction(tx); + } + } + rxcpp::observable> PeerCommunicationServiceImpl::on_proposal() const { return ordering_gate_->on_proposal(); diff --git a/irohad/network/impl/peer_communication_service_impl.hpp b/irohad/network/impl/peer_communication_service_impl.hpp index 8ac586efde..489236a092 100644 --- a/irohad/network/impl/peer_communication_service_impl.hpp +++ b/irohad/network/impl/peer_communication_service_impl.hpp @@ -37,7 +37,10 @@ namespace iroha { void propagate_transaction( std::shared_ptr - transaction) override; + transaction) const override; + + void propagate_batch(const shared_model::interface::TransactionBatch + &batch) const override; rxcpp::observable> on_proposal() const override; diff --git a/irohad/network/peer_communication_service.hpp b/irohad/network/peer_communication_service.hpp index be4243abc1..6a6bf6c6da 100644 --- a/irohad/network/peer_communication_service.hpp +++ b/irohad/network/peer_communication_service.hpp @@ -27,6 +27,7 @@ namespace shared_model { class Block; class Proposal; class Transaction; + class TransactionBatch; } // namespace interface } // namespace shared_model @@ -48,7 +49,14 @@ namespace iroha { */ virtual void propagate_transaction( std::shared_ptr - transaction) = 0; + transaction) const = 0; + + /** + * Propagate batch to the network + * @param batch - batch for propagation + */ + virtual void propagate_batch( + const shared_model::interface::TransactionBatch &batch) const = 0; /** * Event is triggered when proposal arrives from network. diff --git a/irohad/torii/processor/impl/transaction_processor_impl.cpp b/irohad/torii/processor/impl/transaction_processor_impl.cpp index f53c994dcd..332bec52de 100644 --- a/irohad/torii/processor/impl/transaction_processor_impl.cpp +++ b/irohad/torii/processor/impl/transaction_processor_impl.cpp @@ -22,6 +22,7 @@ #include "backend/protobuf/transaction.hpp" #include "interfaces/iroha_internal/block.hpp" #include "interfaces/iroha_internal/proposal.hpp" +#include "interfaces/iroha_internal/transaction_sequence.hpp" #include "validation/stateful_validator_common.hpp" namespace iroha { @@ -146,7 +147,8 @@ namespace iroha { } void TransactionProcessorImpl::transactionHandle( - std::shared_ptr transaction) { + std::shared_ptr transaction) + const { log_->info("handle transaction"); if (boost::size(transaction->signatures()) < transaction->quorum()) { log_->info("waiting for quorum signatures"); @@ -158,6 +160,22 @@ namespace iroha { pcs_->propagate_transaction(transaction); } + void TransactionProcessorImpl::transactionSequenceHandle( + const shared_model::interface::TransactionSequence + &transaction_sequence) const { + for (const auto &batch : transaction_sequence.batches()) { + if (batch.hasAllSignatures()) { + pcs_->propagate_batch(batch); + } else { + // TODO kamilsa 16.07.18 propagate full batch to mst when its + // interface is updated + for (const auto tx : batch.transactions()) { + mst_processor_->propagateTransaction(tx); + } + } + } + } + rxcpp::observable< std::shared_ptr> TransactionProcessorImpl::transactionNotifier() { diff --git a/irohad/torii/processor/transaction_processor.hpp b/irohad/torii/processor/transaction_processor.hpp index 3892ae10d6..e40b9dbfc1 100644 --- a/irohad/torii/processor/transaction_processor.hpp +++ b/irohad/torii/processor/transaction_processor.hpp @@ -24,6 +24,7 @@ namespace shared_model { namespace interface { class Transaction; class TransactionResponse; + class TransactionSequence; } // namespace interface } // namespace shared_model @@ -42,7 +43,16 @@ namespace iroha { */ virtual void transactionHandle( std::shared_ptr - transaction) = 0; + transaction) const = 0; + + /** + * Process transaction sequence and propagate batches from it either to + * the MST or PCS + * @param transaction_sequence - transaction sequence for processing + */ + virtual void transactionSequenceHandle( + const shared_model::interface::TransactionSequence + &transaction_sequence) const = 0; /** * Subscribers will be notified with transaction status diff --git a/irohad/torii/processor/transaction_processor_impl.hpp b/irohad/torii/processor/transaction_processor_impl.hpp index 6f8915ce7a..7a2ff0c53e 100644 --- a/irohad/torii/processor/transaction_processor_impl.hpp +++ b/irohad/torii/processor/transaction_processor_impl.hpp @@ -40,7 +40,11 @@ namespace iroha { void transactionHandle( std::shared_ptr transaction) - override; + const override; + + void transactionSequenceHandle( + const shared_model::interface::TransactionSequence + &transaction_sequence) const override; rxcpp::observable< std::shared_ptr> diff --git a/test/framework/batch_helper.hpp b/test/framework/batch_helper.hpp index 58cd0706b2..929747bcb1 100644 --- a/test/framework/batch_helper.hpp +++ b/test/framework/batch_helper.hpp @@ -7,7 +7,11 @@ #define IROHA_BATCH_HELPER_HPP #include + +#include "framework/result_fixture.hpp" +#include "interfaces/iroha_internal/transaction_batch.hpp" #include "module/shared_model/builders/protobuf/test_transaction_builder.hpp" +#include "validators/transactions_collection/batch_order_validator.hpp" namespace framework { namespace batch { @@ -165,6 +169,32 @@ namespace framework { return createUnsignedBatchTransactions(batch_type, creators, now); } + auto createValidBatch(const size_t &size) { + using namespace shared_model::validation; + using TxValidator = + TransactionValidator>; + + using TxsValidator = + UnsignedTransactionsCollectionValidator; + + auto batch_type = shared_model::interface::types::BatchType::ATOMIC; + std::vector> + transaction_fields; + for (size_t i = 0; i < size; ++i) { + transaction_fields.push_back(std::make_pair( + batch_type, "account" + std::to_string(i) + "@domain")); + } + + auto txs = createBatchOneSignTransactions(transaction_fields); + auto result_batch = + shared_model::interface::TransactionBatch::createTransactionBatch( + txs, TxsValidator()); + + return framework::expected::val(result_batch).value().value; + } + } // namespace batch } // namespace framework diff --git a/test/module/irohad/network/network_mocks.hpp b/test/module/irohad/network/network_mocks.hpp index 62c540a10b..b37eac58a2 100644 --- a/test/module/irohad/network/network_mocks.hpp +++ b/test/module/irohad/network/network_mocks.hpp @@ -19,6 +19,7 @@ #define IROHA_NETWORK_MOCKS_HPP #include +#include "interfaces/iroha_internal/transaction_batch.hpp" #include "network/block_loader.hpp" #include "network/consensus_gate.hpp" #include "network/ordering_gate.hpp" @@ -34,10 +35,13 @@ namespace iroha { namespace network { class MockPeerCommunicationService : public PeerCommunicationService { public: - MOCK_METHOD1( + MOCK_CONST_METHOD1( propagate_transaction, void(std::shared_ptr)); + MOCK_CONST_METHOD1(propagate_batch, + void(const shared_model::interface::TransactionBatch &)); + MOCK_CONST_METHOD0( on_proposal, rxcpp::observable< diff --git a/test/module/irohad/torii/processor/transaction_processor_test.cpp b/test/module/irohad/torii/processor/transaction_processor_test.cpp index 157923f328..4765b570a0 100644 --- a/test/module/irohad/torii/processor/transaction_processor_test.cpp +++ b/test/module/irohad/torii/processor/transaction_processor_test.cpp @@ -7,8 +7,10 @@ #include "builders/protobuf/common_objects/proto_signature_builder.hpp" #include "builders/protobuf/proposal.hpp" #include "builders/protobuf/transaction.hpp" +#include "framework/batch_helper.hpp" #include "framework/specified_visitor.hpp" #include "framework/test_subscriber.hpp" +#include "interfaces/iroha_internal/transaction_sequence.hpp" #include "module/irohad/multi_sig_transactions/mst_mocks.hpp" #include "module/irohad/network/network_mocks.hpp" #include "module/shared_model/builders/protobuf/test_block_builder.hpp" @@ -140,6 +142,66 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalTest) { validateStatuses(txs); } +/** + * @given transactions from the same batch + * @when transactions sequence is created and propagated @and all transactions + * were returned by pcs in proposal notifier + * @then all transactions have stateless valid status + */ +TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalBatchTest) { + using namespace shared_model::validation; + using TxValidator = + TransactionValidator>; + + using TxsValidator = + UnsignedTransactionsCollectionValidator; + + auto transactions = + framework::batch::createValidBatch(proposal_size).transactions(); + + auto wrapper = + make_test_subscriber(tp->transactionNotifier(), proposal_size); + wrapper.subscribe([this](auto response) { + status_map[response->transactionHash()] = response; + }); + + auto transaction_sequence_result = + shared_model::interface::TransactionSequence::createTransactionSequence( + transactions, TxsValidator()); + auto transaction_sequence = + framework::expected::val(transaction_sequence_result).value().value; + + EXPECT_CALL(*mp, propagateTransactionImpl(_)).Times(0); + EXPECT_CALL(*pcs, propagate_batch(_)) + .Times(transaction_sequence.batches().size()); + + tp->transactionSequenceHandle(transaction_sequence); + + // create proposal from sequence transactions and notify about it + std::vector proto_transactions; + + std::transform( + transactions.begin(), + transactions.end(), + std::back_inserter(proto_transactions), + [](const auto tx) { + return *std::static_pointer_cast(tx); + }); + + auto proposal = std::make_shared( + TestProposalBuilder().transactions(proto_transactions).build()); + + prop_notifier.get_subscriber().on_next(proposal); + prop_notifier.get_subscriber().on_completed(); + + ASSERT_TRUE(wrapper.validate()); + + SCOPED_TRACE("Stateless valid status verification"); + validateStatuses( + proto_transactions); +} + /** * @given transaction processor * @when transactions compose proposal which is sent to peer diff --git a/test/module/irohad/torii/torii_service_test.cpp b/test/module/irohad/torii/torii_service_test.cpp index 3f1f35cfcf..c28deb1aa1 100644 --- a/test/module/irohad/torii/torii_service_test.cpp +++ b/test/module/irohad/torii/torii_service_test.cpp @@ -62,7 +62,10 @@ class CustomPeerCommunicationServiceMock : public PeerCommunicationService { void propagate_transaction( std::shared_ptr transaction) - override {} + const override {} + + void propagate_batch( + const shared_model::interface::TransactionBatch &batch) const override {} rxcpp::observable> on_proposal() const override {