Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge branch 'change-plogcb-threadpool' of github.com:Shuo-Jia/rdsn i…
Browse files Browse the repository at this point in the history
…nto change-plogcb-threadpool
  • Loading branch information
foreverneverer committed Jul 23, 2020
2 parents 04758d2 + e2f4559 commit e7184c5
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 10 deletions.
19 changes: 11 additions & 8 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
break;
case bulk_load_status::BLS_FAILED:
handle_bulk_load_finish(bulk_load_status::BLS_FAILED);
// TODO(heyuchen): add perf-counter here
_stub->_counter_bulk_load_failed_count->increment();
break;
default:
break;
Expand Down Expand Up @@ -345,7 +345,8 @@ error_code replica_bulk_loader::start_download(const std::string &app_name,
ddebug_replica("node[{}] has {} replica executing downloading",
_stub->_primary_address_str,
_stub->_bulk_load_downloading_count.load());
// TODO(heyuchen): add perf-counter
_bulk_load_start_time_ms = dsn_now_ms();
_stub->_counter_bulk_load_downloading_count->increment();

// start download
ddebug_replica("start to download sst files");
Expand Down Expand Up @@ -418,12 +419,13 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name,
_download_status.store(ec);
derror_replica(
"failed to download file({}), error = {}", f_meta.name, ec.to_string());
// TODO(heyuchen): add perf-counter
_stub->_counter_bulk_load_download_file_fail_count->increment();
return;
}
// download file succeed, update progress
update_bulk_load_download_progress(f_size, f_meta.name);
// TODO(heyuchen): add perf-counter
_stub->_counter_bulk_load_download_file_succ_count->increment();
_stub->_counter_bulk_load_download_file_size->add(f_size);
});
_download_task[f_meta.name] = bulk_load_download_task;
}
Expand Down Expand Up @@ -507,7 +509,7 @@ void replica_bulk_loader::check_download_finish()
void replica_bulk_loader::start_ingestion()
{
_status = bulk_load_status::BLS_INGESTING;
// TODO(heyuchen): add perf-counter
_stub->_counter_bulk_load_ingestion_count->increment();
if (status() == partition_status::PS_PRIMARY) {
_replica->_primary_states.ingestion_is_empty_prepare_sent = false;
}
Expand Down Expand Up @@ -540,7 +542,7 @@ void replica_bulk_loader::handle_bulk_load_succeed()

_replica->_app->set_ingestion_status(ingestion_status::IS_INVALID);
_status = bulk_load_status::BLS_SUCCEED;
// TODO(heyuchen): add perf-counter
_stub->_counter_bulk_load_succeed_count->increment();
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down Expand Up @@ -613,7 +615,8 @@ void replica_bulk_loader::clear_bulk_load_states()
_replica->_is_bulk_load_ingestion = false;
_replica->_app->set_ingestion_status(ingestion_status::IS_INVALID);

// TODO(heyuchen): clear other states for perf-counter
_bulk_load_start_time_ms = 0;
_replica->_bulk_load_ingestion_start_time_ms = 0;

_status = bulk_load_status::BLS_INVALID;
}
Expand Down Expand Up @@ -771,7 +774,7 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
if (is_group_ingestion_finish) {
ddebug_replica("finish ingestion, recover write");
_replica->_is_bulk_load_ingestion = false;
// TODO(heyuchen): reset perf-counter
_replica->_bulk_load_ingestion_start_time_ms = 0;
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/replica/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ class replica_bulk_loader : replica_base

// Replaces the loader's current bulk load status (`_status`) with `status`.
inline void set_bulk_load_status(bulk_load_status::type status)
{
    _status = status;
}

inline uint64_t duration_ms() const
{
return _bulk_load_start_time_ms > 0 ? (dsn_now_ms() - _bulk_load_start_time_ms) : 0;
}

// Returns how long, in milliseconds, the owning replica has been in bulk load
// ingestion, or 0 when no ingestion start time is recorded.
// Delegates to replica::ingestion_duration_ms(), which performs the same
// computation on `_replica->_bulk_load_ingestion_start_time_ms`.
inline uint64_t ingestion_duration_ms() const
{
    return _replica->ingestion_duration_ms();
}

//
// helper functions
//
Expand All @@ -124,6 +136,7 @@ class replica_bulk_loader : replica_base
replica_stub *_stub;

friend class replica;
friend class replica_stub;
friend class replica_bulk_loader_test;

bulk_load_status::type _status{bulk_load_status::BLS_INVALID};
Expand All @@ -133,6 +146,9 @@ class replica_bulk_loader : replica_base
std::atomic<error_code> _download_status{ERR_OK};
// file_name -> downloading task
std::map<std::string, task_ptr> _download_task;

// Used for perf-counter.
// Timestamp in milliseconds (taken from dsn_now_ms()) recorded by start_download();
// reset to 0 by clear_bulk_load_states(). While it is 0, duration_ms() reports 0.
uint64_t _bulk_load_start_time_ms{0};
};

} // namespace replication
Expand Down
7 changes: 7 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// Bulk load
//
// Returns a non-owning pointer to the bulk loader held by `_bulk_loader`.
replica_bulk_loader *get_bulk_loader() const
{
    return _bulk_loader.get();
}
inline uint64_t ingestion_duration_ms() const
{
return _bulk_load_ingestion_start_time_ms > 0
? (dsn_now_ms() - _bulk_load_ingestion_start_time_ms)
: 0;
}

