diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbServersMetrics.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbServersMetrics.java new file mode 100644 index 000000000000..43919444c5ce --- /dev/null +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbServersMetrics.java @@ -0,0 +1,136 @@
+// Copyright (c) YugaByte, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+// in compliance with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations
+// under the License.
+//
+
+package org.yb.pgsql;
+
+import com.google.common.net.HostAndPort;
+import org.json.JSONObject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yb.minicluster.MiniYBClusterBuilder;
+import org.yb.YBTestRunner;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import static org.yb.AssertionWrappers.*;
+
+@RunWith(value = YBTestRunner.class)
+public class TestYbServersMetrics extends BasePgSQLTest {
+  private static final Logger LOG = LoggerFactory.getLogger(TestYbServersMetrics.class);
+  private static final int NUM_TSERVERS = 3;
+  private static final int RF = 3;
+  /** Upper bound for one query: each per-tserver RPC has a 5000ms timeout, plus slack. */
+  private static final long MAX_QUERY_MILLIS = 6000;
+  /** Metric names that every row with status "OK" must contain. */
+  private static final List<String> EXPECTED_KEYS = Arrays.asList(
+      "memory_free", "memory_available", "memory_total",
+      "tserver_root_memory_limit", "tserver_root_memory_soft_limit",
+      "tserver_root_memory_consumption",
+      "cpu_usage_user", "cpu_usage_system");
+
+  @Override
+  public ConnectionBuilder getConnectionBuilder() {
+    ConnectionBuilder cb = new ConnectionBuilder(miniCluster);
+    cb.setLoadBalance(true);
+    return cb;
+  }
+
+  @Override
+  protected void customizeMiniClusterBuilder(MiniYBClusterBuilder builder) {
+    super.customizeMiniClusterBuilder(builder);
+    builder.numTservers(NUM_TSERVERS);
+    builder.replicationFactor(RF);
+    builder.tserverHeartbeatTimeoutMs(7000);
+  }
+
+  /**
+   * Runs {@code yb_servers_metrics()} and checks the shape of its result.
+   *
+   * @param expectedRows total number of rows expected
+   * @param expectedStatusOkRows number of rows expected to have status "OK"
+   * @param tserverNo index of the tserver to connect to, or a negative value to use the
+   *     connection builder as returned by {@link #getConnectionBuilder()}
+   */
+  private void assertYbServersMetricsOutput(int expectedRows, int expectedStatusOkRows,
+      int tserverNo) throws Exception {
+    ConnectionBuilder b = getConnectionBuilder();
+    if (tserverNo >= 0) {
+      b = b.withTServer(tserverNo);
+    }
+    // try-with-resources closes statement and connection even when an assertion fails.
+    try (Connection conn = b.connect();
+         Statement st = conn.createStatement()) {
+      final long startTimeMillis = System.currentTimeMillis();
+      try (ResultSet rs = st.executeQuery("select * from yb_servers_metrics()")) {
+        final long elapsedMillis = System.currentTimeMillis() - startTimeMillis;
+        assertLessThan(elapsedMillis, Long.valueOf(MAX_QUERY_MILLIS));
+        int rowCount = 0;
+        int okCount = 0;
+        List<String> errors = new ArrayList<>();
+        while (rs.next()) {
+          String metrics = rs.getString(2);
+          String status = rs.getString(3);
+          String error = rs.getString(4);
+          if ("OK".equals(status)) {
+            ++okCount;
+            List<String> metricKeys = new ArrayList<>(new JSONObject(metrics).keySet());
+            assertTrue("Expected keys are not present. Present keys are:" + metricKeys,
+                       metricKeys.containsAll(EXPECTED_KEYS));
+          } else {
+            assertEquals("{}", metrics);
+            errors.add(error);
+          }
+          ++rowCount;
+        }
+        assertEquals("Unexpected tservers count", expectedRows, rowCount);
+        assertEquals("Unexpected OK tserver count. Errors: " + errors,
+                     expectedStatusOkRows, okCount);
+      }
+    }
+  }
+
+  @Test
+  public void testYBServersMetricsFunction() throws Exception {
+    assertYbServersMetricsOutput(NUM_TSERVERS, NUM_TSERVERS, -1);
+
+    // add a new tserver
+    miniCluster.startTServer(getTServerFlags());
+    assertTrue(miniCluster.waitForTabletServers(NUM_TSERVERS + 1));
+    waitForTServerHeartbeat();
+    assertYbServersMetricsOutput(NUM_TSERVERS + 1, NUM_TSERVERS + 1, -1);
+
+    // kill a tserver
+    // closing root connection to avoid potential errors during clean up.
+    connection.close();
+    List<HostAndPort> tserverList = new ArrayList<>(miniCluster.getTabletServers().keySet());
+    HostAndPort tserver = tserverList.get(tserverList.size() - 1);
+    miniCluster.killTabletServerOnHostPort(tserver);
+    // Initially we will get NUM_TSERVERS + 1 rows, with one of them having status as "ERROR".
+    // The last tserver was killed, so connect to the first one.
+    assertYbServersMetricsOutput(NUM_TSERVERS + 1, NUM_TSERVERS, 0);
+
+    // After the tserver is removed and updated in cache,
+    // we will get NUM_TSERVERS rows, with all of them having status as "OK"
+    Thread.sleep(2 * miniCluster.getClusterParameters().getTServerHeartbeatTimeoutMs());
+    assertYbServersMetricsOutput(NUM_TSERVERS, NUM_TSERVERS, 0);
+  }
+
+}
diff --git a/src/postgres/src/backend/catalog/yb_system_views.sql b/src/postgres/src/backend/catalog/yb_system_views.sql index 198a61c77ab7..55b4275f697b 100644 --- a/src/postgres/src/backend/catalog/yb_system_views.sql +++ b/src/postgres/src/backend/catalog/yb_system_views.sql @@ -41,6 +41,10 @@ CREATE VIEW yb_query_diagnostics_status AS SELECT * FROM yb_get_query_diagnostics_status(); +CREATE VIEW yb_servers_metrics AS + SELECT * + FROM yb_servers_metrics(); + CREATE VIEW pg_roles AS SELECT rolname, diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index
168158e5d905..ffbaf0f3a46b 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -100,6 +100,7 @@ #include "utils/spccache.h" #include "utils/syscache.h" #include "utils/uuid.h" +#include "utils/jsonb.h" #include "fmgr.h" #include "funcapi.h" #include "mb/pg_wchar.h" @@ -3548,6 +3549,120 @@ yb_local_tablets(PG_FUNCTION_ARGS) return (Datum) 0; }
+/*
+ * Build a jsonb object that maps each metric name to its value.  Both keys
+ * and values are emitted as JSON strings.
+ */
+static Datum
+GetMetricsAsJsonbDatum(YBCMetricsInfo *metrics, size_t metricsCount)
+{
+	JsonbParseState *state = NULL;
+	JsonbValue *result;
+	size_t		j;
+
+	pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
+	for (j = 0; j < metricsCount; j++)
+	{
+		JsonbValue	key;
+		JsonbValue	value;
+
+		key.type = jbvString;
+		key.val.string.val = (char *) metrics[j].name;
+		key.val.string.len = strlen(metrics[j].name);
+		pushJsonbValue(&state, WJB_KEY, &key);
+
+		value.type = jbvString;
+		value.val.string.val = (char *) metrics[j].value;
+		value.val.string.len = strlen(metrics[j].value);
+		pushJsonbValue(&state, WJB_VALUE, &value);
+	}
+	result = pushJsonbValue(&state, WJB_END_OBJECT, NULL);
+
+	return JsonbPGetDatum(JsonbValueToJsonb(result));
+}
+
+/*
+ * yb_servers_metrics
+ *		Set-returning function: one (uuid, metrics, status, error) row per
+ *		entry returned by YBCServersMetrics().
+ */
+Datum
+yb_servers_metrics(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	YBCPgServerMetricsInfo *servers_metrics_info = NULL;
+	size_t		num_servers = 0;
+	size_t		i;
+#define YB_SERVERS_METRICS_COLS 4
+
+	/* only superuser and yb_db_admin can query this function */
+	if (!superuser() && !IsYbDbAdminUser(GetUserId()))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("only superusers and yb_db_admin can query yb_servers_metrics"))));
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not " \
+						"allowed in this context")));
+
+	/*
+	 * Switch context to construct returned data structures and store
+	 * returned values from tserver.
+	 */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	/* Build a tuple descriptor */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		ereport(ERROR,
+				(errmsg_internal("return type must be a row type")));
+
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	HandleYBStatus(YBCServersMetrics(&servers_metrics_info, &num_servers));
+
+	for (i = 0; i < num_servers; ++i)
+	{
+		YBCPgServerMetricsInfo *metricsInfo = &servers_metrics_info[i];
+		Datum		values[YB_SERVERS_METRICS_COLS];
+		bool		nulls[YB_SERVERS_METRICS_COLS];
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, 0, sizeof(nulls));
+
+		values[0] = CStringGetTextDatum(metricsInfo->uuid);
+		values[1] = GetMetricsAsJsonbDatum(metricsInfo->metrics, metricsInfo->metrics_count);
+		values[2] = CStringGetTextDatum(metricsInfo->status);
+		values[3] = CStringGetTextDatum(metricsInfo->error);
+
+		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+	}
+
+#undef YB_SERVERS_METRICS_COLS
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupstore);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return (Datum) 0;
+}
+
/*---------------------------------------------------------------------------*/ /* Deterministic DETAIL order */ /*---------------------------------------------------------------------------*/ diff --git a/src/postgres/src/include/catalog/catalog.h b/src/postgres/src/include/catalog/catalog.h index 0a91f1361174..65f16873477c 100644 ---
a/src/postgres/src/include/catalog/catalog.h +++ b/src/postgres/src/include/catalog/catalog.h @@ -29,7 +29,7 @@ * If you increment it, make sure you didn't forget to add a new SQL migration * (see pg_yb_migration.dat and src/yb/yql/pgwrapper/ysql_migrations/README.md) */ -#define YB_LAST_USED_OID 8071 +#define YB_LAST_USED_OID 8072 extern bool IsSystemRelation(Relation relation); extern bool IsToastRelation(Relation relation); diff --git a/src/postgres/src/include/catalog/pg_proc.dat b/src/postgres/src/include/catalog/pg_proc.dat index cfe96933f646..a8e70aa5270b 100644 --- a/src/postgres/src/include/catalog/pg_proc.dat +++ b/src/postgres/src/include/catalog/pg_proc.dat @@ -10536,4 +10536,13 @@ proparallel => 'r', prorettype => 'void', proargtypes => 'bool', prosrc => 'binary_upgrade_set_next_tablegroup_default' }, +{ oid => '8072', + descr => 'Get metrics of all nodes', + proname => 'yb_servers_metrics', prorows => '10', + proretset => 't', provolatile => 'v', proparallel => 'r', + prorettype => 'record', proargtypes => '', + proallargtypes => '{text,jsonb,text,text}', + proargnames => '{uuid,metrics,status,error}', + proargmodes => '{o,o,o,o}', + prosrc => 'yb_servers_metrics'}, ] diff --git a/src/postgres/src/include/catalog/pg_yb_migration.dat b/src/postgres/src/include/catalog/pg_yb_migration.dat index a16cbbd096bf..08b6d2e9f6cc 100644 --- a/src/postgres/src/include/catalog/pg_yb_migration.dat +++ b/src/postgres/src/include/catalog/pg_yb_migration.dat @@ -12,7 +12,7 @@ [ # For better version control conflict detection, list latest migration filename -# here: V57__23312__binary_upgrade_set_next_tablegroup_default.sql -{ major => '57', minor => '0', name => '', time_applied => '_null_' } +# here: V58__23542__yb_servers_metrics.sql +{ major => '58', minor => '0', name => '', time_applied => '_null_' } ] diff --git a/src/postgres/src/test/regress/expected/yb_pg_rules.out b/src/postgres/src/test/regress/expected/yb_pg_rules.out index 98587c08a5d5..3b23bac737fe 
100644 --- a/src/postgres/src/test/regress/expected/yb_pg_rules.out +++ b/src/postgres/src/test/regress/expected/yb_pg_rules.out @@ -2451,6 +2451,11 @@ yb_query_diagnostics_status| SELECT yb_get_query_diagnostics_status.status, yb_get_query_diagnostics_status.explain_params, yb_get_query_diagnostics_status.path FROM yb_get_query_diagnostics_status() yb_get_query_diagnostics_status(status, description, query_id, start_time, diagnostics_interval_sec, bind_var_query_min_duration_ms, explain_params, path); +yb_servers_metrics| SELECT yb_servers_metrics.uuid, + yb_servers_metrics.metrics, + yb_servers_metrics.status, + yb_servers_metrics.error + FROM yb_servers_metrics() yb_servers_metrics(uuid, metrics, status, error); yb_terminated_queries| SELECT d.datname AS databasename, s.backend_pid, s.query_text, diff --git a/src/yb/common/common.proto b/src/yb/common/common.proto index d7701e51e8db..55bda2b54e7a 100644 --- a/src/yb/common/common.proto +++ b/src/yb/common/common.proto @@ -711,3 +711,8 @@ message CDCSDKStreamCreateOptionsPB { optional CDCSDKDynamicTablesOption cdcsdk_dynamic_tables_option = 1 [default = DYNAMIC_TABLES_ENABLED]; } + +message TserverMetricsInfoPB { + required string name = 1; + required string value = 2; +} diff --git a/src/yb/master/master_tserver.cc b/src/yb/master/master_tserver.cc index edae2cf8f8f0..4aa12f147ecd 100644 --- a/src/yb/master/master_tserver.cc +++ b/src/yb/master/master_tserver.cc @@ -219,5 +219,10 @@ Result> MasterTabletServer::GetLocalTabletsM return STATUS_FORMAT(InternalError, "Unexpected call of GetLocalTabletsMetadata()"); } +Result> MasterTabletServer::GetMetrics() const { + LOG(DFATAL) << "Unexpected call of GetMetrics()"; + return STATUS_FORMAT(InternalError, "Unexpected call of GetMetrics()"); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/master_tserver.h b/src/yb/master/master_tserver.h index 19845b59c472..91b68ab2ad1c 100644 --- a/src/yb/master/master_tserver.h +++ 
b/src/yb/master/master_tserver.h @@ -107,6 +107,8 @@ class MasterTabletServer : public tserver::TabletServerIf, virtual Result> GetLocalTabletsMetadata() const override; + virtual Result> GetMetrics() const override; + private: Master* master_ = nullptr; scoped_refptr metric_entity_; diff --git a/src/yb/tserver/metrics_snapshotter.cc b/src/yb/tserver/metrics_snapshotter.cc index c4210dca4947..906313aaa328 100644 --- a/src/yb/tserver/metrics_snapshotter.cc +++ b/src/yb/tserver/metrics_snapshotter.cc @@ -160,9 +160,6 @@ class MetricsSnapshotter::Thread { return log_prefix_; } - // Retrieves current cpu usage information. - Result> GetCpuUsage(); - // The server for which we are collecting metrics. TabletServer* const server_; @@ -217,6 +214,44 @@ Status MetricsSnapshotter::Stop() { return thread_->Stop(); }
+// Samples the cumulative CPU tick counters twice, 'ms' milliseconds apart, and returns
+// {user_fraction, system_fraction} of the ticks elapsed in between. Blocks for 'ms' ms.
+Result<std::vector<double>> MetricsSnapshotter::GetCpuUsageInInterval(int ms) {
+  // GetCpuUsage() leaves a counter at zero when it could not be read; treat that as failure.
+  auto sample_ticks = []() -> Result<std::vector<uint64_t>> {
+    auto ticks = VERIFY_RESULT(GetCpuUsage());
+    if (!std::all_of(ticks.begin(), ticks.end(), [](uint64_t v) { return v > 0; })) {
+      return STATUS_FORMAT(RuntimeError, "Failed to retrieve CPU ticks. Got "
+                           "[total_ticks, user-ticks, system_ticks]=$0.", ticks);
+    }
+    return ticks;
+  };
+
+  const auto before = VERIFY_RESULT(sample_ticks());
+  std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+  const auto after = VERIFY_RESULT(sample_ticks());
+
+  // The counters are unsigned: compare before subtracting, because a counter that went
+  // backwards would otherwise wrap around to a huge positive delta.
+  for (size_t i = 0; i < before.size(); ++i) {
+    if (after[i] < before[i]) {
+      return STATUS_FORMAT(RuntimeError, "Failed to calculate CPU usage - "
+                           "invalid CPU ticks: before=$0, after=$1.", before, after);
+    }
+  }
+
+  const uint64_t total_ticks = after[0] - before[0];
+  if (total_ticks == 0) {
+    // No ticks elapsed in the interval; report zero usage rather than dividing by zero.
+    return std::vector<double>{0.0, 0.0};
+  }
+  const uint64_t user_ticks = after[1] - before[1];
+  const uint64_t system_ticks = after[2] - before[2];
+  return std::vector<double>{static_cast<double>(user_ticks) / total_ticks,
+                             static_cast<double>(system_ticks) / total_ticks};
+}
+
+
//////////////////////////////////////////////////////////// // MetricsSnapshotter::Thread //////////////////////////////////////////////////////////// @@ -449,14 +484,14 @@ Status MetricsSnapshotter::Thread::DoMetricsSnapshot() { if (tserver_metrics_whitelist_.contains(kMetricWhitelistItemCpuUsage)) { // Store the {total_ticks, user_ticks, and system_ticks} - auto cur_ticks = CHECK_RESULT(GetCpuUsage()); + auto cur_ticks = CHECK_RESULT(MetricsSnapshotter::GetCpuUsage()); bool get_cpu_success = std::all_of( cur_ticks.begin(), cur_ticks.end(), [](bool v) { return v > 0; }); if (get_cpu_success && first_run_cpu_ticks_) { prev_ticks_ = cur_ticks; first_run_cpu_ticks_ = false; std::this_thread::sleep_for(std::chrono::milliseconds(500)); - cur_ticks = CHECK_RESULT(GetCpuUsage()); + cur_ticks = CHECK_RESULT(MetricsSnapshotter::GetCpuUsage()); get_cpu_success = std::all_of( cur_ticks.begin(), cur_ticks.end(), [](bool v) { return v > 0; }); } @@ -511,7 +546,49 @@ Status MetricsSnapshotter::Thread::DoMetricsSnapshot() { return Status::OK(); } -Result> MetricsSnapshotter::Thread::GetCpuUsage() {
+// Returns {total, free, available} physical memory of the node, in bytes. On macOS only the
+// total is reported; the other two entries are 0.
+Result<std::vector<uint64_t>> MetricsSnapshotter::GetMemoryUsage() {
+  uint64_t total_memory = 0, free_memory = 0, available_memory = 0;
+#ifdef __APPLE__
+  // Retrieve physical memory information using sysctl.
+  int mib[2] = {CTL_HW, HW_MEMSIZE};
+  int64_t physical_memory;
+  size_t length = sizeof(physical_memory);
+  if (sysctl(mib, 2, &physical_memory, &length, NULL, 0) != 0) {
+    return STATUS(RuntimeError, "Failed to retrieve physical memory information");
+  }
+  total_memory = physical_memory;
+#else
+  FILE* file = fopen("/proc/meminfo", "r");
+  if (!file) {
+    return STATUS(RuntimeError, "Failed to open /proc/meminfo");
+  }
+  // Stores the kB value that follows 'key' at the start of 'line' into '*bytes', as bytes.
+  // A local unsigned long long matches the %llu conversion on every platform, unlike
+  // scanning straight into a uint64_t with %lu.
+  auto parse_kb = [](const char* line, const char* key, uint64_t* bytes) {
+    const size_t key_len = strlen(key);
+    unsigned long long kb = 0;
+    if (strncmp(line, key, key_len) == 0 && sscanf(line + key_len, "%llu", &kb) == 1) {
+      *bytes = kb * 1024;  // /proc/meminfo reports kB.
+      return true;
+    }
+    return false;
+  };
+  char line[128];
+  while (fgets(line, sizeof(line), file)) {
+    if (!parse_kb(line, "MemTotal:", &total_memory) &&
+        !parse_kb(line, "MemFree:", &free_memory)) {
+      parse_kb(line, "MemAvailable:", &available_memory);
+    }
+  }
+  fclose(file);
+#endif
+  return std::vector<uint64_t>{total_memory, free_memory, available_memory};
+}
+
+Result> MetricsSnapshotter::GetCpuUsage() { uint64_t total_ticks = 0, total_user_ticks = 0, total_system_ticks = 0; #ifdef __APPLE__ host_cpu_load_info_data_t cpuinfo; @@ -543,14 +620,14 @@ Result> MetricsSnapshotter::Thread::GetCpuUsage() { YB_LOG_EVERY_N_SECS(WARNING, 120) << Format("Failed to scan /proc/stat for cpu ticks. 
", "Expected 4 inputs but got $0.", scanned); } else { - if (fclose(file)) { - YB_LOG_EVERY_N_SECS(WARNING, 120) << "Failed to close /proc/stat with errno: " - << strerror(errno); - } total_ticks = user_ticks + user_nice_ticks + system_ticks + idle_ticks; total_user_ticks = user_ticks + user_nice_ticks; total_system_ticks = system_ticks; } + if (fclose(file)) { + YB_LOG_EVERY_N_SECS(WARNING, 120) << "Failed to close /proc/stat with errno: " + << strerror(errno); + } #endif vector ret = {total_ticks, total_user_ticks, total_system_ticks}; return ret; diff --git a/src/yb/tserver/metrics_snapshotter.h b/src/yb/tserver/metrics_snapshotter.h index 91628b61158e..8a4b42de15c5 100644 --- a/src/yb/tserver/metrics_snapshotter.h +++ b/src/yb/tserver/metrics_snapshotter.h @@ -14,6 +14,7 @@ #pragma once #include +#include #include "yb/gutil/macros.h" @@ -32,10 +33,13 @@ class MetricsSnapshotter { Status Stop(); ~MetricsSnapshotter(); + static Result> GetCpuUsageInInterval(int ms); + static Result> GetMemoryUsage(); private: class Thread; std::unique_ptr thread_; + static Result> GetCpuUsage(); DISALLOW_COPY_AND_ASSIGN(MetricsSnapshotter); }; diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index b5d1fa0af550..b37601eeaf21 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -100,6 +100,8 @@ service PgClientService { rpc TabletsMetadata(PgTabletsMetadataRequestPB) returns (PgTabletsMetadataResponsePB); + rpc ServersMetrics(PgServersMetricsRequestPB) returns (PgServersMetricsResponsePB); + // DEPRECATED: GetReplicationSlot RPC is a superset of this GetReplicationSlotStatus. // So GetReplicationSlot should be used everywhere.
rpc GetReplicationSlotStatus(PgGetReplicationSlotStatusRequestPB) @@ -964,3 +966,23 @@ message PgTabletsMetadataResponsePB { AppStatusPB status = 1; repeated tablet.TabletStatusPB tablets = 2; } + +message PgServersMetricsRequestPB { +} + +message PgServerMetricsInfoPB { + string uuid = 1; + repeated TserverMetricsInfoPB metrics = 2; + PgMetricsInfoStatus status = 3; + string error = 4; +} + +enum PgMetricsInfoStatus { + OK = 0; + ERROR = 1; +} + +message PgServersMetricsResponsePB { + AppStatusPB status = 1; + repeated PgServerMetricsInfoPB servers_metrics = 2; +} diff --git a/src/yb/tserver/pg_client_service.cc b/src/yb/tserver/pg_client_service.cc index 231fb1789700..a665a34001c4 100644 --- a/src/yb/tserver/pg_client_service.cc +++ b/src/yb/tserver/pg_client_service.cc @@ -69,6 +69,7 @@ #include "yb/util/debug.h" #include "yb/util/flags.h" #include "yb/util/flags/flag_tags.h" +#include "yb/util/jsonwriter.h" #include "yb/util/logging.h" #include "yb/util/net/net_util.h" #include "yb/util/random_util.h" @@ -81,6 +82,7 @@ #include "yb/util/thread.h" #include "yb/util/yb_pg_errcodes.h" + using namespace std::literals; DEFINE_UNKNOWN_uint64(pg_client_session_expiration_ms, 60000, @@ -1700,6 +1702,58 @@ class PgClientServiceImpl::Impl { return Status::OK(); }
+  // Fetches metrics from every tserver returned by GetRemoteTabletServers(), in parallel. A
+  // tserver that fails is reported with status ERROR instead of failing the whole call.
+  Status ServersMetrics(
+      const PgServersMetricsRequestPB& req, PgServersMetricsResponsePB* resp,
+      rpc::RpcContext* context) {
+    const auto remote_tservers = VERIFY_RESULT(tablet_server_.GetRemoteTabletServers());
+    std::vector<std::future<Status>> status_futures;
+    std::vector<std::shared_ptr<GetMetricsResponsePB>> node_responses;
+    status_futures.reserve(remote_tservers.size());
+    node_responses.reserve(remote_tservers.size());
+
+    GetMetricsRequestPB metrics_req;
+    for (const auto& remote_tserver : remote_tservers) {
+      auto status_promise = std::make_shared<std::promise<Status>>();
+      status_futures.push_back(status_promise->get_future());
+      auto node_resp = std::make_shared<GetMetricsResponsePB>();
+      node_responses.push_back(node_resp);
+
+      auto init_status = remote_tserver->InitProxy(&client());
+      if (!init_status.ok()) {
+        // Record the failure for this tserver only; RPCs already sent keep running.
+        status_promise->set_value(init_status);
+        continue;
+      }
+      auto controller = std::make_shared<rpc::RpcController>();
+      controller->set_timeout(MonoDelta::FromMilliseconds(5000));
+      // The callback owns the controller and the response so that both outlive the RPC.
+      remote_tserver->proxy()->GetMetricsAsync(
+          metrics_req, node_resp.get(), controller.get(),
+          [controller, node_resp, status_promise] {
+            status_promise->set_value(controller->status());
+          });
+    }
+
+    for (size_t i = 0; i < status_futures.size(); ++i) {
+      const auto s = status_futures[i].get();
+      auto& server_metrics = *resp->add_servers_metrics();
+      server_metrics.set_uuid(remote_tservers[i]->permanent_uuid());
+      if (!s.ok()) {
+        server_metrics.set_status(tserver::PgMetricsInfoStatus::ERROR);
+        server_metrics.set_error(s.ToUserMessage());
+      } else if (node_responses[i]->has_error()) {
+        server_metrics.set_status(tserver::PgMetricsInfoStatus::ERROR);
+        server_metrics.set_error(node_responses[i]->error().status().message());
+      } else {
+        server_metrics.mutable_metrics()->Swap(node_responses[i]->mutable_metrics());
+        server_metrics.set_status(tserver::PgMetricsInfoStatus::OK);
+      }
+    }
+    return Status::OK();
+  }
+
#define PG_CLIENT_SESSION_METHOD_FORWARD(r, data, method) \ Status method( \ const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)& req, \ diff --git a/src/yb/tserver/pg_client_service.h b/src/yb/tserver/pg_client_service.h index dd77ba7d7434..a38a7618e8f1 100644 --- a/src/yb/tserver/pg_client_service.h +++ b/src/yb/tserver/pg_client_service.h @@ -83,6 +83,7 @@ class TserverXClusterContextIf; (ReserveOids) \ (GetNewObjectId) \ (RollbackToSubTransaction) \ + (ServersMetrics) \ (SetActiveSubTransaction) \ (TabletsMetadata) \ (TabletServerCount) \ diff --git a/src/yb/tserver/tablet_server.cc b/src/yb/tserver/tablet_server.cc index 4f1b57dba386..ef3361a366b5 100644 --- a/src/yb/tserver/tablet_server.cc +++ b/src/yb/tserver/tablet_server.cc @@ -1538,6 +1538,35 @@ 
Result> TabletServer::GetLocalTabletsMetadat return result; }
+// Collects a snapshot of node and tserver metrics as name/value string pairs: CPU usage
+// (sampled over 500ms), node memory, and the root MemTracker's counters.
+Result<std::vector<TserverMetricsInfoPB>> TabletServer::GetMetrics() const {
+  std::vector<TserverMetricsInfoPB> result;
+  auto add_metric = [&result](const char* name, const std::string& value) {
+    TserverMetricsInfoPB metric;
+    metric.set_name(name);
+    metric.set_value(value);
+    result.emplace_back(std::move(metric));
+  };
+
+  // Blocks for the sampling interval.
+  const auto cpu_usage = VERIFY_RESULT(MetricsSnapshotter::GetCpuUsageInInterval(500));
+  add_metric("cpu_usage_user", std::to_string(cpu_usage[0]));
+  add_metric("cpu_usage_system", std::to_string(cpu_usage[1]));
+
+  const auto memory_usage = VERIFY_RESULT(MetricsSnapshotter::GetMemoryUsage());
+  add_metric("memory_total", std::to_string(memory_usage[0]));
+  add_metric("memory_free", std::to_string(memory_usage[1]));
+  add_metric("memory_available", std::to_string(memory_usage[2]));
+
+  auto root_mem_tracker = MemTracker::GetRootTracker();
+  add_metric("tserver_root_memory_consumption", std::to_string(root_mem_tracker->consumption()));
+  add_metric("tserver_root_memory_limit", std::to_string(root_mem_tracker->limit()));
+  add_metric("tserver_root_memory_soft_limit", std::to_string(root_mem_tracker->soft_limit()));
+
+  return result;
+}
+
void TabletServer::SetCronLeaderLease(MonoTime cron_leader_lease_end) { SharedObject().SetCronLeaderLease(cron_leader_lease_end); } diff --git a/src/yb/tserver/tablet_server.h b/src/yb/tserver/tablet_server.h index 6fba940366ef..ea8d6cf05d83 100644 --- a/src/yb/tserver/tablet_server.h +++ b/src/yb/tserver/tablet_server.h @@ -377,6 +377,8 @@ class TabletServer : public DbServerBase, public TabletServerIf { Result> GetLocalTabletsMetadata() const override; + Result> GetMetrics() const override; + void TEST_SetIsCronLeader(bool is_cron_leader); struct PgClientServiceHolder { diff --git a/src/yb/tserver/tablet_server_interface.h b/src/yb/tserver/tablet_server_interface.h index 418dfb71ef42..c3671efd8e53 100644 --- a/src/yb/tserver/tablet_server_interface.h +++ b/src/yb/tserver/tablet_server_interface.h @@ -114,6 +114,7 @@ class TabletServerIf : public LocalTabletServer { tserver::PgYCQLStatementStatsResponsePB* resp) const = 0; virtual Result> GetLocalTabletsMetadata() const = 0; + virtual Result> GetMetrics() const = 0; }; } // namespace tserver diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index cad64566a70e..274746da6642 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -3103,6 +3103,23 @@ void TabletServiceImpl::CheckTserverTabletHealth(const CheckTserverTabletHealthR context.RespondSuccess(); }
+// Serves the GetMetrics RPC: responds with this server's metrics, or with the error that
+// prevented collecting them.
+void TabletServiceImpl::GetMetrics(const GetMetricsRequestPB* req,
+                                   GetMetricsResponsePB* resp,
+                                   rpc::RpcContext context) {
+  auto result = server_->GetMetrics();
+  if (!result.ok()) {
+    SetupErrorAndRespond(resp->mutable_error(), result.status(), &context);
+    return;
+  }
+  // Move each entry into the response instead of copying the whole vector twice.
+  for (auto& metric : *result) {
+    *resp->add_metrics() = std::move(metric);
+  }
+  context.RespondSuccess();
+}
+
void TabletServiceImpl::GetLockStatus(const GetLockStatusRequestPB* req, GetLockStatusResponsePB* resp, rpc::RpcContext context) { diff --git a/src/yb/tserver/tablet_service.h b/src/yb/tserver/tablet_service.h index 286c84389686..bc1fcf0a7525 100644 --- a/src/yb/tserver/tablet_service.h +++ b/src/yb/tserver/tablet_service.h @@ -192,6 +192,10 @@ class TabletServiceImpl : public TabletServerServiceIf, public ReadTabletProvide GetLockStatusResponsePB* resp, rpc::RpcContext context) override; + void GetMetrics(const GetMetricsRequestPB* req, + GetMetricsResponsePB* resp, + rpc::RpcContext context) override; + // Method to cancel a given transaction. If the passed in request has a status tablet id, a cancel // transaction request is sent to that status tablet alone. Else, the request is broadcast to all // status tablets hosted at this server. diff --git a/src/yb/tserver/tserver_service.proto b/src/yb/tserver/tserver_service.proto index 19a53dc6aa0a..5b4ca24824f3 100644 --- a/src/yb/tserver/tserver_service.proto +++ b/src/yb/tserver/tserver_service.proto @@ -136,6 +136,8 @@ service TabletServerService { returns (AcquireObjectLockResponsePB); rpc ReleaseObjectLocks(ReleaseObjectLockRequestPB) returns (ReleaseObjectLockResponsePB); + + rpc GetMetrics(GetMetricsRequestPB) returns (GetMetricsResponsePB); } // Note: Either among transactions_by_tablet or transaction_ids should be set. 
Both the fields @@ -182,6 +184,15 @@ message GetLockStatusResponsePB { repeated TabletLockInfoPB tablet_lock_infos = 4; } +message GetMetricsRequestPB { + +} + +message GetMetricsResponsePB { + repeated TserverMetricsInfoPB metrics = 1; + optional TabletServerErrorPB error = 2; +} + message GetLogLocationRequestPB { } diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index 56fba06e66d7..eba7bc33a250 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -1227,6 +1227,15 @@ class PgClient::Impl : public BigDataFetcher { return resp; } + Result ServersMetrics() { + tserver::PgServersMetricsRequestPB req; + tserver::PgServersMetricsResponsePB resp; + + RETURN_NOT_OK(proxy_->ServersMetrics(req, &resp, PrepareController())); + RETURN_NOT_OK(ResponseStatus(resp)); + return resp; + } + private: std::string LogPrefix() const { return Format("Session id $0: ", session_id_); @@ -1546,6 +1555,10 @@ Result PgClient::TabletsMetadata() { return impl_->TabletsMetadata(); } +Result PgClient::ServersMetrics() { + return impl_->ServersMetrics(); +} + void PerformExchangeFuture::wait() const { if (!value_) { value_ = MakePerformResult(data_.get(), data_->CompletePerform()); diff --git a/src/yb/yql/pggate/pg_client.h b/src/yb/yql/pggate/pg_client.h index 19fffdc03773..4c5d05a78833 100644 --- a/src/yb/yql/pggate/pg_client.h +++ b/src/yb/yql/pggate/pg_client.h @@ -248,6 +248,8 @@ class PgClient { Result TabletsMetadata(); + Result ServersMetrics(); + using ActiveTransactionCallback = LWFunction; Status EnumerateActiveTransactions( diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index da9f4e4918e9..9d96cd2561d7 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -1054,4 +1054,8 @@ Result PgSession::TabletsMetadata() { return pg_client_.TabletsMetadata(); } +Result PgSession::ServersMetrics() { + return pg_client_.ServersMetrics(); +} + } // namespace yb::pggate diff 
--git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 64b24375a2a9..8a274999120a 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -282,6 +282,8 @@ class PgSession : public RefCountedThreadSafe { Result TabletsMetadata(); + Result ServersMetrics(); + private: Result DoLoadTable( const PgObjectId& table_id, bool fail_on_cache_hit, diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index 975682e05618..e9fab22b3db4 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -2281,6 +2281,10 @@ Result PgApiImpl::TabletsMetadata() { return pg_session_->TabletsMetadata(); } +Result PgApiImpl::ServersMetrics() { + return pg_session_->ServersMetrics(); +} + void PgApiImpl::ClearSessionState() { pg_session_->InvalidateForeignKeyReferenceCache(); pg_session_->DropBufferedOperations(); diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index 209500d2f1d7..e5dcca25d51c 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -799,6 +799,8 @@ class PgApiImpl { Result TabletsMetadata(); + Result ServersMetrics(); + bool IsCronLeader() const; [[nodiscard]] uint64_t GetCurrentReadTimePoint() const; diff --git a/src/yb/yql/pggate/ybc_pg_typedefs.h b/src/yb/yql/pggate/ybc_pg_typedefs.h index 526d91441b12..8543372830ef 100644 --- a/src/yb/yql/pggate/ybc_pg_typedefs.h +++ b/src/yb/yql/pggate/ybc_pg_typedefs.h @@ -776,6 +776,19 @@ typedef struct PgTabletsDescriptor { size_t partition_key_end_len; } YBCPgTabletsDescriptor; +typedef struct MetricsInfo { + const char* name; + const char* value; +} YBCMetricsInfo; + +typedef struct PgServerMetricsInfo { + const char* uuid; + YBCMetricsInfo* metrics; + const size_t metrics_count; + const char* status; + const char* error; +} YBCPgServerMetricsInfo; + typedef struct PgExplicitRowLockParams { int rowmark; int pg_wait_policy; diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 
59e7f55a6127..81fb8274205d 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -2637,6 +2637,42 @@ YBCStatus YBCLocalTablets(YBCPgTabletsDescriptor** tablets, size_t* count) { return YBCStatusOK(); } +YBCStatus YBCServersMetrics(YBCPgServerMetricsInfo** servers_metrics_info, size_t* count) { + const auto result = pgapi->ServersMetrics(); + if (!result.ok()) { + return ToYBCStatus(result.status()); + } + const auto& servers_metrics = result.get().servers_metrics(); + *count = servers_metrics.size(); + if (!servers_metrics.empty()) { + *servers_metrics_info = static_cast( + YBCPAlloc(sizeof(YBCPgServerMetricsInfo) * servers_metrics.size())); + YBCPgServerMetricsInfo* dest = *servers_metrics_info; + for (const auto& server_metrics_info : servers_metrics) { + size_t metrics_count = server_metrics_info.metrics().size(); + YBCMetricsInfo* metrics = + static_cast( + YBCPAlloc(sizeof(YBCMetricsInfo) * metrics_count)); + + int metrics_idx = 0; + for (const auto& metrics_info : server_metrics_info.metrics()) { + metrics[metrics_idx].name = YBCPAllocStdString(metrics_info.name()); + metrics[metrics_idx].value = YBCPAllocStdString(metrics_info.value()); + metrics_idx++; + } + new (dest) YBCPgServerMetricsInfo { + .uuid = YBCPAllocStdString(server_metrics_info.uuid()), + .metrics = metrics, + .metrics_count = metrics_count, + .status = YBCPAllocStdString(PgMetricsInfoStatus_Name(server_metrics_info.status())), + .error = YBCPAllocStdString(server_metrics_info.error()), + }; + ++dest; + } + } + return YBCStatusOK(); +} + bool YBCIsCronLeader() { return pgapi->IsCronLeader(); } uint64_t YBCPgGetCurrentReadTimePoint() { diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 6dee3f9056f6..4d871f75fdd4 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -905,6 +905,8 @@ void YBCStoreTServerAshSamples( YBCStatus YBCLocalTablets(YBCPgTabletsDescriptor** tablets, size_t* count); +YBCStatus 
YBCServersMetrics(YBCPgServerMetricsInfo** serverMetricsInfo, size_t* count); + uint64_t YBCPgGetCurrentReadTimePoint(); YBCStatus YBCRestoreReadTimePoint(uint64_t read_time_point_handle); diff --git a/src/yb/yql/pgwrapper/ysql_migrations/V58__23542__yb_servers_metrics.sql b/src/yb/yql/pgwrapper/ysql_migrations/V58__23542__yb_servers_metrics.sql new file mode 100644 index 000000000000..dfe4d5ee4aa0 --- /dev/null +++ b/src/yb/yql/pgwrapper/ysql_migrations/V58__23542__yb_servers_metrics.sql @@ -0,0 +1,35 @@ +BEGIN; + SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true; + + INSERT INTO pg_catalog.pg_proc ( + oid, proname, pronamespace, proowner, prolang, procost, prorows, provariadic, protransform, + prokind, prosecdef, proleakproof, proisstrict, proretset, provolatile, proparallel, + pronargs, pronargdefaults, prorettype, proargtypes, proallargtypes, proargmodes, + proargnames, proargdefaults, protrftypes, prosrc, probin, proconfig, proacl + ) VALUES ( + 8072, 'yb_servers_metrics', 11, 10, 12, 1, 10, 0, '-', 'f', false, + false, true, true, 'v', 'r', 0, 0, 2249, '', '{25,3802,25,25}', + '{o,o,o,o}', '{uuid,metrics,status,error}', + NULL, NULL, 'yb_servers_metrics', NULL, NULL, NULL) + ON CONFLICT DO NOTHING; + + -- Create dependency records for everything we (possibly) created. + -- Since pg_depend has no OID or unique constraint, using PL/pgSQL instead. + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT FROM pg_catalog.pg_depend + WHERE refclassid = 1255 AND refobjid = 8072 + ) THEN + INSERT INTO pg_catalog.pg_depend ( + classid, objid, objsubid, refclassid, refobjid, refobjsubid, deptype + ) VALUES + (0, 0, 0, 1255, 8072, 0, 'p'); + END IF; + END $$; +COMMIT; + +-- Creating the system view yb_servers_metrics +CREATE OR REPLACE VIEW pg_catalog.yb_servers_metrics WITH (use_initdb_acl = true) AS + SELECT * + FROM yb_servers_metrics();