Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Fix for resync startup #172
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Nov 2, 2017
1 parent 5a7553b commit 46387eb
Showing 1 changed file with 55 additions and 43 deletions.
98 changes: 55 additions & 43 deletions plugins/db_plugin/db_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,43 +110,59 @@ const std::string db_plugin_impl::accounts_col = "Accounts";


void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
if (startup) {
// on startup we don't want to queue, instead push back on caller
process_irreversible_block(block);
} else {
boost::mutex::scoped_lock lock(mtx);
queue.push(block);
lock.unlock();
condtion.notify_one();
try {
if (startup) {
// on startup we don't want to queue, instead push back on caller
process_irreversible_block(block);
} else {
boost::mutex::scoped_lock lock(mtx);
queue.push(block);
lock.unlock();
condtion.notify_one();
}
} catch (fc::exception& e) {
elog("FC Exception while applied_irreversible_block ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
elog("STD Exception while applied_irreversible_block ${e}", ("e", e.what()));
} catch (...) {
elog("Unknown exception while applied_irreversible_block");
}
}

void db_plugin_impl::consum_blocks() {
signed_block block;
size_t size = 0;
while (true) {
boost::mutex::scoped_lock lock(mtx);
while (queue.empty() && !done) {
condtion.wait(lock);
}
size = queue.size();
if (size > 0) {
block = queue.front();
queue.pop();
lock.unlock();
// warn if queue size greater than 75%
if (size > (queue_size * 0.75)) {
wlog("queue size: ${q}", ("q", size + 1));
try {
signed_block block;
size_t size = 0;
while (true) {
boost::mutex::scoped_lock lock(mtx);
while (queue.empty() && !done) {
condtion.wait(lock);
}
size = queue.size();
if (size > 0) {
block = queue.front();
queue.pop();
lock.unlock();
// warn if queue size greater than 75%
if (size > (queue_size * 0.75)) {
wlog("queue size: ${q}", ("q", size + 1));
} else if (done) {
ilog("draining queue, size: ${q}", ("q", size + 1));
}
process_irreversible_block(block);
continue;
} else if (done) {
ilog("draining queue, size: ${q}", ("q", size + 1));
break;
}
process_irreversible_block(block);
continue;
} else if (done) {
break;
}
ilog("db_plugin consum thread shutdown gracefully");
} catch (fc::exception& e) {
elog("FC Exception while consuming block ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
elog("STD Exception while consuming block ${e}", ("e", e.what()));
} catch (...) {
elog("Unknown exception while consuming block");
}
ilog("db_plugin consum thread shutdown gracefully");
}

namespace {
Expand Down Expand Up @@ -575,7 +591,7 @@ void db_plugin_impl::init() {
////////////

db_plugin::db_plugin()
:my(nullptr)
:my(new db_plugin_impl)
{
}

Expand All @@ -600,21 +616,17 @@ void db_plugin::set_program_options(options_description& cli, options_descriptio

void db_plugin::wipe_database() {
#ifdef MONGODB
if (my) {
if (!my->startup) {
elog("ERROR: db_plugin::wipe_database() called before configuration or after startup. Ignoring.");
} else {
my->wipe_database_on_startup = true;
}
if (!my->startup) {
elog("ERROR: db_plugin::wipe_database() called before configuration or after startup. Ignoring.");
} else {
my->wipe_database_on_startup = true;
}
#endif
}

void db_plugin::applied_irreversible_block(const signed_block& block) {
#ifdef MONGODB
if (my) {
my->applied_irreversible_block(block);
}
my->applied_irreversible_block(block);
#endif
}

Expand All @@ -623,8 +635,8 @@ void db_plugin::plugin_initialize(const variables_map& options)
{
#ifdef MONGODB
if (options.count("mongodb-uri")) {
my.reset(new db_plugin_impl);
ilog("initializing db plugin");
my->configured = true;
if (options.count("filter-on-accounts")) {
auto foa = options.at("filter-on-accounts").as<std::vector<std::string>>();
for (auto filter_account : foa)
Expand All @@ -647,16 +659,16 @@ void db_plugin::plugin_initialize(const variables_map& options)
}
my->init();
} else {
ilog("eos::db_plugin configured, but no --mongodb-uri specified.");
ilog("db_plugin disabled.");
wlog("eos::db_plugin configured, but no --mongodb-uri specified.");
wlog("db_plugin disabled.");
}
#endif
}

void db_plugin::plugin_startup()
{
#ifdef MONGODB
if (my) {
if (my->configured) {
ilog("starting db plugin");

my->consum_thread = boost::thread([this] { my->consum_blocks(); });
Expand Down

0 comments on commit 46387eb

Please sign in to comment.