//
// Statistics
Expand Down Expand Up @@ -543,6 +549,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
std::unique_ptr<replica_bulk_loader> _bulk_loader;
// if replica in bulk load ingestion 2pc, will reject other write requests
bool _is_bulk_load_ingestion{false};
// Used for perf-counter: timestamp in milliseconds (from dsn_now_ms()) recorded in
// on_client_write() right after `_is_bulk_load_ingestion` is set to true; reset to 0
// when bulk load states are cleared or group ingestion finishes.
// While it is 0, ingestion_duration_ms() reports 0.
uint64_t _bulk_load_ingestion_start_time_ms{0};

// perf counters
perf_counter_wrapper _counter_private_log_size;
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)

if (_is_bulk_load_ingestion) {
// reject write requests during ingestion
// TODO(heyuchen): add perf-counter here
_stub->_counter_bulk_load_ingestion_reject_write_count->increment();
response_client_write(request, ERR_OPERATION_DISABLED);
return;
}
Expand All @@ -91,7 +91,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}
_is_bulk_load_ingestion = true;
// TODO(heyuchen): set _bulk_load_ingestion_start_time_ms for perf-counter
_bulk_load_ingestion_start_time_ms = dsn_now_ms();
}

if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
Expand Down
65 changes: 65 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,57 @@ void replica_stub::install_perf_counters()
COUNTER_TYPE_VOLATILE_NUMBER,
"write size exceed threshold count in the recent period");

// <- Bulk Load Metrics ->

