Skip to content

Commit

Permalink
[#23542] YSQL: Add new YSQL function yb_servers_metrics() to fetch me…
Browse files Browse the repository at this point in the history
…trics such as cpu/memory usage from all nodes in cluster

Summary:
To enable adaptive parallelism in voyager, https://docs.google.com/document/d/1beD7zNtpmfYflXV1hVJ9mq_uqyCTJ9Es4titPEksSNE/edit#heading=h.3c3bf00hwf, a YSQL function yb_servers_metrics() is added
which will fetch certain metrics for all nodes in the cluster. This allows voyager to  monitor the state of the cluster, and adapt the parallelism while importing data to target YB cluster. A YSQL API is needed in
order to provide deployment-agnostic API (not having to fetch metrics for YBA/YBM/on-prem using different mechanisms).

Additionally, made a few changes to `MetricsSnapshotter`
- Introduced a function for GetCpuUsageInInterval(int ms).
- made the GetCpuUsage function static.
- Introduced a `GetMemoryUsage` function to get memory usage (from proc/meminfo for linux and sysctl for macos)

Sample output:

```
yugabyte=# select uuid, jsonb_pretty(metrics), status, error from yb_servers_metrics();
               uuid               |                    jsonb_pretty                     | status | error
----------------------------------+-----------------------------------------------------+--------+-------
 bf98c74dd7044b34943c5bff7bd3d0d1 | {                                                  +| OK     |
                                  |     "memory_free": "0",                            +|        |
                                  |     "memory_total": "17179869184",                 +|        |
                                  |     "cpu_usage_user": "0.135827",                  +|        |
                                  |     "cpu_usage_system": "0.118110",                +|        |
                                  |     "memory_available": "0",                       +|        |
                                  |     "tserver_root_memory_limit": "11166914969",    +|        |
                                  |     "tserver_root_memory_soft_limit": "9491877723",+|        |
                                  |     "tserver_root_memory_consumption": "52346880"  +|        |
                                  | }                                                   |        |
 d105c3a6128640f5a25cc74435e48ae3 | {                                                  +| OK     |
                                  |     "memory_free": "0",                            +|        |
                                  |     "memory_total": "17179869184",                 +|        |
                                  |     "cpu_usage_user": "0.135189",                  +|        |
                                  |     "cpu_usage_system": "0.119284",                +|        |
                                  |     "memory_available": "0",                       +|        |
                                  |     "tserver_root_memory_limit": "11166914969",    +|        |
                                  |     "tserver_root_memory_soft_limit": "9491877723",+|        |
                                  |     "tserver_root_memory_consumption": "55074816"  +|        |
                                  | }                                                   |        |
 a321e13e5bf24060a764b35894cd4070 | {                                                  +| OK     |
                                  |     "memory_free": "0",                            +|        |
                                  |     "memory_total": "17179869184",                 +|        |
                                  |     "cpu_usage_user": "0.135827",                  +|        |
                                  |     "cpu_usage_system": "0.118110",                +|        |
                                  |     "memory_available": "0",                       +|        |
                                  |     "tserver_root_memory_limit": "11166914969",    +|        |
                                  |     "tserver_root_memory_soft_limit": "9491877723",+|        |
                                  |     "tserver_root_memory_consumption": "62062592"  +|        |
                                  | }                                                   |        |
```

**Upgrade/Rollback safety:**
This is a new YSQL function, so there won't be any prior users of this function. In case of an upgrade/rollback, the sql migration (that adds the function to pg_proc) will only run when the upgrade is being finalized (i.e. after all tservers are updated). Hence, it will not be possible to get errors due to a subset of tservers not being upgraded because the function itself will not be available to call.

Test Plan: ./yb_build.sh --java-test 'org.yb.pgsql.TestYbServersMetrics#testYBServersMetricsFunction'

Reviewers: asaha, djiang, telgersma

Reviewed By: djiang, telgersma

Subscribers: hbhanawat, yql, ybase, amakala

Differential Revision: https://phorge.dev.yugabyte.com/D37267
  • Loading branch information
makalaaneesh committed Sep 20, 2024
1 parent 294b7bb commit 872b59e
Show file tree
Hide file tree
Showing 31 changed files with 624 additions and 13 deletions.
131 changes: 131 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbServersMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

package org.yb.pgsql;

import com.google.common.net.HostAndPort;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yb.minicluster.MiniYBClusterBuilder;
import org.yb.YBTestRunner;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

import static org.yb.AssertionWrappers.*;

@RunWith(value = YBTestRunner.class)
public class TestYbServersMetrics extends BasePgSQLTest {
private static final Logger LOG = LoggerFactory.getLogger(TestYbServersMetrics.class);
private static final int NUM_TSERVERS = 3;
private static final int RF = 3;
private static ArrayList<String> expectedKeys = new ArrayList<String>(Arrays.asList(
"memory_free", "memory_available", "memory_total",
"tserver_root_memory_limit", "tserver_root_memory_soft_limit",
"tserver_root_memory_consumption",
"cpu_usage_user", "cpu_usage_system"));

@Override
public ConnectionBuilder getConnectionBuilder() {
ConnectionBuilder cb = new ConnectionBuilder(miniCluster);
cb.setLoadBalance(true);
return cb;
}

@Override
protected void customizeMiniClusterBuilder(MiniYBClusterBuilder builder){
super.customizeMiniClusterBuilder(builder);
builder.numTservers(NUM_TSERVERS);
builder.replicationFactor(RF);
builder.tserverHeartbeatTimeoutMs(7000);
}

private void assertYbServersMetricsOutput(int expectedRows, int expectedStatusOkRows,
int tserverNo) throws Exception{
ConnectionBuilder b = getConnectionBuilder();
if (tserverNo >= 0){
b = b.withTServer(tserverNo);
}
Connection conn = b.connect();
try {
Statement st = conn.createStatement();
final long startTimeMillis = System.currentTimeMillis();
ResultSet rs = st.executeQuery("select * from yb_servers_metrics()");
final long result = System.currentTimeMillis() - startTimeMillis;
// There is a timeout of 5000ms for each RPC call to tserver.
assertLessThan(result, Long.valueOf(6000));
int row_count = 0;
int ok_count = 0;
List<String> errors = new ArrayList<String>();
while (rs.next()) {
String uuid = rs.getString(1);
String metrics = rs.getString(2);
String status = rs.getString(3);
String error = rs.getString(4);
if (status.equals("OK")) {
++ok_count;
JSONObject metricsJson = new JSONObject(metrics);
ArrayList<String> metricKeys = new ArrayList<String>(metricsJson.keySet());
assertTrue("Expected keys are not present. Present keys are:"
+ metricKeys,
metricKeys.containsAll(expectedKeys));
} else {
assertEquals("{}", metrics);
errors.add(error);
}
++row_count;
}
assertEquals("Unexpected tservers count", expectedRows, row_count);
assertEquals("Unexpected OK tserver count. Errors: "+ errors, expectedStatusOkRows, ok_count);
} catch (SQLException e) {
throw new RuntimeException("Failed to execute yb_servers_metrics query", e);
} finally {
conn.close();
}
}

@Test
public void testYBServersMetricsFunction() throws Exception {
assertYbServersMetricsOutput(NUM_TSERVERS, NUM_TSERVERS, -1);

// add a new tserver
miniCluster.startTServer(getTServerFlags());
assertTrue(miniCluster.waitForTabletServers(4));
waitForTServerHeartbeat();
assertYbServersMetricsOutput(NUM_TSERVERS + 1, NUM_TSERVERS + 1, -1);

// kill a tserver
// closing root connection to avoid potential errors during clean up.
connection.close();
List<HostAndPort> tserverList = new ArrayList<>(miniCluster.getTabletServers().keySet());
HostAndPort tserver = tserverList.get(tserverList.size() - 1);
miniCluster.killTabletServerOnHostPort(tserver);
// Initially we will get NUM_TSERVERS + 1 rows, with one of them having status as "ERROR"
//killed last tserver, so connect to first
assertYbServersMetricsOutput(NUM_TSERVERS + 1, NUM_TSERVERS, 0);

// After the tserver is removed and updated in cache,
// we will get NUM_TSERVERS rows, with all of them having status as "OK"
Thread.sleep(2 * miniCluster.getClusterParameters().getTServerHeartbeatTimeoutMs());
assertYbServersMetricsOutput(NUM_TSERVERS, NUM_TSERVERS, 0);
}

}
4 changes: 4 additions & 0 deletions src/postgres/src/backend/catalog/yb_system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ CREATE VIEW yb_query_diagnostics_status AS
SELECT *
FROM yb_get_query_diagnostics_status();

CREATE VIEW yb_servers_metrics AS
SELECT *
FROM yb_servers_metrics();

CREATE VIEW pg_roles AS
SELECT
rolname,
Expand Down
100 changes: 100 additions & 0 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
#include "utils/spccache.h"
#include "utils/syscache.h"
#include "utils/uuid.h"
#include "utils/jsonb.h"
#include "fmgr.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
Expand Down Expand Up @@ -3548,6 +3549,105 @@ yb_local_tablets(PG_FUNCTION_ARGS)
return (Datum) 0;
}

