Skip to content

Commit

Permalink
[#4599] YSQL: Fix calculation for transaction counts in YSQL metrics.
Browse files Browse the repository at this point in the history
Summary:
The metrics tracked by `handler_latency_yb_ysqlserver_SQLProcessor_Transactions` counts number of single-shard dml transactions that have been committed. That metric has been renamed to `handler_latency_yb_ysqlserver_SQLProcessor_Single_Shard_Transactions`.

The `handler_latency_yb_ysqlserver_SQLProcessor_Transactions` will now track all type of transactions including single-sharded, distributed, single-statement, transaction blocks, dml and ddl types.
A non-empty transaction block will be counted as a single transaction upon successfully commit (ie. no rollback).

Test Plan: Extended TestYSQLMetrics.java, manually tested using the metrics UI.

Reviewers: mihnea

Reviewed By: mihnea

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D10292
  • Loading branch information
emhna committed Feb 5, 2021
1 parent e6b5bcc commit f4465f7
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 26 deletions.
16 changes: 12 additions & 4 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ public class BasePgSQLTest extends BaseMiniClusterTest {
protected static final String INSERT_STMT_METRIC = METRIC_PREFIX + "InsertStmt";
protected static final String DELETE_STMT_METRIC = METRIC_PREFIX + "DeleteStmt";
protected static final String UPDATE_STMT_METRIC = METRIC_PREFIX + "UpdateStmt";
protected static final String BEGIN_STMT_METRIC = METRIC_PREFIX + "BeginStmt";
protected static final String COMMIT_STMT_METRIC = METRIC_PREFIX + "CommitStmt";
protected static final String ROLLBACK_STMT_METRIC = METRIC_PREFIX + "RollbackStmt";
protected static final String OTHER_STMT_METRIC = METRIC_PREFIX + "OtherStmts";
protected static final String SINGLE_SHARD_TRANSACTIONS_METRIC =
METRIC_PREFIX + "Single_Shard_Transactions";
protected static final String TRANSACTIONS_METRIC = METRIC_PREFIX + "Transactions";
protected static final String AGGREGATE_PUSHDOWNS_METRIC = METRIC_PREFIX + "AggregatePushdowns";

Expand Down Expand Up @@ -632,19 +637,22 @@ private long verifyQuery(Statement statement,

/** Time execution of a query. */
protected long verifyStatementMetric(
Statement statement, String query, String metricName,
int queryMetricDelta, int txnMetricDelta, boolean validStmt) throws Exception {
Statement statement, String query, String metricName, int queryMetricDelta,
int singleShardTxnMetricDelta, int txnMetricDelta, boolean validStmt) throws Exception {
return verifyQuery(
statement, query, validStmt,
new MetricCountChecker(
SINGLE_SHARD_TRANSACTIONS_METRIC, this::getMetric, singleShardTxnMetricDelta),
new MetricCountChecker(TRANSACTIONS_METRIC, this::getMetric, txnMetricDelta),
new MetricCountChecker(metricName, this::getMetric, queryMetricDelta));
}

protected void verifyStatementTxnMetric(
Statement statement, String query, int txnMetricDelta) throws Exception {
Statement statement, String query, int singleShardTxnMetricDelta) throws Exception {
verifyQuery(
statement, query,true,
new MetricCountChecker(TRANSACTIONS_METRIC, this::getMetric, txnMetricDelta));
new MetricCountChecker(
SINGLE_SHARD_TRANSACTIONS_METRIC, this::getMetric, singleShardTxnMetricDelta));
}

protected void verifyStatementMetricRows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private void verifyAggregatePushdownMetric(Statement statement,
String stmt,
boolean pushdown_expected) throws Exception {
long elapsedMillis = verifyStatementMetric(statement, stmt, AGGREGATE_PUSHDOWNS_METRIC,
pushdown_expected ? 1 : 0, 1, true);
pushdown_expected ? 1 : 0, 1, 1, true);
assertTrue(
String.format("Query took %d ms! Expected %d ms at most", elapsedMillis, maxTotalMillis),
elapsedMillis <= maxTotalMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,7 @@ private void verifyPushdown(Statement stmt) throws Exception {
query,
AGGREGATE_PUSHDOWNS_METRIC,
1 /* queryMetricDelta */,
1 /* singleShardTxnMetricDelta */,
1 /* txnMetricDelta */,
true /* validStmt */
);
Expand Down
2 changes: 1 addition & 1 deletion java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSelect.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private void verifyStatementPushdownMetric(Statement statement,
String stmt,
boolean pushdown_expected) throws Exception {
verifyStatementMetric(statement, stmt, AGGREGATE_PUSHDOWNS_METRIC,
pushdown_expected ? 1 : 0, 1, true);
pushdown_expected ? 1 : 0, 1, 1, true);
}

private Long getCountForTable(String metricName, String tableName) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ public void testSingleRowNoTransaction() throws Exception {
"UPDATE test SET v = 3 WHERE k = 1", 1);

// Verify JDBC single-row prepared statements use non-txn path.
long oldTxnValue = getMetricCounter(TRANSACTIONS_METRIC);
long oldTxnValue = getMetricCounter(SINGLE_SHARD_TRANSACTIONS_METRIC);

PreparedStatement insertStatement =
connection.prepareStatement("INSERT INTO test VALUES (?, ?)");
Expand All @@ -759,7 +759,7 @@ public void testSingleRowNoTransaction() throws Exception {
updateStatement.setInt(2, 1);
updateStatement.executeUpdate();

long newTxnValue = getMetricCounter(TRANSACTIONS_METRIC);
long newTxnValue = getMetricCounter(SINGLE_SHARD_TRANSACTIONS_METRIC);
assertEquals(oldTxnValue, newTxnValue);
}

Expand Down
156 changes: 142 additions & 14 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestYSQLMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,42 +38,170 @@ public void testMetrics() throws Exception {

// DDL is non-txn.
verifyStatementMetric(statement, "CREATE TABLE test (k int PRIMARY KEY, v int)",
OTHER_STMT_METRIC, 1, 0, true);
OTHER_STMT_METRIC, 1, 0, 1, true);

// Select uses txn.
verifyStatementMetric(statement, "SELECT * FROM test",
SELECT_STMT_METRIC, 1, 1, true);
SELECT_STMT_METRIC, 1, 1, 1, true);

// Non-txn insert.
verifyStatementMetric(statement, "INSERT INTO test VALUES (1, 1)",
INSERT_STMT_METRIC, 1, 0, true);
INSERT_STMT_METRIC, 1, 0, 1, true);
// Txn insert.
statement.execute("BEGIN");
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (2, 2)",
INSERT_STMT_METRIC, 1, 1, true);
statement.execute("END");
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 1, true);

