Skip to content

Commit

Permalink
[#23788] YSQL, QueryDiagnostics: Fixing issues in pg_stat_statements …
Browse files Browse the repository at this point in the history
…when no query executed

Summary:
- Fixes issue where pgss_copy.query_len being 0 leads to test failures, especially in ASAN builds, when a diagnostics bundle is triggered but no query is executed.
- Adds logic to handle cases where pg_stat_statements_reset() is called during the bundle's duration, dumping only counters data with an empty query string.
- Resolves problem of losing the last character in the pg_stat_statements.csv file.
- Modifies the behavior of GetAshRangeIndexes() to append the description "No data available in ASH for the given time range" instead of overwriting the existing catalog description.
- Removes unnecessary AddinShmemInitLock lock to improve efficiency in this scenario.
- Even in the case :-
```
bundle started.
query executed
pgss reset
query executed
bundle ended
```
`pg_stat_statements was reset, query string not available;` warning is displayed through yb_query_diagnostics_status. This is intentional as if we were to implement a check for last_time_query_bundled against last_time_reset, it would require a `GetCurrentTimestamp()` call per bundled query, which could be expensive.
Jira: DB-12692

Test Plan:
./yb_build.sh --java-test TestYbQueryDiagnostics#checkAshData
./yb_build.sh --java-test TestYbQueryDiagnostics#checkPgssData

Reviewers: asaha, telgersma

Reviewed By: asaha, telgersma

Subscribers: yql

Differential Revision: https://phorge.dev.yugabyte.com/D37895
  • Loading branch information
IshanChhangani committed Sep 25, 2024
1 parent 35b12d2 commit 008f885
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 71 deletions.
177 changes: 168 additions & 9 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbQueryDiagnostics.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -28,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import org.json.JSONObject;
import org.junit.Before;
Expand Down Expand Up @@ -57,6 +59,11 @@ public QueryDiagnosticsParams(int diagnosticsInterval, int explainSampleRate,
}

private static final int ASH_SAMPLING_INTERVAL_MS = 500;
private static final String noQueriesExecutedWarning = "No query executed;";
private static final String pgssResetWarning =
"pg_stat_statements was reset, query string not available;";
private static final String permissionDeniedWarning =
"Failed to create query diagnostics directory, Permission denied;";

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -293,9 +300,7 @@ public void testYbQueryDiagnosticsStatus() throws Exception {
assertQueryDiagnosticsStatus(resultSet,
fileErrorBundlePath.getParent() /* expectedViewPath */,
"Error" /* expectedStatus */,
"Failed to create query " +
"diagnostics directory, " +
"Permission denied;" /* expectedDescription */,
permissionDeniedWarning /* expectedDescription */,
fileErrorRunParams);

resultSet = statement.executeQuery(
Expand Down Expand Up @@ -417,8 +422,15 @@ public void checkAshData() throws Exception {
long sleep_time_s = diagnosticsInterval + 1;

try (Statement statement = connection.createStatement()) {
String queryId = getQueryIdFromPgStatStatements(statement, "PREPARE%");
statement.execute("SELECT pg_sleep(0.5)");

String queryId = getQueryIdFromPgStatStatements(statement, "%pg_sleep%");
Path bundleDataPath = runQueryDiagnostics(statement, queryId, params);

/* Protects from "No query executed;" warning */
statement.execute("SELECT pg_sleep(0.1)");

/* sleep for bundle expiry */
statement.execute("SELECT pg_sleep(" + sleep_time_s + ")");
Path ashPath = bundleDataPath.resolve("active_session_history.csv");

Expand Down Expand Up @@ -462,7 +474,7 @@ public void checkAshData() throws Exception {

@Test
public void checkPgssData() throws Exception {
int diagnosticsInterval = 10;
int diagnosticsInterval = (5 * ASH_SAMPLING_INTERVAL_MS) / 1000; /* convert to seconds */
QueryDiagnosticsParams queryDiagnosticsParams = new QueryDiagnosticsParams(
diagnosticsInterval,
100 /* explainSampleRate */,
Expand All @@ -471,6 +483,9 @@ public void checkPgssData() throws Exception {
false /* explainDebug */,
0 /* bindVarQueryMinDuration */);

/* sleep time is diagnosticsInterval + 1 sec to ensure that the bundle has expired */
long sleep_time_s = diagnosticsInterval + 1;

try (Statement statement = connection.createStatement()) {
statement.execute("SELECT pg_sleep(0.5)");

Expand All @@ -481,10 +496,8 @@ public void checkPgssData() throws Exception {
statement.execute("SELECT * from pg_class");
statement.execute("SELECT pg_sleep(0.2)");

/*
* Thread sleeps for diagnosticsInterval + 1 sec to ensure that the bundle has expired
*/
Thread.sleep((diagnosticsInterval + 1) * 1000);
/* sleep for bundle expiry */
statement.execute("SELECT pg_sleep(" + sleep_time_s + ")");

Path pgssPath = bundleDataPath.resolve("pg_stat_statements.csv");

Expand Down Expand Up @@ -518,4 +531,150 @@ public void checkPgssData() throws Exception {
Math.abs(Float.parseFloat(tokens[6]) - expectedMeanTime), epsilon);
}
}

private void runBundleWithQueries(Statement statement, String queryId,
QueryDiagnosticsParams queryDiagnosticsParams,
String[] queries, String warning) throws Exception {
/* sleep time is diagnosticsInterval + 1 sec to ensure that the bundle has expired */
long sleep_time_s = queryDiagnosticsParams.diagnosticsInterval + 1;

Path bundleDataPath = runQueryDiagnostics(statement, queryId, queryDiagnosticsParams);

for (String query : queries) {
statement.execute(query);
}

/*
* Sleep for the bundle expiry duration.
* This also prevents from "No data available in ASH for the given time range;" warning.
*/
statement.execute("SELECT pg_sleep(" + sleep_time_s + ")");

/* Select the last executed bundle */
ResultSet resultSet = statement.executeQuery("SELECT * " +
"FROM yb_query_diagnostics_status " +
"ORDER BY start_time DESC");
if (!resultSet.next())
fail("yb_query_diagnostics_status view does not have expected data");

assertQueryDiagnosticsStatus(resultSet,
bundleDataPath /* expectedViewPath */,
"Success" /* expectedStatus */,
warning /* expectedDescription */,
queryDiagnosticsParams);

if (!warning.equals(noQueriesExecutedWarning)) {
Path pgssPath = bundleDataPath.resolve("pg_stat_statements.csv");
assertTrue("pg_stat_statements file does not exist", Files.exists(pgssPath));
assertGreaterThan("pg_stat_statements.csv file is empty",
Files.size(pgssPath) , 0L);

/* Read the pg_stat_statements.csv file */
List<String> pgssData = Files.readAllLines(pgssPath);
String[] tokens = pgssData.get(1).split(",");

/* Ensure that the query string in pg_stat_statements is empty as expected */
assertEquals("pg_stat_statements query is incorrect", "\"\"", tokens[1]);
}
}

@Test
public void testPgssResetBetweenDiagnostics() throws Exception {
int diagnosticsInterval = (5 * ASH_SAMPLING_INTERVAL_MS) / 1000; /* convert to seconds */
QueryDiagnosticsParams queryDiagnosticsParams = new QueryDiagnosticsParams(
diagnosticsInterval,
100 /* explainSampleRate */,
true /* explainAnalyze */,
true /* explainDist */,
false /* explainDebug */,
0 /* bindVarQueryMinDuration */);

try (Statement statement = connection.createStatement()) {
/*
* If pg_stat_statements resets during the bundle creation process,
* the query string in the pg_stat_statements output file will not be available.
* A warning will be included in the description field of the catalog view
* to indicate the same.
*/
String queryId = getQueryIdFromPgStatStatements(statement, "PREPARE%");

/* Test different scenarios of pgss reset */

/* reset */
runBundleWithQueries(statement, queryId, queryDiagnosticsParams, new String[] {
"SELECT pg_stat_statements_reset()",
}, noQueriesExecutedWarning);

/* statement -> reset */
runBundleWithQueries(statement, queryId, queryDiagnosticsParams, new String[] {
"EXECUTE stmt('var1', 1, 1.1)",
"SELECT pg_stat_statements_reset()",
}, pgssResetWarning);

/* reset -> statement */
runBundleWithQueries(statement, queryId, queryDiagnosticsParams, new String[] {
"SELECT pg_stat_statements_reset()",
"EXECUTE stmt('var2', 2, 2.2)"
}, pgssResetWarning);

/*
* statement -> reset -> statement
*
* Note that this also emits pgssResetWarning, although a statement is executed
* after the reset. This is intentional as if we were to implement a check for
* last_time_query_bundled against last_time_reset, it would require a
* GetCurrentTimestamp() call per bundled query, which could be expensive.
*/
runBundleWithQueries(statement, queryId, queryDiagnosticsParams, new String[] {
"EXECUTE stmt('var1', 1, 1.1)",
"SELECT pg_stat_statements_reset()",
"EXECUTE stmt('var2', 2, 2.2)"
}, pgssResetWarning);
}
}

@Test
public void emptyBundle() throws Exception {
int diagnosticsInterval = (5 * ASH_SAMPLING_INTERVAL_MS) / 1000; /* convert to seconds */
QueryDiagnosticsParams params = new QueryDiagnosticsParams(
diagnosticsInterval,
100 /* explainSampleRate */,
true /* explainAnalyze */,
true /* explainDist */,
false /* explainDebug */,
0 /* bindVarQueryMinDuration */);

/* sleep time is diagnosticsInterval + 1 sec to ensure that the bundle has expired */
long sleep_time_s = diagnosticsInterval + 1;

try (Statement statement = connection.createStatement()) {
/* Run query diagnostics on the prepared stmt */
String queryId = getQueryIdFromPgStatStatements(statement, "PREPARE%");
Path bundleDataPath = runQueryDiagnostics(statement, queryId, params);

/* sleep for bundle expiry */
statement.execute("SELECT pg_sleep(" + sleep_time_s + ")");

/* Check that the bundle is empty */
try (Stream<Path> files = Files.list(bundleDataPath)) {
if (files.findAny().isPresent()) {
fail("The bundle directory is not empty, even though no queries were fired");
}
} catch (IOException e) {
fail("Failed to list files in the bundle directory: " + e.getMessage());
}

/* Check that the bundle is empty in the view */
ResultSet resultSet = statement.executeQuery(
"SELECT * FROM yb_query_diagnostics_status");
if (!resultSet.next())
fail("yb_query_diagnostics_status view does not have expected data");

assertQueryDiagnosticsStatus(resultSet,
bundleDataPath /* expectedViewPath */,
"Success" /* expectedStatus */,
noQueriesExecutedWarning /* expectedDescription */,
params);
}
}
}
8 changes: 5 additions & 3 deletions src/postgres/contrib/pg_stat_statements/pg_stat_statements.c
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,9 @@ pgss_store(const char *query, uint64 queryId,
Datum
pg_stat_statements_reset(PG_FUNCTION_ARGS)
{
if (YBIsQueryDiagnosticsEnabled())
*yb_pgss_last_reset_time = GetCurrentTimestamp();

if (!pgss || !pgss_hash)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
Expand Down Expand Up @@ -3820,7 +3823,6 @@ yb_track_nested_queries(void)
return pgss_track == PGSS_TRACK_ALL;
}


static void
YbGetPgssNormalizedQueryText(Size query_offset, int query_len, char *normalized_query)
{
Expand All @@ -3830,7 +3832,7 @@ YbGetPgssNormalizedQueryText(Size query_offset, int query_len, char *normalized_
qbuffer = qtext_load_file(&qbuffer_size);
memcpy(normalized_query, qtext_fetch(query_offset, query_len,
qbuffer, qbuffer_size), query_len);
normalized_query[query_len - 1] = '\0'; /* Ensure null-termination */
normalized_query[query_len] = '\0'; /* Ensure null-termination */

free(qbuffer);
}
}
8 changes: 3 additions & 5 deletions src/postgres/src/backend/utils/misc/yb_ash.c
Original file line number Diff line number Diff line change
Expand Up @@ -1244,11 +1244,9 @@ GetAshRangeIndexes(TimestampTz start_time, TimestampTz end_time, int64 query_id,
/* Time range is not there in the buffer */
if (start_time > buffer_max_time || end_time < buffer_min_time)
{
const char *message = (end_time < buffer_min_time) ?
"ASH circular buffer has wrapped around, " \
"Unable to fetch ASH data" :
"No data available in ASH for the given time range";
snprintf(description, YB_QD_DESCRIPTION_LEN, "%s; ", message);
AppendToDescription(description, (end_time < buffer_min_time) ?
"ASH circular buffer has wrapped around, unable to fetch ASH data;" :
"No data available in ASH for the given time range;");
return;
}

Expand Down
Loading

0 comments on commit 008f885

Please sign in to comment.