static Datum GetMetricsAsJsonbDatum(YBCMetricsInfo* metrics, size_t metricsCount){
JsonbParseState *state = NULL;
JsonbValue result;
JsonbValue key;
JsonbValue value;
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
for (int j = 0; j < metricsCount; j++) {
key.type = jbvString;
key.val.string.val = (char *)metrics[j].name;
key.val.string.len = strlen(metrics[j].name);
pushJsonbValue(&state, WJB_KEY, &key);

value.type = jbvString;
value.val.string.val = (char *)metrics[j].value;
value.val.string.len = strlen(metrics[j].value);
pushJsonbValue(&state, WJB_VALUE, &value);
}
result = *pushJsonbValue(&state, WJB_END_OBJECT, NULL);
Jsonb *jsonb = JsonbValueToJsonb(&result);
return JsonbPGetDatum(jsonb);
}

Datum
yb_servers_metrics(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
int i;
#define YB_SERVERS_METRICS_COLS 4

/* only superuser and yb_db_admin can query this function */
if (!superuser() && !IsYbDbAdminUser(GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("only superusers and yb_db_admin can query yb_servers_metrics"))));

/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));

if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not " \
"allowed in this context")));

/*
* Switch context to construct returned data structures and store
* returned values from tserver.
*/
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);

