Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
#291, #421, #515, Address launcher bifurcated tenwork, finalize the l…
Browse files Browse the repository at this point in the history
…ocaition for test verifying this solution, suppressed debug output and added a config option to net plugin
  • Loading branch information
pmesnier committed Oct 5, 2017
1 parent ab96a06 commit e445707
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 40 deletions.
41 changes: 24 additions & 17 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ namespace eos {
vector<char> blk_buffer;
size_t message_size;

fc::sha256 remote_node_id;
fc::sha256 node_id;
handshake_message last_handshake;
deque<net_message> out_queue;
bool connecting;
Expand Down Expand Up @@ -394,7 +394,7 @@ namespace eos {
pending_message_size(0),
pending_message_buffer(recv_buf_size),
send_buffer(send_buf_size),
remote_node_id(),
node_id(),
last_handshake(),
out_queue(),
connecting (false),
Expand All @@ -418,7 +418,7 @@ namespace eos {
pending_message_size(0),
pending_message_buffer(recv_buf_size),
send_buffer(send_buf_size),
remote_node_id(),
node_id(),
last_handshake(),
out_queue(),
connecting (false),
Expand All @@ -439,7 +439,7 @@ namespace eos {
}

void connection::initialize () {
auto *rnd = remote_node_id.data();
auto *rnd = node_id.data();
rnd[0] = 0;
response_expected.reset(new boost::asio::steady_timer (app().get_io_service()));
}
Expand Down Expand Up @@ -592,15 +592,15 @@ namespace eos {

void connection::fetch_timeout( boost::system::error_code ec ) {
if( !ec ) {
dlog ("fetch timeout occurred");
// dlog ("fetch timeout occurred");
if( !( pending_fetch->req_trx.empty( ) || pending_fetch->req_blocks.empty( ) ) ) {
enqueue( ( request_message ) {vector<transaction_id_type>( ), vector<block_id_type>( )} );
my_impl->sync_master->reassign_fetch( shared_from_this( ) );
}
}
else if( ec == boost::asio::error::operation_aborted ) {
if( !connected( ) ) {
dlog ("fetch timeout was cancelled due to dead connection");
// dlog ("fetch timeout was cancelled due to dead connection");
my_impl->sync_master->reassign_fetch( shared_from_this( ) );
}
}
Expand Down Expand Up @@ -912,9 +912,9 @@ namespace eos {
}
}

if( c->remote_node_id != msg.node_id) {
if( c->node_id != msg.node_id) {
// c->reset();
c->remote_node_id = msg.node_id;
c->node_id = msg.node_id;
}

c->syncing = false;
Expand All @@ -923,16 +923,12 @@ namespace eos {
block_id_type head_id = cc.head_block_id();

if( msg.last_irreversible_block_num > head || sync_master->syncing() ) {
dlog("calling start_sync, myhead = ${h} their last_irreversable = ${mhn}",
( "h",head)("mhn",peer_lib));
sync_master->start_sync( c, peer_lib);
}
else if( msg.head_id != head_id && head_id != block_id_type( )) {
dlog("peer doesn't need to sync but does need to fetch blocks/trans");
if( msg.head_num >= lib_num ) {
notice_message msg;
msg.known_blocks = cc.get_block_ids_on_fork(cc.head_block_id());
dlog("known_blks size is ${s}",("s",msg.known_blocks.size()));
#warning("TODO: get a list of known, unblocked transactions");
// msg.known_trx = cc.get_pending_transaction_ids();
c->enqueue( msg);
Expand Down Expand Up @@ -967,9 +963,9 @@ namespace eos {
}

c->offset = (double(c->rec - c->org) + double(msg.xmt - c->dst)) / 2;
double NsecPerUsec{1000};
// double NsecPerUsec{1000};
//
dlog("Clock offset is ${o}ns (${us}us)", ("o", c->offset)("us", c->offset/NsecPerUsec));
// dlog("Clock offset is ${o}ns (${us}us)", ("o", c->offset)("us", c->offset/NsecPerUsec));
c->org = 0;
c->rec = 0;
}
Expand Down Expand Up @@ -1177,7 +1173,8 @@ namespace eos {

}

void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
// dlog ("got block #${num}, from ${id}",("num",msg.block_num())("id",c->node_id));
chain_controller &cc = chain_plug->chain();
try {
if( cc.is_known_block(msg.id())) {
Expand Down Expand Up @@ -1207,7 +1204,8 @@ namespace eos {
}
}
bool accepted = false;
if( !syncing || num == cc.head_block_num()+1) {
// dlog ("last irrevesible block = ${lib}", ("lib", cc.last_irreversible_block_num()));
if( !syncing || num == cc.head_block_num()+1 || num > cc.last_irreversible_block_num()) {
try {
chain_plug->accept_block(msg, syncing);
accepted = true;
Expand Down Expand Up @@ -1235,8 +1233,10 @@ namespace eos {
}
}
else {
#warning( "TODO: only send if not requested, and if the size is less than the just send it size otherwise send a notice");
// dlog ("forwarding the signed block");
if (fc::raw::pack_size(msg) < just_send_it_max)
send_all( msg, [c](connection_ptr conn) -> bool {
// dlog( "sending to ${c}",("c", conn->peer_addr));
return( c != conn);
});
}
Expand Down Expand Up @@ -1550,6 +1550,7 @@ namespace eos {
( "remote-endpoint", bpo::value< vector<string> >()->composing(), "The IP address and port of a remote peer to sync with.")
( "public-endpoint", bpo::value<string>(), "Overrides the advertised listen endpointlisten ip address.")
( "agent-name", bpo::value<string>()->default_value("EOS Test Agent"), "The name supplied to identify this node amongst the peers.")
( "send-whole-blocks", bpo::value<bool>()->default_value(def_send_whole_blocks), "True to always send full blocks, false to send block summaries" )
;
}

Expand Down Expand Up @@ -1605,9 +1606,14 @@ namespace eos {
if( options.count("agent-name")) {
my->user_agent_name = options.at( "agent-name").as< string >( );
}
if( options.count( "send-whole-blocks")) {
my->send_whole_blocks = options.at( "send-whole-blocks" ).as<bool>();
}

my->chain_plug = app().find_plugin<chain_plugin>();
my->chain_plug->get_chain_id(my->chain_id);
fc::rand_pseudo_bytes(my->node_id.data(), my->node_id.data_size());
ilog ("my node_id is $id",("id",my->node_id));

my->keepalive_timer.reset(new boost::asio::steady_timer (app().get_io_service()));
my->ticker();
Expand Down Expand Up @@ -1653,6 +1659,7 @@ namespace eos {
} FC_CAPTURE_AND_RETHROW() }

void net_plugin::broadcast_block( const chain::signed_block &sb) {
// dlog( "broadcasting block #${num}",("num",sb.block_num()) );
my->broadcast_block_impl( sb);
}
}
18 changes: 10 additions & 8 deletions programs/launcher/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ struct localIdentity {
cerr << "unable to retrieve host name: " << ec.message() << endl;
}
else {
cerr << "adding hostname " << hn << endl;
names.push_back (hn);
if (hn.find ('.') != string::npos) {
names.push_back (hn.substr (0,hn.find('.')));
cerr << "adding hostname " << hn.substr (0,hn.find('.')) << endl;
}
}

Expand All @@ -64,7 +62,6 @@ struct localIdentity {
if (in_addr != 0) {
fc::ip::address ifa(in_addr);
addrs.push_back (ifa);
cout << "found interface " << (string)ifa << endl;
}
}
}
Expand Down Expand Up @@ -146,7 +143,7 @@ struct eosd_def {

const string &dot_alias (const string &name) {
if (dot_alias_str.empty()) {
dot_alias_str = name + "\nprod=";
dot_alias_str = name + "\\nprod=";
if (producers.empty()) {
dot_alias_str += "<none>";
}
Expand Down Expand Up @@ -326,13 +323,13 @@ launcher_def::generate () {
void
launcher_def::write_dot_file () {
bf::ofstream df ("testnet.dot");
df << "digraph G\n{\n";
df << "digraph G\n{\nlayout=\"circo\";";
for (auto &node : network.nodes) {
for (const auto &p : node.second.peers) {
string pname=network.nodes.find(p)->second.dot_alias(p);
df << "\"" << node.second.dot_alias (node.first)
<< "\"->\"" << pname
<< "\" [dir=\"both\"]" << std::endl;
<< "\" [dir=\"forward\"];" << std::endl;
}
}
df << "}\n";
Expand Down Expand Up @@ -490,7 +487,10 @@ launcher_def::make_star () {
if (total_nodes > 12) {
links = (size_t)sqrt(total_nodes);
}
size_t gap = total_nodes > 6 ? 4 : total_nodes - links;
size_t gap = total_nodes > 6 ? 3 : (total_nodes - links)/2 +1;
while (total_nodes % gap == 0) {
++gap;
}
// use to prevent duplicates since all connections are bidirectional
std::map <string, std::set<string>> peers_to_from;
for (size_t i = 0; i < total_nodes; i++) {
Expand All @@ -514,6 +514,8 @@ launcher_def::make_star () {
if (ndx == total_nodes) {
ndx = 0;
}


peer = aliases[ndx];

found = true;
Expand Down Expand Up @@ -659,7 +661,7 @@ launcher_def::launch (eosd_def &node, string &gts) {
info.kill_cmd = "";

if(!c.running()) {
cout << "child not running after spawn " << eosdcmd << endl;
cerr << "child not running after spawn " << eosdcmd << endl;
for (int i = 0; i > 0; i++) {
if (c.running () ) break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,56 +1,69 @@
#!/bin/bash

cd ../..
count=5
cd ../../..
pnodes=10
npnodes=0
topo=star
if [ -n "$1" ]; then
count=$1
pnodes=$1
if [ -n "$2" ]; then
topo=$2
if [ -n "$3" ]; then
npnodes=$3
fi
fi
fi

total_nodes=`expr $count + 2`
total_nodes=`expr $pnodes + $npnodes`

rm -rf tn_data_*
programs/launcher/launcher -p $count -n $total_nodes -s $topo
programs/launcher/launcher -p $pnodes -n $total_nodes -s $topo
sleep 7
echo "start" > test.out
port=8888
endport=`expr $port + $count`
endport=`expr $port + $total_nodes`
echo endport = $endport
while [ $port -ne $endport ]; do
programs/eosc/eosc --port $port get block 1 >> test.out 2>&1;
port=`expr $port + 1`
done

grep 'producer"' test.out | tee summary | sort -u -k2 | tee unique
prods=`wc -l < unique`
prodsfound=`wc -l < unique`
lines=`wc -l < summary`
if [ $lines -eq $count -a $prods -eq 1 ]; then
if [ $lines -eq $total_nodes -a $prodsfound -eq 1 ]; then
echo all synced
programs/launcher/launcher -k 15
cd -
exit
fi
echo lines = $lines and prods = $prods
echo $lines reports out of $total_nodes and prods = $prodsfound
sleep 18
programs/eosc/eosc --port 8888 get block 5
sleep 18
sleep 15
programs/eosc/eosc --port 8888 get block 10
sleep 16
sleep 15
programs/eosc/eosc --port 8888 get block 15
sleep 16
sleep 15
programs/eosc/eosc --port 8888 get block 20
sleep 16
sleep 15
echo "pass 2" > test.out
port=8888
while [ $port -ne $endport ]; do
programs/eosc/eosc --port $port get block 1 >> test.out 2>&1;
port=`expr $port + 1`
done

grep 'producer"' test.out

programs/launcher/launcher -k 9
grep 'producer"' test.out | tee summary | sort -u -k2 | tee unique
prodsfound=`wc -l < unique`
lines=`wc -l < summary`
if [ $lines -eq $total_nodes -a $prodsfound -eq 1 ]; then
echo all synced
programs/launcher/launcher -k 15
cd -
exit
fi
echo ERROR: $lines reports out of $total_nodes and prods = $prodsfound
cd -
exit 1

0 comments on commit e445707

Please sign in to comment.