Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor unapplied transaction queue #295

Merged
merged 19 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d3a3602
Merge pull request #8043 from EOSIO/incoming-trx-handling
heifner Oct 16, 2019
36161d0
Always call next() since net_plugin needs callback for tracking trans…
heifner Nov 2, 2019
2bb4e34
Guarantee that invariant of next function is called. Indicate duplica…
heifner Nov 5, 2019
9c6cc68
Always call next if set so that http does not have to wait for expira…
heifner May 27, 2020
eb9facc
EPE-145: unapplied_transaction_queue incorrectly caches incoming_count
vzqzhang Jun 18, 2020
b17b6b9
fix an error in the unit test
vzqzhang Jun 18, 2020
2852050
change the comment
vzqzhang Jun 18, 2020
f880f9b
Keep unapplied trx around for faster apply block
heifner Oct 21, 2020
5a74319
When speculatively executing last block doesn't apply
heifner Oct 22, 2020
5fca32e
During speculative execution, restart block if current block is exhau…
heifner Oct 22, 2020
66fec95
Remove process_mode as it is not needed
heifner Oct 22, 2020
d2089e2
Fix merge issue
heifner May 21, 2022
cee6013
Track and use return_failure_trace in unapplied transaction queue.
jgiszczak Aug 31, 2021
ae1e8bc
Default return_failure_trace to false in unapplied transaction queue
jgiszczak Aug 31, 2021
e80ed93
Fix handling of duplicate trx in unapplied trx queue. Honor return_fa…
heifner Sep 2, 2021
dc9abf3
Fail the current duplicate instead of previous
heifner Sep 2, 2021
9937e6d
call q.begin and q.end, instead of q.unapplied_begin and q.unapplied_…
vzqzhang Jun 19, 2020
f0bc64c
Fix merge issues
heifner May 21, 2022
83d23e7
No need to keep persisted after applied in a block. The idea before w…
heifner May 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 118 additions & 39 deletions libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#pragma once

#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/block_state.hpp>
#include <eosio/chain/exceptions.hpp>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>

Expand All @@ -23,13 +23,19 @@ enum class trx_enum_type {
unknown = 0,
persisted = 1,
forked = 2,
aborted = 3
aborted = 3,
incoming_persisted = 4,
incoming = 5 // incoming_end() needs to be updated if this changes
};

using next_func_t = std::function<void(const std::variant<fc::exception_ptr, transaction_trace_ptr>&)>;

