Skip to content

Commit

Permalink
[#21751] YSQL, ASH: Sampling of wait events
Browse files Browse the repository at this point in the history
Summary:
This diff implements sampling of wait events based on the reservoir
sampling algorithm (https://en.wikipedia.org/wiki/Reservoir_sampling)

We take random `ysql_yb_ash_sample_size` number of events from
each component - YSQL, YCQL and TServer.

When inserting YSQL samples into the circular buffer, we don't know the
the actual number of valid samples, so we cannot calculate the sample
weight. So, after inserting all the samples in the circular buffer, we go back
and update the sample weight of all the recently inserted samples.

The fields flush_and_compaction_wait_states and raft_log_appender_wait_states
in PgActiveSessionHistoryResponsePB are deprecated and the background wait
states are now part of tserver_wait_states

The datatype of sample_weight in ybc_pg_typedefs.h is updated to float which is
used is all other places.

Upgrade/Rollback safety: The updated protobuf is only for pg to local tserver
communication.
Jira: DB-10625

Test Plan: ./yb_build.sh --java-test TestYbAsh#testSampleSize

Reviewers: jason, amitanand

Reviewed By: jason, amitanand

Subscribers: yql, hbhanawat, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D35804
  • Loading branch information
abhinab-yb committed Jul 2, 2024
1 parent 6551e45 commit bafa1cb
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 69 deletions.
53 changes: 52 additions & 1 deletion java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbAsh.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
import java.sql.ResultSet;
import java.sql.Statement;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -280,4 +285,50 @@ public void testCatalogRequests() throws Exception {
assertGreaterThan(res1, 0);
}
}

/**
* Test that we don't capture more than 'ysql_yb_ash_sample_size' number of samples
*/
@Test
public void testSampleSize() throws Exception {
final int sample_size = 3;
setAshConfigAndRestartCluster(ASH_SAMPLING_INTERVAL, sample_size);
try (Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE test_table(k INT, v TEXT)");
}
final int NUM_THREADS = 5;
final int NUM_INSERTS_PER_THREAD = 100;
ExecutorService ecs = Executors.newFixedThreadPool(NUM_THREADS);
List<Future<?>> futures = new ArrayList<>();
for (int i = 1; i <= NUM_THREADS; ++i) {
final int threadIndex = i;
Future<?> future = ecs.submit(() -> {
try (Statement statement = connection.createStatement()) {
for (int j = 0; j < NUM_INSERTS_PER_THREAD; ++j) {
statement.execute(String.format("INSERT INTO test_table VALUES(%d, 'v-%d')",
threadIndex, j));
}
} catch (Exception e) {
fail(e.getMessage());
}
});
futures.add(future);
}
for (Future<?> future : futures) {
future.get();
}
ecs.shutdown();
ecs.awaitTermination(30, TimeUnit.SECONDS);
try (Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT sample_time, wait_event_component, " +
"count(*) FROM " + ASH_VIEW + " GROUP BY sample_time, wait_event_component");
while (rs.next()) {
assertLessThanOrEqualTo(rs.getLong("count"), Long.valueOf(sample_size));
}
rs = statement.executeQuery("SELECT sample_weight FROM " + ASH_VIEW);
while (rs.next()) {
assertGreaterThanOrEqualTo(rs.getDouble("sample_weight"), Double.valueOf(1.0));
}
}
}
}
13 changes: 5 additions & 8 deletions src/postgres/src/backend/storage/ipc/procarray.c
Original file line number Diff line number Diff line change
Expand Up @@ -4140,16 +4140,12 @@ void
YbStorePgAshSamples(TimestampTz sample_time)
{
int i;
int samples_stored = 0;
int samples_considered = 0;

ProcArrayStruct *arrayP = procArray;

LWLockAcquire(ProcArrayLock, LW_SHARED);

/*
* TODO: Add sampling logic to take random samples instead of
* the first 'N'.
*/
for (i = 0; i < arrayP->numProcs; ++i)
{
int pgprocno = arrayP->pgprocnos[i];
Expand All @@ -4172,10 +4168,11 @@ YbStorePgAshSamples(TimestampTz sample_time)
YbAshShouldIgnoreWaitEvent(proc->wait_event_info))
continue;

if (YbAshStoreSample(proc, arrayP->numProcs, sample_time,
&samples_stored) == 0)
break;
YbAshMaybeIncludeSample(proc, arrayP->numProcs, sample_time,
&samples_considered);
}

LWLockRelease(ProcArrayLock);

YbAshFillSampleWeight(samples_considered);
}
89 changes: 67 additions & 22 deletions src/postgres/src/backend/utils/misc/yb_ash.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ static void yb_ash_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
char *completionTag);