/* Build a tuple descriptor */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errmsg_internal("return type must be a row type")));

tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;

YBCPgServerMetricsInfo *servers_metrics_info = NULL;
size_t num_servers = 0;
HandleYBStatus(YBCServersMetrics(&servers_metrics_info, &num_servers));

for (i = 0; i < num_servers; ++i)
{
YBCPgServerMetricsInfo *metricsInfo = (YBCPgServerMetricsInfo *)servers_metrics_info + i;
Datum values[YB_SERVERS_METRICS_COLS];
bool nulls[YB_SERVERS_METRICS_COLS];

memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));

values[0] = CStringGetTextDatum(metricsInfo->uuid);
values[1] = GetMetricsAsJsonbDatum(metricsInfo->metrics, metricsInfo->metrics_count);
values[2] = CStringGetTextDatum(metricsInfo->status);
values[3] = CStringGetTextDatum(metricsInfo->error);

tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}

#undef YB_SERVERS_METRICS_COLS

/* clean up and return the tuplestore */
tuplestore_donestoring(tupstore);

MemoryContextSwitchTo(oldcontext);

return (Datum) 0;
}

/*---------------------------------------------------------------------------*/
/* Deterministic DETAIL order */
/*---------------------------------------------------------------------------*/
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* If you increment it, make sure you didn't forget to add a new SQL migration
* (see pg_yb_migration.dat and src/yb/yql/pgwrapper/ysql_migrations/README.md)
*/
#define YB_LAST_USED_OID 8071
#define YB_LAST_USED_OID 8072

