diff --git a/include/dsn/tool-api/task.h b/include/dsn/tool-api/task.h index ada5bb8787..2561fd69e5 100644 --- a/include/dsn/tool-api/task.h +++ b/include/dsn/tool-api/task.h @@ -439,6 +439,7 @@ class rpc_request_task : public task spec().name.c_str(), _request->header->from_address.to_string(), _request->header->client.timeout_ms); + spec().on_rpc_task_dropped.execute(this); } } diff --git a/include/dsn/tool-api/task_spec.h b/include/dsn/tool-api/task_spec.h index 3cd2c80142..cb210714d4 100644 --- a/include/dsn/tool-api/task_spec.h +++ b/include/dsn/tool-api/task_spec.h @@ -221,6 +221,7 @@ class task_spec : public extensible_object join_point on_rpc_call; // return true means continue, otherwise dropped and (optionally) timedout join_point on_rpc_request_enqueue; + join_point on_rpc_task_dropped; // rpc task dropped // RPC_RESPONSE join_point on_rpc_reply; diff --git a/src/runtime/profiler.cpp b/src/runtime/profiler.cpp index b7f2e303c5..f7f6650358 100644 --- a/src/runtime/profiler.cpp +++ b/src/runtime/profiler.cpp @@ -106,7 +106,12 @@ counter_info *counter_info_ptr[] = { "TIMEOUT(#/s)", "#/s"), new counter_info( - {"task.inqueue", "tiq"}, TASK_IN_QUEUE, COUNTER_TYPE_NUMBER, "InQueue(#)", "#")}; + {"task.inqueue", "tiq"}, TASK_IN_QUEUE, COUNTER_TYPE_NUMBER, "InQueue(#)", "#"), + new counter_info({"rpc.dropped", "rdit"}, + RPC_DROPPED_IF_TIMEOUT, + COUNTER_TYPE_NUMBER, + "RPC.DROPPED(#)", + "#")}; // call normal task static void profiler_on_task_create(task *caller, task *callee) @@ -269,6 +274,15 @@ static void profiler_on_rpc_request_enqueue(rpc_request_task *callee) } } +static void profile_on_rpc_task_dropped(rpc_request_task *callee) +{ + auto code = callee->spec().code; + auto ptr = s_spec_profilers[code].ptr[RPC_DROPPED_IF_TIMEOUT].get(); + if (ptr != nullptr) { + ptr->increment(); + } +} + static void profiler_on_rpc_create_response(message_ex *req, message_ex *resp) { message_ext_for_profiler::get(resp) = message_ext_for_profiler::get(req); @@ -543,6 +557,17 @@ void profiler::install(service_spec &) COUNTER_TYPE_NUMBER_PERCENTILES, ""); } + if (dsn_config_get_value_bool( + section_name.c_str(), + "rpc_request_dropped_before_execution_when_timeout", + false, + "whether to profile the number of rpc dropped for timeout")) + s_spec_profilers[i].ptr[RPC_DROPPED_IF_TIMEOUT].init_global_counter( + "zion", + "profiler", + (name + std::string(".rpc.dropped")).c_str(), + COUNTER_TYPE_NUMBER, + "rpc dropped if queue time exceed client timeout"); } else if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_RESPONSE) { if (dsn_config_get_value_bool(section_name.c_str(), "profiler::latency.client", @@ -601,6 +626,7 @@ void profiler::install(service_spec &) spec->on_aio_enqueue.put_back(profiler_on_aio_enqueue, "profiler"); spec->on_rpc_call.put_back(profiler_on_rpc_call, "profiler"); spec->on_rpc_request_enqueue.put_back(profiler_on_rpc_request_enqueue, "profiler"); + spec->on_rpc_task_dropped.put_back(profile_on_rpc_task_dropped, "profiler"); spec->on_rpc_create_response.put_back(profiler_on_rpc_create_response, "profiler"); spec->on_rpc_reply.put_back(profiler_on_rpc_reply, "profiler"); spec->on_rpc_response_enqueue.put_back(profiler_on_rpc_response_enqueue, "profiler"); diff --git a/src/runtime/profiler_header.h b/src/runtime/profiler_header.h index 4fbc30bc1a..120909bce3 100644 --- a/src/runtime/profiler_header.h +++ b/src/runtime/profiler_header.h @@ -51,6 +51,7 @@ enum perf_counter_ptr_type RPC_CLIENT_NON_TIMEOUT_LATENCY_NS, RPC_CLIENT_TIMEOUT_THROUGHPUT, TASK_IN_QUEUE, + RPC_DROPPED_IF_TIMEOUT, PERF_COUNTER_COUNT, PERF_COUNTER_INVALID diff --git a/src/runtime/task/task_spec.cpp b/src/runtime/task/task_spec.cpp index 33788c535a..6631e3baad 100644 --- a/src/runtime/task/task_spec.cpp +++ b/src/runtime/task/task_spec.cpp @@ -162,6 +162,7 @@ task_spec::task_spec(int code, on_rpc_call((std::string(name) + std::string(".rpc.call")).c_str()), on_rpc_request_enqueue((std::string(name) + std::string(".rpc.request.enqueue")).c_str()), + on_rpc_task_dropped((std::string(name) + std::string(".dropped")).c_str()), on_rpc_reply((std::string(name) + std::string(".rpc.reply")).c_str()), on_rpc_response_enqueue((std::string(name) + std::string(".rpc.response.enqueue")).c_str()), on_rpc_create_response((std::string(name) + std::string("rpc.create.response")).c_str())