static const unsigned char *get_yql_endpoint_tserver_uuid();
static void copy_pgproc_sample_fields(PGPROC *proc);
static void copy_non_pgproc_sample_fields(float8 sample_weight, TimestampTz sample_time);
static void YbAshMaybeReplaceSample(PGPROC *proc, int num_procs, TimestampTz sample_time,
int samples_considered);
static void copy_pgproc_sample_fields(PGPROC *proc, int index);
static void copy_non_pgproc_sample_fields(TimestampTz sample_time, int index);
static void YbAshIncrementCircularBufferIndex(void);
static YBCAshSample *YbAshGetNextCircularBufferSlot(void);

Expand Down Expand Up @@ -668,34 +670,52 @@ YbAshIncrementCircularBufferIndex(void)
yb_ash->index = 0;
}

/*
* Returns true if another sample should be stored in the circular buffer.
*/
bool
YbAshStoreSample(PGPROC *proc, int num_procs, TimestampTz sample_time,
int *samples_stored)
static void
YbAshMaybeReplaceSample(PGPROC *proc, int num_procs, TimestampTz sample_time,
int samples_considered)
{
int random_index;
int replace_index;

random_index = YBCGetRandomUniformInt(1, samples_considered);

if (random_index > yb_ash_sample_size)
return;

/*
* If there are less samples available than the sample size, the sample
* weight must be 1.
* -1 because yb_ash->index points to where the next sample should
* be stored.
*/
float8 sample_weight = Max(num_procs, yb_ash_sample_size) * 1.0 / yb_ash_sample_size;
replace_index = yb_ash->index - (yb_ash_sample_size - random_index) - 1;

copy_pgproc_sample_fields(proc);
copy_non_pgproc_sample_fields(sample_weight, sample_time);
if (replace_index < 0)
replace_index += yb_ash->max_entries;

YbAshIncrementCircularBufferIndex();
YbAshStoreSample(proc, num_procs, sample_time, replace_index);
}

if (++(*samples_stored) == yb_ash_sample_size)
return false;
void
YbAshMaybeIncludeSample(PGPROC *proc, int num_procs, TimestampTz sample_time,
int *samples_considered)
{
if (++(*samples_considered) <= yb_ash_sample_size)
YbAshStoreSample(proc, num_procs, sample_time, yb_ash->index);
else
YbAshMaybeReplaceSample(proc, num_procs, sample_time, *samples_considered);
}

return true;
void
YbAshStoreSample(PGPROC *proc, int num_procs, TimestampTz sample_time, int index)
{
copy_pgproc_sample_fields(proc, index);
copy_non_pgproc_sample_fields(sample_time, index);
YbAshIncrementCircularBufferIndex();
}

static void
copy_pgproc_sample_fields(PGPROC *proc)
copy_pgproc_sample_fields(PGPROC *proc, int index)
{
YBCAshSample *cb_sample = &yb_ash->circular_buffer[yb_ash->index];
YBCAshSample *cb_sample = &yb_ash->circular_buffer[index];

LWLockAcquire(&proc->yb_ash_metadata_lock, LW_SHARED);
memcpy(&cb_sample->metadata, &proc->yb_ash_metadata, sizeof(YBCAshMetadata));
Expand All @@ -704,10 +724,11 @@ copy_pgproc_sample_fields(PGPROC *proc)
cb_sample->encoded_wait_event_code = proc->wait_event_info;
}

/* We don't fill the sample weight here. Check YbAshFillSampleWeight */
static void
copy_non_pgproc_sample_fields(float8 sample_weight, TimestampTz sample_time)
copy_non_pgproc_sample_fields(TimestampTz sample_time, int index)
{
YBCAshSample *cb_sample = &yb_ash->circular_buffer[yb_ash->index];
YBCAshSample *cb_sample = &yb_ash->circular_buffer[index];

/* yql_endpoint_tserver_uuid is constant for all PG samples */
if (get_yql_endpoint_tserver_uuid())
Expand All @@ -719,10 +740,34 @@ copy_non_pgproc_sample_fields(float8 sample_weight, TimestampTz sample_time)
cb_sample->rpc_request_id = 0;
/* TODO(asaha): Add aux info to circular buffer once it's available */
cb_sample->aux_info[0] = '\0';
cb_sample->sample_weight = sample_weight;
cb_sample->sample_time = sample_time;
}

/*
* While inserting samples into the circular buffer, we don't know the actual
* number of samples considered. So after inserting all the samples, we go back
* and update the sample weight
*/
void
YbAshFillSampleWeight(int samples_considered)
{
int samples_inserted;
float sample_weight;
int index;

samples_inserted = Min(samples_considered, yb_ash_sample_size);
sample_weight = Max(samples_considered, yb_ash_sample_size) * 1.0 / yb_ash_sample_size;
index = yb_ash->index - 1;

while (samples_inserted--)
{
if (index < 0)
index += yb_ash->max_entries;

yb_ash->circular_buffer[index--].sample_weight = sample_weight;
}
}