// Limit query on complex view (issue #3811).
verifyStatementMetric(statement, "SELECT * FROM information_schema.key_column_usage LIMIT 1",
SELECT_STMT_METRIC, 1, 1, true);
SELECT_STMT_METRIC, 1, 1, 1, true);

// Non-txn update.
verifyStatementMetric(statement, "UPDATE test SET v = 2 WHERE k = 1",
UPDATE_STMT_METRIC, 1, 0, true);
UPDATE_STMT_METRIC, 1, 0, 1, true);
// Txn update.
verifyStatementMetric(statement, "UPDATE test SET v = 3",
UPDATE_STMT_METRIC, 1, 1, true);
UPDATE_STMT_METRIC, 1, 1, 1, true);

// Non-txn delete.
verifyStatementMetric(statement, "DELETE FROM test WHERE k = 2",
DELETE_STMT_METRIC, 1, 0, true);
DELETE_STMT_METRIC, 1, 0, 1, true);
// Txn delete.
verifyStatementMetric(statement, "DELETE FROM test",
DELETE_STMT_METRIC, 1, 1, true);
DELETE_STMT_METRIC, 1, 1, 1, true);

// Invalid statement should not update metrics.
verifyStatementMetric(statement, "INSERT INTO invalid_table VALUES (1)",
INSERT_STMT_METRIC, 0, 0, false);
INSERT_STMT_METRIC, 0, 0, 0, false);