struct unapplied_transaction {
const transaction_metadata_ptr trx_meta;
const fc::time_point expiry;
trx_enum_type trx_type = trx_enum_type::unknown;
bool return_failure_trace = false;
next_func_t next;

const transaction_id_type& id()const { return trx_meta->id(); }

Expand All @@ -44,13 +50,6 @@ struct unapplied_transaction {
* Persisted are first so that they can be applied in each block until expired.
*/
class unapplied_transaction_queue {
public:
enum class process_mode {
non_speculative, // HEAD, READ_ONLY, IRREVERSIBLE
speculative_non_producer, // will never produce
speculative_producer // can produce
};

private:
struct by_trx_id;
struct by_type;
Expand All @@ -67,16 +66,13 @@ class unapplied_transaction_queue {
> unapplied_trx_queue_type;

unapplied_trx_queue_type queue;
process_mode mode = process_mode::speculative_producer;
uint64_t max_transaction_queue_size = 1024*1024*1024; // enforced for incoming
uint64_t size_in_bytes = 0;
size_t incoming_count = 0;

public:

void set_mode( process_mode new_mode ) {
if( new_mode != mode ) {
FC_ASSERT( empty(), "set_mode, queue required to be empty" );
}
mode = new_mode;
}
void set_max_transaction_queue_size( uint64_t v ) { max_transaction_queue_size = v; }

bool empty() const {
return queue.empty();
Expand All @@ -90,14 +86,8 @@ class unapplied_transaction_queue {
queue.clear();
}

bool contains_persisted()const {
return queue.get<by_type>().find( trx_enum_type::persisted ) != queue.get<by_type>().end();
}

bool is_persisted(const transaction_metadata_ptr& trx)const {
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) return false;
return itr->trx_type == trx_enum_type::persisted;
size_t incoming_size()const {
return incoming_count;
}

transaction_metadata_ptr get_trx( const transaction_id_type& id ) const {
Expand All @@ -109,12 +99,24 @@ class unapplied_transaction_queue {
template <typename Func>
bool clear_expired( const time_point& pending_block_time, const time_point& deadline, Func&& callback ) {
auto& persisted_by_expiry = queue.get<by_expiry>();
while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pending_block_time) {
if (deadline <= fc::time_point::now()) {
while( !persisted_by_expiry.empty() ) {
const auto& itr = persisted_by_expiry.begin();
if( itr->expiry > pending_block_time ) {
break;
}
if( deadline <= fc::time_point::now() ) {
return false;
}
callback( persisted_by_expiry.begin()->trx_meta->packed_trx(), persisted_by_expiry.begin()->trx_type );
persisted_by_expiry.erase( persisted_by_expiry.begin() );
callback( itr->trx_meta->packed_trx(), itr->trx_type );
if( itr->next ) {
itr->next( std::static_pointer_cast<fc::exception>(
std::make_shared<expired_tx_exception>(
FC_LOG_MESSAGE( error, "expired transaction ${id}, expiration ${e}, block time ${bt}",
("id", itr->id())("e", itr->trx_meta->packed_trx()->expiration())
("bt", pending_block_time) ) ) ) );
}
removed( itr );
persisted_by_expiry.erase( itr );
}
return true;
}
Expand All @@ -125,59 +127,136 @@ class unapplied_transaction_queue {
for( const auto& receipt : bs->block->transactions ) {
if( std::holds_alternative<packed_transaction>(receipt.trx) ) {
const auto& pt = std::get<packed_transaction>(receipt.trx);
auto itr = queue.get<by_trx_id>().find( pt.id() );
if( itr != queue.get<by_trx_id>().end() ) {
if( itr->trx_type != trx_enum_type::persisted ) {
idx.erase( pt.id() );
auto itr = idx.find( pt.id() );
if( itr != idx.end() ) {
if( itr->next ) {
itr->next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
FC_LOG_MESSAGE( info, "duplicate transaction ${id}", ("id", itr->trx_meta->id())))));
}
if( itr->trx_type != trx_enum_type::persisted &&
itr->trx_type != trx_enum_type::incoming_persisted ) {
removed( itr );
idx.erase( itr );
} else if( itr->next ) {
idx.modify( itr, [](auto& un){
un.next = nullptr;
} );
}
}
}
}
}

void add_forked( const branch_type& forked_branch ) {
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
// forked_branch is in reverse order
for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) {
const block_state_ptr& bsptr = *ritr;
for( auto itr = bsptr->trxs_metas().begin(), end = bsptr->trxs_metas().end(); itr != end; ++itr ) {
const auto& trx = *itr;
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { trx, expiry, trx_enum_type::forked } );
auto insert_itr = queue.insert( { trx, expiry, trx_enum_type::forked } );
if( insert_itr.second ) added( insert_itr.first );
}
}
}

void add_aborted( std::vector<transaction_metadata_ptr> aborted_trxs ) {
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
for( auto& trx : aborted_trxs ) {
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
auto insert_itr = queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
if( insert_itr.second ) added( insert_itr.first );
}
}

void add_persisted( const transaction_metadata_ptr& trx ) {
if( mode == process_mode::non_speculative ) return;
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) {
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { trx, expiry, trx_enum_type::persisted } );
auto insert_itr = queue.insert( { trx, expiry, trx_enum_type::persisted } );
if( insert_itr.second ) added( insert_itr.first );
} else if( itr->trx_type != trx_enum_type::persisted ) {
if (itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted)
--incoming_count;
queue.get<by_trx_id>().modify( itr, [](auto& un){
un.trx_type = trx_enum_type::persisted;
} );
}
}

void add_incoming( const transaction_metadata_ptr& trx, bool persist_until_expired, bool return_failure_trace, next_func_t next ) {
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) {
fc::time_point expiry = trx->packed_trx()->expiration();
auto insert_itr = queue.insert(
{ trx, expiry, persist_until_expired ? trx_enum_type::incoming_persisted : trx_enum_type::incoming, return_failure_trace, std::move( next ) } );
if( insert_itr.second ) added( insert_itr.first );
} else {
if( !(itr->trx_meta == trx) && next ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that this logic makes sense, and the comment on the next line seems wrong. I think at the very least we should return whether or not next is defined here (and will we ever not have next set??).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

// next will be updated in modify() below, notify previous next of duplicate
next( std::static_pointer_cast<fc::exception>( std::make_shared<tx_duplicate>(
FC_LOG_MESSAGE( info, "duplicate transaction ${id}", ("id", trx->id()) ) ) ) );
return;
}

if (itr->trx_type != trx_enum_type::incoming && itr->trx_type != trx_enum_type::incoming_persisted)
++incoming_count;

queue.get<by_trx_id>().modify( itr, [persist_until_expired, return_failure_trace, next{std::move(next)}](auto& un) mutable {
un.trx_type = persist_until_expired ? trx_enum_type::incoming_persisted : trx_enum_type::incoming;
un.return_failure_trace = return_failure_trace;
un.next = std::move( next );
} );
}
}

using iterator = unapplied_trx_queue_type::index<by_type>::type::iterator;

iterator begin() { return queue.get<by_type>().begin(); }
iterator end() { return queue.get<by_type>().end(); }

// persisted, forked, aborted
iterator unapplied_begin() { return queue.get<by_type>().begin(); }
iterator unapplied_end() { return queue.get<by_type>().upper_bound( trx_enum_type::aborted ); }

iterator persisted_begin() { return queue.get<by_type>().lower_bound( trx_enum_type::persisted ); }
iterator persisted_end() { return queue.get<by_type>().upper_bound( trx_enum_type::persisted ); }

iterator erase( iterator itr ) { return queue.get<by_type>().erase( itr ); }
iterator incoming_begin() { return queue.get<by_type>().lower_bound( trx_enum_type::incoming_persisted ); }
iterator incoming_end() { return queue.get<by_type>().end(); } // if changed to upper_bound, verify usage performance

/// caller's responsibilty to call next() if applicable
iterator erase( iterator itr ) {
removed( itr );
return queue.get<by_type>().erase( itr );
}

private:
template<typename Itr>
void added( Itr itr ) {
auto size = calc_size( itr->trx_meta );
if( itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted ) {
++incoming_count;
EOS_ASSERT( size_in_bytes + size < max_transaction_queue_size, tx_resource_exhaustion,
"Transaction ${id}, size ${s} bytes would exceed configured "
"incoming-transaction-queue-size-mb ${qs}, current queue size ${cs} bytes",
("id", itr->trx_meta->id())("s", size)("qs", max_transaction_queue_size/(1024*1024))
("cs", size_in_bytes) );
}
size_in_bytes += size;
}

template<typename Itr>
void removed( Itr itr ) {
if( itr->trx_type == trx_enum_type::incoming || itr->trx_type == trx_enum_type::incoming_persisted ) {
--incoming_count;
}
size_in_bytes -= calc_size( itr->trx_meta );
}

static uint64_t calc_size( const transaction_metadata_ptr& trx ) {
// packed_trx caches unpacked transaction so double
return (trx->packed_trx()->get_unprunable_size() + trx->packed_trx()->get_prunable_size()) * 2 + sizeof( *trx );
}

};

Expand Down
Loading