extern bool IsSystemRelation(Relation relation);
extern bool IsToastRelation(Relation relation);
Expand Down
9 changes: 9 additions & 0 deletions src/postgres/src/include/catalog/pg_proc.dat
Original file line number Diff line number Diff line change
Expand Up @@ -10536,4 +10536,13 @@
proparallel => 'r', prorettype => 'void', proargtypes => 'bool',
prosrc => 'binary_upgrade_set_next_tablegroup_default' },

{ oid => '8072',
descr => 'Get metrics of all nodes',
proname => 'yb_servers_metrics', prorows => '10',
proretset => 't', provolatile => 'v', proparallel => 'r',
prorettype => 'record', proargtypes => '',
proallargtypes => '{text,jsonb,text,text}',
proargnames => '{uuid,metrics,status,error}',
proargmodes => '{o,o,o,o}',
prosrc => 'yb_servers_metrics'},
]
4 changes: 2 additions & 2 deletions src/postgres/src/include/catalog/pg_yb_migration.dat
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[

# For better version control conflict detection, list latest migration filename
# here: V57__23312__binary_upgrade_set_next_tablegroup_default.sql
{ major => '57', minor => '0', name => '<baseline>', time_applied => '_null_' }
# here: V58__23542__yb_servers_metrics.sql
{ major => '58', minor => '0', name => '<baseline>', time_applied => '_null_' }

]
5 changes: 5 additions & 0 deletions src/postgres/src/test/regress/expected/yb_pg_rules.out
Original file line number Diff line number Diff line change
Expand Up @@ -2451,6 +2451,11 @@ yb_query_diagnostics_status| SELECT yb_get_query_diagnostics_status.status,
yb_get_query_diagnostics_status.explain_params,
yb_get_query_diagnostics_status.path
FROM yb_get_query_diagnostics_status() yb_get_query_diagnostics_status(status, description, query_id, start_time, diagnostics_interval_sec, bind_var_query_min_duration_ms, explain_params, path);
yb_servers_metrics| SELECT yb_servers_metrics.uuid,
yb_servers_metrics.metrics,
yb_servers_metrics.status,
yb_servers_metrics.error
FROM yb_servers_metrics() yb_servers_metrics(uuid, metrics, status, error);
yb_terminated_queries| SELECT d.datname AS databasename,
s.backend_pid,
s.query_text,
Expand Down
5 changes: 5 additions & 0 deletions src/yb/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -711,3 +711,8 @@ message CDCSDKStreamCreateOptionsPB {
optional
CDCSDKDynamicTablesOption cdcsdk_dynamic_tables_option = 1 [default = DYNAMIC_TABLES_ENABLED];
}

message TserverMetricsInfoPB {
required string name = 1;
required string value = 2;
}
5 changes: 5 additions & 0 deletions src/yb/master/master_tserver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,5 +219,10 @@ Result<std::vector<tablet::TabletStatusPB>> MasterTabletServer::GetLocalTabletsM
return STATUS_FORMAT(InternalError, "Unexpected call of GetLocalTabletsMetadata()");
}

Result<std::vector<TserverMetricsInfoPB>> MasterTabletServer::GetMetrics() const {
LOG(DFATAL) << "Unexpected call of GetMetrics()";
return STATUS_FORMAT(InternalError, "Unexpected call of GetMetrics()");
}

} // namespace master
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/master/master_tserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class MasterTabletServer : public tserver::TabletServerIf,

virtual Result<std::vector<tablet::TabletStatusPB>> GetLocalTabletsMetadata() const override;

virtual Result<std::vector<TserverMetricsInfoPB>> GetMetrics() const override;

private:
Master* master_ = nullptr;
scoped_refptr<MetricEntity> metric_entity_;
Expand Down
Loading

0 comments on commit 872b59e

Please sign in to comment.