// DML queries transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (3, 3)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (4, 4)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 1, true);

// DDL queries transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "CREATE TABLE test2 (a int)",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "DROP TABLE test2",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 0, true);

// Set session variable in transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "SET yb_debug_report_error_stacktrace=true;",
OTHER_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "COMMIT",
COMMIT_STMT_METRIC, 1, 0, 1, true);

// DML/DDL queries transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (5, 5)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "CREATE TABLE test2 (a int)",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (6, 6)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "UPDATE test SET k = 600 WHERE k = 6",
UPDATE_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "ALTER TABLE test2 ADD COLUMN b INT",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "DROP TABLE test2",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "COMMIT",
COMMIT_STMT_METRIC, 1, 0, 1, true);

// DML/DDL queries transaction block with rollback.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (7, 7)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "CREATE TABLE test2 (a int)",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (8, 8)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "UPDATE test SET k = 800 WHERE k = 8",
UPDATE_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "ALTER TABLE test2 ADD COLUMN b INT",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "DROP TABLE test2",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "ROLLBACK",
ROLLBACK_STMT_METRIC, 1, 0, 0, true);

// Nested transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "CREATE TABLE test2 (a int)",
OTHER_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (9, 9)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 1, true);

// Nested transaction block with empty inner transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "DELETE FROM test WHERE k = 9;",
DELETE_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 1, true);

// Invalid transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "INSERT INTO invalid_table VALUES (1)",
INSERT_STMT_METRIC, 0, 0, 0, false);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 0, true);

// Empty transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 0, true);

// Empty transaction block with DML execution prior to BEGIN.
verifyStatementMetric(statement, "INSERT INTO test VALUES (10, 10)",
INSERT_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 0, true);

// Empty nested transaction block.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "END",
COMMIT_STMT_METRIC, 1, 0, 0, true);

// Extra COMMIT statement.
verifyStatementMetric(statement, "BEGIN",
BEGIN_STMT_METRIC, 1, 0, 0, true);
verifyStatementMetric(statement, "INSERT INTO test VALUES (11, 11)",
INSERT_STMT_METRIC, 1, 1, 0, true);
verifyStatementMetric(statement, "COMMIT",
COMMIT_STMT_METRIC, 1, 0, 1, true);
verifyStatementMetric(statement, "COMMIT",
COMMIT_STMT_METRIC, 1, 0, 0, true);
}

@Test
Expand Down Expand Up @@ -101,12 +229,12 @@ public void testMetricRows() throws Exception {

verifyStatementMetricRows(
stmt, "INSERT INTO test VALUES (6, 6), (7, 7)",
TRANSACTIONS_METRIC, 1, 2);
SINGLE_SHARD_TRANSACTIONS_METRIC, 1, 2);

// Single row transaction.
verifyStatementMetricRows(
stmt, "INSERT INTO test VALUES (8, 8)",
TRANSACTIONS_METRIC, 0, 0);
SINGLE_SHARD_TRANSACTIONS_METRIC, 0, 0);

