Skip to content

Commit

Permalink
Merge pull request #295 from eosnetworkfoundation/unapplied-trx
Browse files Browse the repository at this point in the history
Refactor unapplied transaction queue
  • Loading branch information
heifner authored May 26, 2022
2 parents aef77f1 + 83d23e7 commit 16a39ce
Show file tree
Hide file tree
Showing 5 changed files with 571 additions and 116 deletions.
140 changes: 101 additions & 39 deletions libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#pragma once

#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/block_state.hpp>
#include <eosio/chain/exceptions.hpp>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>

Expand All @@ -23,13 +23,19 @@ enum class trx_enum_type {
unknown = 0,
persisted = 1,
forked = 2,
aborted = 3
aborted = 3,
incoming_persisted = 4,
incoming = 5 // incoming_end() needs to be updated if this changes
};

using next_func_t = std::function<void(const std::variant<fc::exception_ptr, transaction_trace_ptr>&)>;

struct unapplied_transaction {
const transaction_metadata_ptr trx_meta;
const fc::time_point expiry;
trx_enum_type trx_type = trx_enum_type::unknown;
bool return_failure_trace = false;
next_func_t next;

const transaction_id_type& id()const { return trx_meta->id(); }

Expand All @@ -44,13 +50,6 @@ struct unapplied_transaction {
* Persisted are first so that they can be applied in each block until expired.
*/
class unapplied_transaction_queue {
public:
enum class process_mode {
non_speculative, // HEAD, READ_ONLY, IRREVERSIBLE
speculative_non_producer, // will never produce
speculative_producer // can produce
};

private:
struct by_trx_id;
struct by_type;
Expand All @@ -67,16 +66,13 @@ class unapplied_transaction_queue {
> unapplied_trx_queue_type;

unapplied_trx_queue_type queue;
process_mode mode = process_mode::speculative_producer;
uint64_t max_transaction_queue_size = 1024*1024*1024; // enforced for incoming
uint64_t size_in_bytes = 0;
size_t incoming_count = 0;

public:

void set_mode( process_mode new_mode ) {
if( new_mode != mode ) {
FC_ASSERT( empty(), "set_mode, queue required to be empty" );
}
mode = new_mode;
}
void set_max_transaction_queue_size( uint64_t v ) { max_transaction_queue_size = v; }

bool empty() const {
return queue.empty();
Expand All @@ -90,14 +86,8 @@ class unapplied_transaction_queue {
queue.clear();
}

bool contains_persisted()const {
return queue.get<by_type>().find( trx_enum_type::persisted ) != queue.get<by_type>().end();
}

bool is_persisted(const transaction_metadata_ptr& trx)const {
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) return false;
return itr->trx_type == trx_enum_type::persisted;
size_t incoming_size()const {
return incoming_count;
}

transaction_metadata_ptr get_trx( const transaction_id_type& id ) const {
Expand All @@ -109,12 +99,24 @@ class unapplied_transaction_queue {
template <typename Func>
bool clear_expired( const time_point& pending_block_time, const time_point& deadline, Func&& callback ) {
auto& persisted_by_expiry = queue.get<by_expiry>();
while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pending_block_time) {
if (deadline <= fc::time_point::now()) {
while( !persisted_by_expiry.empty() ) {
const auto& itr = persisted_by_expiry.begin();
if( itr->expiry > pending_block_time ) {
break;
}
if( deadline <= fc::time_point::now() ) {
return false;
}
callback( persisted_by_expiry.begin()->trx_meta->packed_trx(), persisted_by_expiry.begin()->trx_type );
persisted_by_expiry.erase( persisted_by_expiry.begin() );
callback( itr->trx_meta->packed_trx(), itr->trx_type );
if( itr->next ) {
itr->next( std::static_pointer_cast<fc::exception>(
std::make_shared<expired_tx_exception>(
FC_LOG_MESSAGE( error, "expired transaction ${id}, expiration ${e}, block time ${bt}",
("id", itr->id())("e", itr->trx_meta->packed_trx()->expiration())
("bt", pending_block_time) ) ) ) );
}
removed( itr );
persisted_by_expiry.erase( itr );
}
return true;
}
Expand All @@ -125,59 +127,119 @@ class unapplied_transaction_queue {
for( const auto& receipt : bs->block->transactions ) {
if( std::holds_alternative<packed_transaction>(receipt.trx) ) {
const auto& pt = std::get<packed_transaction>(receipt.trx);
auto itr = queue.get<by_trx_id>().find( pt.id() );
if( itr != queue.get<by_trx_id>().end() ) {
if( itr->trx_type != trx_enum_type::persisted ) {
idx.erase( pt.id() );
auto itr = idx.find( pt.id() );
if( itr != idx.end() ) {
if( itr->next ) {
itr->next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
FC_LOG_MESSAGE( info, "duplicate transaction ${id}", ("id", itr->trx_meta->id())))));
}
removed( itr );
idx.erase( itr );
}
}
}
}

void add_forked( const branch_type& forked_branch ) {
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
// forked_branch is in reverse order
for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) {
const block_state_ptr& bsptr = *ritr;
for( auto itr = bsptr->trxs_metas().begin(), end = bsptr->trxs_metas().end(); itr != end; ++itr ) {
const auto& trx = *itr;
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { trx, expiry, trx_enum_type::forked } );
auto insert_itr = queue.insert( { trx, expiry, trx_enum_type::forked } );
if( insert_itr.second ) added( insert_itr.first );
}
}
}

void add_aborted( std::vector<transaction_metadata_ptr> aborted_trxs ) {
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
for( auto& trx : aborted_trxs ) {
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
auto insert_itr = queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
if( insert_itr.second ) added( insert_itr.first );
}
}

void add_persisted( const transaction_metadata_ptr& trx ) {
if( mode == process_mode::non_speculative ) return;
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) {
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { trx, expiry, trx_enum_type::persisted } );
auto insert_itr = queue.insert( { trx, expiry, trx_enum_type::persisted } );
if( insert_itr.second ) added( insert_itr.first );
} else if( itr->trx_type != trx_enum_type::persisted ) {
if (itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted)
--incoming_count;
queue.get<by_trx_id>().modify( itr, [](auto& un){
un.trx_type = trx_enum_type::persisted;
} );
}
}

void add_incoming( const transaction_metadata_ptr& trx, bool persist_until_expired, bool return_failure_trace, next_func_t next ) {
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) {
fc::time_point expiry = trx->packed_trx()->expiration();
auto insert_itr = queue.insert(
{ trx, expiry, persist_until_expired ? trx_enum_type::incoming_persisted : trx_enum_type::incoming, return_failure_trace, std::move( next ) } );
if( insert_itr.second ) added( insert_itr.first );
} else {
if( itr->trx_meta == trx ) return; // same trx meta pointer
if( next ) {
next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
FC_LOG_MESSAGE( info, "duplicate transaction ${id}", ("id", trx->id()) ) ) ) );
}
}
}

