From 9d1059eb6597efbfb00b405c867d9b0edd82e0ec Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Thu, 23 Jul 2020 10:31:36 +0800 Subject: [PATCH] feat(bulk-load): add perf-counter (#567) --- src/replica/bulk_load/replica_bulk_loader.cpp | 19 +++--- src/replica/bulk_load/replica_bulk_loader.h | 16 +++++ src/replica/replica.h | 7 ++ src/replica/replica_2pc.cpp | 4 +- src/replica/replica_stub.cpp | 65 +++++++++++++++++++ src/replica/replica_stub.h | 14 ++++ 6 files changed, 115 insertions(+), 10 deletions(-) diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 0c1d765dd9..0c1737ad93 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -263,7 +263,7 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name, break; case bulk_load_status::BLS_FAILED: handle_bulk_load_finish(bulk_load_status::BLS_FAILED); - // TODO(heyuchen): add perf-counter here + _stub->_counter_bulk_load_failed_count->increment(); break; default: break; @@ -345,7 +345,8 @@ error_code replica_bulk_loader::start_download(const std::string &app_name, ddebug_replica("node[{}] has {} replica executing downloading", _stub->_primary_address_str, _stub->_bulk_load_downloading_count.load()); - // TODO(heyuchen): add perf-counter + _bulk_load_start_time_ms = dsn_now_ms(); + _stub->_counter_bulk_load_downloading_count->increment(); // start download ddebug_replica("start to download sst files"); @@ -418,12 +419,13 @@ error_code replica_bulk_loader::download_sst_files(const std::string &app_name, _download_status.store(ec); derror_replica( "failed to download file({}), error = {}", f_meta.name, ec.to_string()); - // TODO(heyuchen): add perf-counter + _stub->_counter_bulk_load_download_file_fail_count->increment(); return; } // download file succeed, update progress update_bulk_load_download_progress(f_size, f_meta.name); - // TODO(heyuchen): add perf-counter + _stub->_counter_bulk_load_download_file_succ_count->increment(); + _stub->_counter_bulk_load_download_file_size->add(f_size); }); _download_task[f_meta.name] = bulk_load_download_task; } @@ -507,7 +509,7 @@ void replica_bulk_loader::check_download_finish() void replica_bulk_loader::start_ingestion() { _status = bulk_load_status::BLS_INGESTING; - // TODO(heyuchen): add perf-counter + _stub->_counter_bulk_load_ingestion_count->increment(); if (status() == partition_status::PS_PRIMARY) { _replica->_primary_states.ingestion_is_empty_prepare_sent = false; } @@ -540,7 +542,7 @@ void replica_bulk_loader::handle_bulk_load_succeed() _replica->_app->set_ingestion_status(ingestion_status::IS_INVALID); _status = bulk_load_status::BLS_SUCCEED; - // TODO(heyuchen): add perf-counter + _stub->_counter_bulk_load_succeed_count->increment(); } // ThreadPool: THREAD_POOL_REPLICATION @@ -613,7 +615,8 @@ void replica_bulk_loader::clear_bulk_load_states() _replica->_is_bulk_load_ingestion = false; _replica->_app->set_ingestion_status(ingestion_status::IS_INVALID); - // TODO(heyuchen): clear other states for perf-counter + _bulk_load_start_time_ms = 0; + _replica->_bulk_load_ingestion_start_time_ms = 0; _status = bulk_load_status::BLS_INVALID; } @@ -771,7 +774,7 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon if (is_group_ingestion_finish) { ddebug_replica("finish ingestion, recover write"); _replica->_is_bulk_load_ingestion = false; - // TODO(heyuchen): reset perf-counter + _replica->_bulk_load_ingestion_start_time_ms = 0; } } diff --git a/src/replica/bulk_load/replica_bulk_loader.h b/src/replica/bulk_load/replica_bulk_loader.h index 750425ea78..f6f61b512d 100644 --- a/src/replica/bulk_load/replica_bulk_loader.h +++ b/src/replica/bulk_load/replica_bulk_loader.h @@ -112,6 +112,18 @@ class replica_bulk_loader : replica_base inline void set_bulk_load_status(bulk_load_status::type status) { _status = status; } + inline uint64_t duration_ms() const + { + return _bulk_load_start_time_ms > 0 ? (dsn_now_ms() - _bulk_load_start_time_ms) : 0; + } + + inline uint64_t ingestion_duration_ms() const + { + return _replica->_bulk_load_ingestion_start_time_ms > 0 + ? (dsn_now_ms() - _replica->_bulk_load_ingestion_start_time_ms) + : 0; + } + // // helper functions // @@ -124,6 +136,7 @@ class replica_bulk_loader : replica_base replica_stub *_stub; friend class replica; + friend class replica_stub; friend class replica_bulk_loader_test; bulk_load_status::type _status{bulk_load_status::BLS_INVALID}; @@ -133,6 +146,9 @@ class replica_bulk_loader : replica_base std::atomic _download_status{ERR_OK}; // file_name -> downloading task std::map _download_task; + + // Used for perf-counter + uint64_t _bulk_load_start_time_ms{0}; }; } // namespace replication diff --git a/src/replica/replica.h b/src/replica/replica.h index 98c2990ca8..a260532543 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -189,6 +189,12 @@ class replica : public serverlet, public ref_counter, public replica_ba // Bulk load // replica_bulk_loader *get_bulk_loader() const { return _bulk_loader.get(); } + inline uint64_t ingestion_duration_ms() const + { + return _bulk_load_ingestion_start_time_ms > 0 + ? (dsn_now_ms() - _bulk_load_ingestion_start_time_ms) + : 0; + } // // Statistics @@ -543,6 +549,7 @@ class replica : public serverlet, public ref_counter, public replica_ba std::unique_ptr _bulk_loader; // if replica in bulk load ingestion 2pc, will reject other write requests bool _is_bulk_load_ingestion{false}; + uint64_t _bulk_load_ingestion_start_time_ms{0}; // perf counters perf_counter_wrapper _counter_private_log_size; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 199d96a5d4..b4cba80ffb 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -76,7 +76,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) if (_is_bulk_load_ingestion) { // reject write requests during ingestion - // TODO(heyuchen): add perf-counter here + _stub->_counter_bulk_load_ingestion_reject_write_count->increment(); response_client_write(request, ERR_OPERATION_DISABLED); return; } @@ -91,7 +91,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } _is_bulk_load_ingestion = true; - // TODO(heyuchen): set _bulk_load_ingestion_start_time_ms for perf-counter + _bulk_load_ingestion_start_time_ms = dsn_now_ms(); } if (static_cast(_primary_states.membership.secondaries.size()) + 1 < diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 0d6db9c8af..c670ec694d 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -335,6 +335,57 @@ void replica_stub::install_perf_counters() COUNTER_TYPE_VOLATILE_NUMBER, "write size exceed threshold count in the recent period"); + // <- Bulk Load Metrics -> + + _counter_bulk_load_running_count.init_app_counter("eon.replica_stub", + "bulk.load.running.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "current bulk load running count"); + _counter_bulk_load_downloading_count.init_app_counter("eon.replica_stub", + "bulk.load.downloading.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "current bulk load downloading count"); + _counter_bulk_load_ingestion_count.init_app_counter("eon.replica_stub", + "bulk.load.ingestion.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "current bulk load ingestion count"); + _counter_bulk_load_succeed_count.init_app_counter("eon.replica_stub", + "bulk.load.succeed.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "current bulk load succeed count"); + _counter_bulk_load_failed_count.init_app_counter("eon.replica_stub", + "bulk.load.failed.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "current bulk load failed count"); + _counter_bulk_load_ingestion_reject_write_count.init_app_counter( + "eon.replica_stub", + "bulk.load.ingestion.reject.write.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "bulk load ingestion reject write requests count"); + _counter_bulk_load_download_file_succ_count.init_app_counter( + "eon.replica_stub", + "bulk.load.download.file.success.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "bulk load recent download file success count"); + _counter_bulk_load_download_file_fail_count.init_app_counter( + "eon.replica_stub", + "bulk.load.download.file.fail.count", + COUNTER_TYPE_VOLATILE_NUMBER, + "bulk load recent download file failed count"); + _counter_bulk_load_download_file_size.init_app_counter("eon.replica_stub", + "bulk.load.download.file.size", + COUNTER_TYPE_VOLATILE_NUMBER, + "bulk load recent download file size"); + _counter_bulk_load_max_ingestion_time_ms.init_app_counter( + "eon.replica_stub", + "bulk.load.max.ingestion.duration.time.ms", + COUNTER_TYPE_NUMBER, + "bulk load max ingestion duration time(ms)"); + _counter_bulk_load_max_duration_time_ms.init_app_counter("eon.replica_stub", + "bulk.load.max.duration.time.ms", + COUNTER_TYPE_NUMBER, + "bulk load max duration time(ms)"); + #ifdef DSN_ENABLE_GPERF _counter_tcmalloc_release_memory_size.init_app_counter("eon.replica_stub", "tcmalloc.release.memory.size", @@ -1716,6 +1767,9 @@ void replica_stub::on_gc() uint64_t cold_backup_running_count = 0; uint64_t cold_backup_max_duration_time_ms = 0; uint64_t cold_backup_max_upload_file_size = 0; + uint64_t bulk_load_running_count = 0; + uint64_t bulk_load_max_ingestion_time_ms = 0; + uint64_t bulk_load_max_duration_time_ms = 0; for (auto &kv : rs) { replica_ptr &rep = kv.second.rep; if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) { @@ -1733,6 +1787,14 @@ void replica_stub::on_gc() cold_backup_max_duration_time_ms, rep->_cold_backup_max_duration_time_ms.load()); cold_backup_max_upload_file_size = std::max( cold_backup_max_upload_file_size, rep->_cold_backup_max_upload_file_size.load()); + + if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) { + bulk_load_running_count++; + bulk_load_max_ingestion_time_ms = + std::max(bulk_load_max_ingestion_time_ms, rep->ingestion_duration_ms()); + bulk_load_max_duration_time_ms = + std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms()); + } } } @@ -1742,6 +1804,9 @@ void replica_stub::on_gc() _counter_cold_backup_running_count->set(cold_backup_running_count); _counter_cold_backup_max_duration_time_ms->set(cold_backup_max_duration_time_ms); _counter_cold_backup_max_upload_file_size->set(cold_backup_max_upload_file_size); + _counter_bulk_load_running_count->set(bulk_load_running_count); + _counter_bulk_load_max_ingestion_time_ms->set(bulk_load_max_ingestion_time_ms); + _counter_bulk_load_max_duration_time_ms->set(bulk_load_max_duration_time_ms); ddebug("finish to garbage collection, time_used_ns = %" PRIu64, dsn_now_ns() - start); } diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index cc80577ffc..74c5c93dae 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -435,6 +435,20 @@ class replica_stub : public serverlet, public ref_counter #ifdef DSN_ENABLE_GPERF perf_counter_wrapper _counter_tcmalloc_release_memory_size; #endif + + // <- Bulk load Metrics -> + perf_counter_wrapper _counter_bulk_load_running_count; + perf_counter_wrapper _counter_bulk_load_downloading_count; + perf_counter_wrapper _counter_bulk_load_ingestion_count; + perf_counter_wrapper _counter_bulk_load_succeed_count; + perf_counter_wrapper _counter_bulk_load_failed_count; + perf_counter_wrapper _counter_bulk_load_ingestion_reject_write_count; + perf_counter_wrapper _counter_bulk_load_download_file_succ_count; + perf_counter_wrapper _counter_bulk_load_download_file_fail_count; + perf_counter_wrapper _counter_bulk_load_download_file_size; + perf_counter_wrapper _counter_bulk_load_max_ingestion_time_ms; + perf_counter_wrapper _counter_bulk_load_max_duration_time_ms; + dsn::task_tracker _tracker; }; } // namespace replication