Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Batch to pcs and tx processor (#1571)
Browse files Browse the repository at this point in the history
* Batch to pcs and tx processor

Signed-off-by: kamilsa <[email protected]>
  • Loading branch information
kamilsa authored and l4l committed Jul 25, 2018
1 parent d4aedeb commit b9a3c7d
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 9 deletions.
14 changes: 12 additions & 2 deletions irohad/network/impl/peer_communication_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

#include "network/impl/peer_communication_service_impl.hpp"

#include "interfaces/iroha_internal/transaction_batch.hpp"

namespace iroha {
namespace network {
PeerCommunicationServiceImpl::PeerCommunicationServiceImpl(
Expand All @@ -30,12 +32,20 @@ namespace iroha {
}

void PeerCommunicationServiceImpl::propagate_transaction(
std::shared_ptr<const shared_model::interface::Transaction>
transaction) {
std::shared_ptr<const shared_model::interface::Transaction> transaction)
const {
log_->info("propagate tx");
ordering_gate_->propagateTransaction(transaction);
}

void PeerCommunicationServiceImpl::propagate_batch(
const shared_model::interface::TransactionBatch &batch) const {
log_->info("propagate batch");
for (const auto tx : batch.transactions()) {
ordering_gate_->propagateTransaction(tx);
}
}

rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
PeerCommunicationServiceImpl::on_proposal() const {
return ordering_gate_->on_proposal();
Expand Down
5 changes: 4 additions & 1 deletion irohad/network/impl/peer_communication_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ namespace iroha {

void propagate_transaction(
std::shared_ptr<const shared_model::interface::Transaction>
transaction) override;
transaction) const override;

void propagate_batch(const shared_model::interface::TransactionBatch
&batch) const override;

rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
on_proposal() const override;
Expand Down
10 changes: 9 additions & 1 deletion irohad/network/peer_communication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace shared_model {
class Block;
class Proposal;
class Transaction;
class TransactionBatch;
} // namespace interface
} // namespace shared_model

Expand All @@ -48,7 +49,14 @@ namespace iroha {
*/
virtual void propagate_transaction(
std::shared_ptr<const shared_model::interface::Transaction>
transaction) = 0;
transaction) const = 0;

/**
* Propagate batch to the network
* @param batch - batch for propagation
*/
virtual void propagate_batch(
const shared_model::interface::TransactionBatch &batch) const = 0;

/**
* Event is triggered when proposal arrives from network.
Expand Down
20 changes: 19 additions & 1 deletion irohad/torii/processor/impl/transaction_processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "backend/protobuf/transaction.hpp"
#include "interfaces/iroha_internal/block.hpp"
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/iroha_internal/transaction_sequence.hpp"
#include "validation/stateful_validator_common.hpp"

namespace iroha {
Expand Down Expand Up @@ -146,7 +147,8 @@ namespace iroha {
}

void TransactionProcessorImpl::transactionHandle(
std::shared_ptr<shared_model::interface::Transaction> transaction) {
std::shared_ptr<shared_model::interface::Transaction> transaction)
const {
log_->info("handle transaction");
if (boost::size(transaction->signatures()) < transaction->quorum()) {
log_->info("waiting for quorum signatures");
Expand All @@ -158,6 +160,22 @@ namespace iroha {
pcs_->propagate_transaction(transaction);
}

void TransactionProcessorImpl::transactionSequenceHandle(
const shared_model::interface::TransactionSequence
&transaction_sequence) const {
for (const auto &batch : transaction_sequence.batches()) {
if (batch.hasAllSignatures()) {
pcs_->propagate_batch(batch);
} else {
// TODO kamilsa 16.07.18 propagate full batch to mst when its
// interface is updated
for (const auto tx : batch.transactions()) {
mst_processor_->propagateTransaction(tx);
}
}
}
}

rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
TransactionProcessorImpl::transactionNotifier() {
Expand Down
12 changes: 11 additions & 1 deletion irohad/torii/processor/transaction_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace shared_model {
namespace interface {
class Transaction;
class TransactionResponse;
class TransactionSequence;
} // namespace interface
} // namespace shared_model

Expand All @@ -42,7 +43,16 @@ namespace iroha {
*/
virtual void transactionHandle(
std::shared_ptr<shared_model::interface::Transaction>
transaction) = 0;
transaction) const = 0;

/**
* Process transaction sequence and propagate batches from it either to
* the MST or PCS
* @param transaction_sequence - transaction sequence for processing
*/
virtual void transactionSequenceHandle(
const shared_model::interface::TransactionSequence
&transaction_sequence) const = 0;

/**
* Subscribers will be notified with transaction status
Expand Down
6 changes: 5 additions & 1 deletion irohad/torii/processor/transaction_processor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ namespace iroha {

void transactionHandle(
std::shared_ptr<shared_model::interface::Transaction> transaction)
override;
const override;

void transactionSequenceHandle(
const shared_model::interface::TransactionSequence
&transaction_sequence) const override;

rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
Expand Down
30 changes: 30 additions & 0 deletions test/framework/batch_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
#define IROHA_BATCH_HELPER_HPP

#include <boost/range/irange.hpp>

#include "framework/result_fixture.hpp"
#include "interfaces/iroha_internal/transaction_batch.hpp"
#include "module/shared_model/builders/protobuf/test_transaction_builder.hpp"
#include "validators/transactions_collection/batch_order_validator.hpp"

namespace framework {
namespace batch {
Expand Down Expand Up @@ -165,6 +169,32 @@ namespace framework {
return createUnsignedBatchTransactions(batch_type, creators, now);
}

auto createValidBatch(const size_t &size) {
using namespace shared_model::validation;
using TxValidator =
TransactionValidator<FieldValidator,
CommandValidatorVisitor<FieldValidator>>;

using TxsValidator =
UnsignedTransactionsCollectionValidator<TxValidator,
BatchOrderValidator>;

auto batch_type = shared_model::interface::types::BatchType::ATOMIC;
std::vector<std::pair<decltype(batch_type), std::string>>
transaction_fields;
for (size_t i = 0; i < size; ++i) {
transaction_fields.push_back(std::make_pair(
batch_type, "account" + std::to_string(i) + "@domain"));
}

auto txs = createBatchOneSignTransactions(transaction_fields);
auto result_batch =
shared_model::interface::TransactionBatch::createTransactionBatch(
txs, TxsValidator());

return framework::expected::val(result_batch).value().value;
}

} // namespace batch
} // namespace framework

Expand Down
6 changes: 5 additions & 1 deletion test/module/irohad/network/network_mocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define IROHA_NETWORK_MOCKS_HPP

#include <gmock/gmock.h>
#include "interfaces/iroha_internal/transaction_batch.hpp"
#include "network/block_loader.hpp"
#include "network/consensus_gate.hpp"
#include "network/ordering_gate.hpp"
Expand All @@ -34,10 +35,13 @@ namespace iroha {
namespace network {
class MockPeerCommunicationService : public PeerCommunicationService {
public:
MOCK_METHOD1(
MOCK_CONST_METHOD1(
propagate_transaction,
void(std::shared_ptr<const shared_model::interface::Transaction>));

MOCK_CONST_METHOD1(propagate_batch,
void(const shared_model::interface::TransactionBatch &));

MOCK_CONST_METHOD0(
on_proposal,
rxcpp::observable<
Expand Down
62 changes: 62 additions & 0 deletions test/module/irohad/torii/processor/transaction_processor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#include "builders/protobuf/common_objects/proto_signature_builder.hpp"
#include "builders/protobuf/proposal.hpp"
#include "builders/protobuf/transaction.hpp"
#include "framework/batch_helper.hpp"
#include "framework/specified_visitor.hpp"
#include "framework/test_subscriber.hpp"
#include "interfaces/iroha_internal/transaction_sequence.hpp"
#include "module/irohad/multi_sig_transactions/mst_mocks.hpp"
#include "module/irohad/network/network_mocks.hpp"
#include "module/shared_model/builders/protobuf/test_block_builder.hpp"
Expand Down Expand Up @@ -140,6 +142,66 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalTest) {
validateStatuses<shared_model::interface::StatelessValidTxResponse>(txs);
}

/**
* @given transactions from the same batch
* @when transactions sequence is created and propagated @and all transactions
* were returned by pcs in proposal notifier
* @then all transactions have stateless valid status
*/
TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalBatchTest) {
using namespace shared_model::validation;
using TxValidator =
TransactionValidator<FieldValidator,
CommandValidatorVisitor<FieldValidator>>;

using TxsValidator =
UnsignedTransactionsCollectionValidator<TxValidator, BatchOrderValidator>;

auto transactions =
framework::batch::createValidBatch(proposal_size).transactions();

auto wrapper =
make_test_subscriber<CallExact>(tp->transactionNotifier(), proposal_size);
wrapper.subscribe([this](auto response) {
status_map[response->transactionHash()] = response;
});

auto transaction_sequence_result =
shared_model::interface::TransactionSequence::createTransactionSequence(
transactions, TxsValidator());
auto transaction_sequence =
framework::expected::val(transaction_sequence_result).value().value;

EXPECT_CALL(*mp, propagateTransactionImpl(_)).Times(0);
EXPECT_CALL(*pcs, propagate_batch(_))
.Times(transaction_sequence.batches().size());

tp->transactionSequenceHandle(transaction_sequence);

// create proposal from sequence transactions and notify about it
std::vector<shared_model::proto::Transaction> proto_transactions;

std::transform(
transactions.begin(),
transactions.end(),
std::back_inserter(proto_transactions),
[](const auto tx) {
return *std::static_pointer_cast<shared_model::proto::Transaction>(tx);
});

auto proposal = std::make_shared<shared_model::proto::Proposal>(
TestProposalBuilder().transactions(proto_transactions).build());

prop_notifier.get_subscriber().on_next(proposal);
prop_notifier.get_subscriber().on_completed();

ASSERT_TRUE(wrapper.validate());

SCOPED_TRACE("Stateless valid status verification");
validateStatuses<shared_model::interface::StatelessValidTxResponse>(
proto_transactions);
}

/**
* @given transaction processor
* @when transactions compose proposal which is sent to peer
Expand Down
5 changes: 4 additions & 1 deletion test/module/irohad/torii/torii_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ class CustomPeerCommunicationServiceMock : public PeerCommunicationService {

void propagate_transaction(
std::shared_ptr<const shared_model::interface::Transaction> transaction)
override {}
const override {}

void propagate_batch(
const shared_model::interface::TransactionBatch &batch) const override {}

rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
on_proposal() const override {
Expand Down

0 comments on commit b9a3c7d

Please sign in to comment.