using iterator = unapplied_trx_queue_type::index<by_type>::type::iterator;

iterator begin() { return queue.get<by_type>().begin(); }
iterator end() { return queue.get<by_type>().end(); }

// persisted, forked, aborted
iterator unapplied_begin() { return queue.get<by_type>().begin(); }
iterator unapplied_end() { return queue.get<by_type>().upper_bound( trx_enum_type::aborted ); }

iterator persisted_begin() { return queue.get<by_type>().lower_bound( trx_enum_type::persisted ); }
iterator persisted_end() { return queue.get<by_type>().upper_bound( trx_enum_type::persisted ); }

iterator erase( iterator itr ) { return queue.get<by_type>().erase( itr ); }
iterator incoming_begin() { return queue.get<by_type>().lower_bound( trx_enum_type::incoming_persisted ); }
iterator incoming_end() { return queue.get<by_type>().end(); } // if changed to upper_bound, verify usage performance

/// caller's responsibility to call next() if applicable
iterator erase( iterator itr ) {
removed( itr );
return queue.get<by_type>().erase( itr );
}

private:
template<typename Itr>
void added( Itr itr ) {
auto size = calc_size( itr->trx_meta );
if( itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted ) {
++incoming_count;
EOS_ASSERT( size_in_bytes + size < max_transaction_queue_size, tx_resource_exhaustion,
"Transaction ${id}, size ${s} bytes would exceed configured "
"incoming-transaction-queue-size-mb ${qs}, current queue size ${cs} bytes",
("id", itr->trx_meta->id())("s", size)("qs", max_transaction_queue_size/(1024*1024))
("cs", size_in_bytes) );
}
size_in_bytes += size;
}

template<typename Itr>
void removed( Itr itr ) {
if( itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted ) {
--incoming_count;
}
size_in_bytes -= calc_size( itr->trx_meta );
}

static uint64_t calc_size( const transaction_metadata_ptr& trx ) {
// packed_trx caches unpacked transaction so double
return (trx->packed_trx()->get_unprunable_size() + trx->packed_trx()->get_prunable_size()) * 2 + sizeof( *trx );
}

};

Expand Down
Loading

0 comments on commit 16a39ce

Please sign in to comment.