/*
* Returns a pointer to the circular buffer slot where the sample should be
* inserted and increments the index.
Expand Down
8 changes: 6 additions & 2 deletions src/postgres/src/include/yb_ash.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ extern void YbAshSetSessionId(uint64 session_id);
extern void YbAshSetDatabaseId(Oid database_id);
extern bool YbAshShouldIgnoreWaitEvent(uint32 wait_event_info);

extern bool YbAshStoreSample(PGPROC *proc, int num_procs,
extern void YbAshMaybeIncludeSample(PGPROC *proc, int num_procs,
TimestampTz sample_time,
int *samples_considered);
extern void YbAshStoreSample(PGPROC *proc, int num_procs,
TimestampTz sample_time,
int *samples_stored);
int index);
extern void YbAshFillSampleWeight(int samples_considered);

extern bool yb_enable_ash_check_hook(bool *newval,
void **extra,
Expand Down
10 changes: 10 additions & 0 deletions src/yb/ash/wait_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ DEFINE_RUNTIME_PG_PREVIEW_FLAG(bool, yb_enable_ash, false,
"and various background activities. This does nothing if "
"ysql_yb_enable_ash_infra is disabled.");

DEFINE_NON_RUNTIME_PG_FLAG(int32, yb_ash_circular_buffer_size, 16 * 1024,
"Size (in KiBs) of ASH circular buffer that stores the samples");

DEFINE_RUNTIME_PG_FLAG(int32, yb_ash_sampling_interval_ms, 1000,
"Time (in milliseconds) between two consecutive sampling events");
DEPRECATE_FLAG(int32, ysql_yb_ash_sampling_interval, "2024_03");

DEFINE_RUNTIME_PG_FLAG(int32, yb_ash_sample_size, 500,
"Number of samples captured from each component per sampling event");

DEFINE_test_flag(bool, export_wait_state_names, yb::kIsDebug,
"Exports wait-state name as a human understandable string.");
DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::kIsDebug,
Expand Down
5 changes: 3 additions & 2 deletions src/yb/integration-tests/wait_states-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ using namespace std::literals;

DECLARE_bool(ysql_yb_ash_enable_infra);
DECLARE_bool(ysql_yb_enable_ash);
DECLARE_int32(ysql_yb_ash_sample_size);

DECLARE_bool(allow_index_table_read_write);
DECLARE_int32(client_read_write_timeout_ms);
Expand Down Expand Up @@ -329,8 +330,7 @@ void WaitStateTestCheckMethodCounts::UpdateCounts(
std::lock_guard lock(mutex_);
VLOG(1) << "Received " << resp.ShortDebugString();
for (auto& container :
{resp.tserver_wait_states(), resp.cql_wait_states(), resp.flush_and_compaction_wait_states(),
resp.raft_log_appender_wait_states()}) {
{resp.tserver_wait_states(), resp.cql_wait_states()}) {
for (auto& entry : container.wait_states()) {
VLOG(2) << "Entry " << ++idx << " : " << yb::ToString(entry);
const auto& method =
Expand Down Expand Up @@ -363,6 +363,7 @@ void WaitStateTestCheckMethodCounts::DoAshCalls(std::atomic<bool>& stop) {
req.set_fetch_flush_and_compaction_states(true);
req.set_fetch_raft_log_appender_states(true);
req.set_fetch_cql_states(true);
req.set_sample_size(FLAGS_ysql_yb_ash_sample_size);
tserver::PgActiveSessionHistoryResponsePB resp;
rpc::RpcController controller;
while (!stop) {
Expand Down
8 changes: 6 additions & 2 deletions src/yb/tserver/pg_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ message PgCancelTransactionResponsePB {
}

message PgActiveSessionHistoryRequestPB {
uint32 sample_size = 7;
bool fetch_tserver_states = 1;
bool fetch_flush_and_compaction_states = 2;
bool fetch_cql_states = 3;
Expand All @@ -876,13 +877,16 @@ message PgActiveSessionHistoryRequestPB {
message WaitStatesPB {
repeated WaitStateInfoPB wait_states = 1;
uint32 component = 2;
float sample_weight = 3;
}

message PgActiveSessionHistoryResponsePB {
AppStatusPB status = 1;
WaitStatesPB tserver_wait_states = 2;
WaitStatesPB flush_and_compaction_wait_states = 3;
WaitStatesPB raft_log_appender_wait_states = 4;
// flush_and_compaction_wait_states and raft_log_appender_wait_states
// are included in tserver_wait_states
WaitStatesPB DEPRECATED_flush_and_compaction_wait_states = 3;
WaitStatesPB DEPRECATED_raft_log_appender_wait_states = 4;
WaitStatesPB cql_wait_states = 5;
}

Expand Down
Loading

0 comments on commit bafa1cb

Please sign in to comment.