_counter_bulk_load_running_count.init_app_counter("eon.replica_stub",
"bulk.load.running.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"current bulk load running count");
_counter_bulk_load_downloading_count.init_app_counter("eon.replica_stub",
"bulk.load.downloading.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"current bulk load downloading count");
_counter_bulk_load_ingestion_count.init_app_counter("eon.replica_stub",
"bulk.load.ingestion.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"current bulk load ingestion count");
_counter_bulk_load_succeed_count.init_app_counter("eon.replica_stub",
"bulk.load.succeed.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"current bulk load succeed count");
_counter_bulk_load_failed_count.init_app_counter("eon.replica_stub",
"bulk.load.failed.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"current bulk load failed count");
_counter_bulk_load_ingestion_reject_write_count.init_app_counter(
"eon.replica_stub",
"bulk.load.ingestion.reject.write.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"bulk load ingestion reject write requests count");
_counter_bulk_load_download_file_succ_count.init_app_counter(
"eon.replica_stub",
"bulk.load.download.file.success.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"bulk load recent download file success count");
_counter_bulk_load_download_file_fail_count.init_app_counter(
"eon.replica_stub",
"bulk.load.download.file.fail.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"bulk load recent download file failed count");
_counter_bulk_load_download_file_size.init_app_counter("eon.replica_stub",
"bulk.load.download.file.size",
COUNTER_TYPE_VOLATILE_NUMBER,
"bulk load recent download file size");
_counter_bulk_load_max_ingestion_time_ms.init_app_counter(
"eon.replica_stub",
"bulk.load.max.ingestion.duration.time.ms",
COUNTER_TYPE_NUMBER,
"bulk load max ingestion duration time(ms)");
_counter_bulk_load_max_duration_time_ms.init_app_counter("eon.replica_stub",
"bulk.load.max.duration.time.ms",
COUNTER_TYPE_NUMBER,
"bulk load max duration time(ms)");

#ifdef DSN_ENABLE_GPERF
_counter_tcmalloc_release_memory_size.init_app_counter("eon.replica_stub",
"tcmalloc.release.memory.size",
Expand Down Expand Up @@ -1716,6 +1767,9 @@ void replica_stub::on_gc()
uint64_t cold_backup_running_count = 0;
uint64_t cold_backup_max_duration_time_ms = 0;
uint64_t cold_backup_max_upload_file_size = 0;
uint64_t bulk_load_running_count = 0;
uint64_t bulk_load_max_ingestion_time_ms = 0;
uint64_t bulk_load_max_duration_time_ms = 0;
for (auto &kv : rs) {
replica_ptr &rep = kv.second.rep;
if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
Expand All @@ -1733,6 +1787,14 @@ void replica_stub::on_gc()
cold_backup_max_duration_time_ms, rep->_cold_backup_max_duration_time_ms.load());
cold_backup_max_upload_file_size = std::max(
cold_backup_max_upload_file_size, rep->_cold_backup_max_upload_file_size.load());

if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) {
bulk_load_running_count++;
bulk_load_max_ingestion_time_ms =
std::max(bulk_load_max_ingestion_time_ms, rep->ingestion_duration_ms());
bulk_load_max_duration_time_ms =
std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms());
}
}
}

Expand All @@ -1742,6 +1804,9 @@ void replica_stub::on_gc()
_counter_cold_backup_running_count->set(cold_backup_running_count);
_counter_cold_backup_max_duration_time_ms->set(cold_backup_max_duration_time_ms);
_counter_cold_backup_max_upload_file_size->set(cold_backup_max_upload_file_size);
_counter_bulk_load_running_count->set(bulk_load_running_count);
_counter_bulk_load_max_ingestion_time_ms->set(bulk_load_max_ingestion_time_ms);
_counter_bulk_load_max_duration_time_ms->set(bulk_load_max_duration_time_ms);

ddebug("finish to garbage collection, time_used_ns = %" PRIu64, dsn_now_ns() - start);
}
Expand Down
14 changes: 14 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,20 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
#ifdef DSN_ENABLE_GPERF
perf_counter_wrapper _counter_tcmalloc_release_memory_size;
#endif

// <- Bulk load Metrics ->
// All counters below are registered in install_perf_counters().
// NOTE(review): several registered descriptions say "current ... count", while the
// counters are registered as COUNTER_TYPE_VOLATILE_NUMBER and only incremented in
// the visible code -- confirm the intended semantics.

// Set in on_gc() to the number of scanned replicas whose bulk load status is not
// BLS_INVALID.
perf_counter_wrapper _counter_bulk_load_running_count;
// Incremented by replica_bulk_loader::start_download().
perf_counter_wrapper _counter_bulk_load_downloading_count;
// Incremented by replica_bulk_loader::start_ingestion().
perf_counter_wrapper _counter_bulk_load_ingestion_count;
// Incremented by replica_bulk_loader::handle_bulk_load_succeed().
perf_counter_wrapper _counter_bulk_load_succeed_count;
// Incremented when replica_bulk_loader::do_bulk_load() handles BLS_FAILED.
perf_counter_wrapper _counter_bulk_load_failed_count;
// Incremented by replica::on_client_write() for each write rejected because the
// replica is in bulk load ingestion.
perf_counter_wrapper _counter_bulk_load_ingestion_reject_write_count;
// Incremented once per file that download_sst_files() downloads successfully.
perf_counter_wrapper _counter_bulk_load_download_file_succ_count;
// Incremented once per file that download_sst_files() fails to download.
perf_counter_wrapper _counter_bulk_load_download_file_fail_count;
// Increased by the file size of each successfully downloaded file.
perf_counter_wrapper _counter_bulk_load_download_file_size;
// Set in on_gc() to the maximum replica::ingestion_duration_ms() among the
// replicas counted as running bulk load.
perf_counter_wrapper _counter_bulk_load_max_ingestion_time_ms;
// Set in on_gc() to the maximum replica_bulk_loader::duration_ms() among the
// replicas counted as running bulk load.
perf_counter_wrapper _counter_bulk_load_max_duration_time_ms;

dsn::task_tracker _tracker;
};
} // namespace replication
Expand Down

0 comments on commit e7184c5

Please sign in to comment.