verifyStatementMetricRows(
stmt, "DELETE FROM test",
Expand Down
62 changes: 62 additions & 0 deletions src/postgres/contrib/yb_pg_metrics/yb_pg_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ typedef enum statementType
Commit,
Rollback,
Other,
Single_Shard_Transaction,
Transaction,
AggregatePushdown,
kMaxStatementType
Expand All @@ -73,6 +74,19 @@ static int statement_nesting_level = 0;
*/
static int block_nesting_level = 0;

/*
* Flag to determine whether a transaction block has been entered.
*/
static bool is_inside_transaction_block = false;

/*
* Flag to determine whether a DML or Other statement type has been executed.
* Multiple statements will count as a single transaction within a transaction block.
* DDL statements which are autonomous will be counted as its own transaction
* even within a transaction block.
*/
static bool is_statement_executed = false;

char *metric_node_name = NULL;
struct WebserverWrapper *webserver = NULL;
int port = 0;
Expand Down Expand Up @@ -157,6 +171,8 @@ set_metric_names(void)
strcpy(ybpgm_table[Commit].name, YSQL_METRIC_PREFIX "CommitStmt");
strcpy(ybpgm_table[Rollback].name, YSQL_METRIC_PREFIX "RollbackStmt");
strcpy(ybpgm_table[Other].name, YSQL_METRIC_PREFIX "OtherStmts");
strcpy(ybpgm_table[Single_Shard_Transaction].name,
YSQL_METRIC_PREFIX "Single_Shard_Transactions");
strcpy(ybpgm_table[Transaction].name, YSQL_METRIC_PREFIX "Transactions");
strcpy(ybpgm_table[AggregatePushdown].name, YSQL_METRIC_PREFIX "AggregatePushdowns");
}
Expand Down Expand Up @@ -529,6 +545,8 @@ ybpgm_ExecutorEnd(QueryDesc *queryDesc)
break;
}

is_statement_executed = true;

/* Collecting metric.
* - Only processing metric for top level statement in top level portal.
* For example, CURSOR execution can have many nested portal and nested statement. The metric
Expand All @@ -546,6 +564,9 @@ ybpgm_ExecutorEnd(QueryDesc *queryDesc)
ybpgm_Store(type, time, rows_count);

if (!queryDesc->estate->es_yb_is_single_row_modify_txn)
ybpgm_Store(Single_Shard_Transaction, time, rows_count);

if (!is_inside_transaction_block)
ybpgm_Store(Transaction, time, rows_count);

if (IsA(queryDesc->planstate, AggState) &&
Expand Down Expand Up @@ -660,6 +681,47 @@ ybpgm_ProcessUtility(PlannedStmt *pstmt, const char *queryString,

INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_SUBTRACT(end, start);

bool is_catalog_version_increment = false;
bool is_breaking_catalog_change = false;
if (IsTransactionalDdlStatement(pstmt,
&is_catalog_version_increment,
&is_breaking_catalog_change))
{
ybpgm_Store(Transaction, INSTR_TIME_GET_MICROSEC(end), 0);
}
else if (type == Other)
{
is_statement_executed = true;
}

if (type == Begin && !is_inside_transaction_block)
{
is_inside_transaction_block = true;
is_statement_executed = false;
}
if (type == Rollback)
{
is_inside_transaction_block = false;
is_statement_executed = false;
}
/*
* TODO: Once savepoint and rollback to specific transaction are supported,
* transaction block counter needs to be revisited.
* Current logic is to increment non-empty transaction block by 1
* if non-DDL statement types executed prior to committing.
*/
if (type == Commit) {
if (strcmp(completionTag, "ROLLBACK") != 0 &&
is_inside_transaction_block &&
is_statement_executed)
{
ybpgm_Store(Transaction, INSTR_TIME_GET_MICROSEC(end), 0);
}
is_inside_transaction_block = false;
is_statement_executed = false;
}

ybpgm_Store(type, INSTR_TIME_GET_MICROSEC(end), 0 /* rows */);
}
else
Expand Down
6 changes: 3 additions & 3 deletions src/postgres/src/backend/utils/misc/pg_yb_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,9 @@ YBDecrementDdlNestingLevel(bool success,
}
}

static bool IsTransactionalDdlStatement(PlannedStmt *pstmt,
bool *is_catalog_version_increment,
bool *is_breaking_catalog_change)
bool IsTransactionalDdlStatement(PlannedStmt *pstmt,
bool *is_catalog_version_increment,
bool *is_breaking_catalog_change)
{
/* Assume the worst. */
*is_catalog_version_increment = true;
Expand Down
Loading

0 comments on commit f4465f7

Please sign in to comment.