From bafa1cb209930de705e4b3f1e7f1ea790449d584 Mon Sep 17 00:00:00 2001 From: Abhinab Saha Date: Thu, 13 Jun 2024 11:17:53 +0530 Subject: [PATCH] [#21751] YSQL, ASH: Sampling of wait events Summary: This diff implements sampling of wait events based on the reservoir sampling algorithm (https://en.wikipedia.org/wiki/Reservoir_sampling). We take a random sample of `ysql_yb_ash_sample_size` events from each component - YSQL, YCQL and TServer. When inserting YSQL samples into the circular buffer, we don't know the actual number of valid samples, so we cannot calculate the sample weight. So, after inserting all the samples in the circular buffer, we go back and update the sample weight of all the recently inserted samples. The fields flush_and_compaction_wait_states and raft_log_appender_wait_states in PgActiveSessionHistoryResponsePB are deprecated and the background wait states are now part of tserver_wait_states. The datatype of sample_weight in ybc_pg_typedefs.h is updated to float, which is used in all other places. Upgrade/Rollback safety: The updated protobuf is only for pg to local tserver communication. 
Jira: DB-10625 Test Plan: ./yb_build.sh --java-test TestYbAsh#testSampleSize Reviewers: jason, amitanand Reviewed By: jason, amitanand Subscribers: yql, hbhanawat, ybase Differential Revision: https://phorge.dev.yugabyte.com/D35804 --- .../src/test/java/org/yb/pgsql/TestYbAsh.java | 53 ++++++++++- .../src/backend/storage/ipc/procarray.c | 13 ++- src/postgres/src/backend/utils/misc/yb_ash.c | 89 ++++++++++++++----- src/postgres/src/include/yb_ash.h | 8 +- src/yb/ash/wait_state.cc | 10 +++ src/yb/integration-tests/wait_states-itest.cc | 5 +- src/yb/tserver/pg_client.proto | 8 +- src/yb/tserver/pg_client_service.cc | 50 ++++++++--- src/yb/yql/pggate/pg_client.cc | 2 + src/yb/yql/pggate/util/ybc_util.cc | 5 ++ src/yb/yql/pggate/util/ybc_util.h | 1 + src/yb/yql/pggate/ybc_pg_typedefs.h | 2 +- src/yb/yql/pggate/ybc_pggate.cc | 9 +- src/yb/yql/pgwrapper/pg_mini-test.cc | 6 +- src/yb/yql/pgwrapper/pg_wrapper.cc | 10 --- 15 files changed, 202 insertions(+), 69 deletions(-) diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbAsh.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbAsh.java index a0e6f5f7fea0..0647cd3fb51c 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbAsh.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestYbAsh.java @@ -19,9 +19,14 @@ import java.sql.ResultSet; import java.sql.Statement; +import java.util.ArrayList; import java.util.Collections; -import java.util.concurrent.TimeUnit; +import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -280,4 +285,50 @@ public void testCatalogRequests() throws Exception { assertGreaterThan(res1, 0); } } + + /** + * Test that we don't capture more than 'ysql_yb_ash_sample_size' number of samples + */ + @Test + public void testSampleSize() throws Exception { + final int sample_size = 3; + 
setAshConfigAndRestartCluster(ASH_SAMPLING_INTERVAL, sample_size); + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE test_table(k INT, v TEXT)"); + } + final int NUM_THREADS = 5; + final int NUM_INSERTS_PER_THREAD = 100; + ExecutorService ecs = Executors.newFixedThreadPool(NUM_THREADS); + List> futures = new ArrayList<>(); + for (int i = 1; i <= NUM_THREADS; ++i) { + final int threadIndex = i; + Future future = ecs.submit(() -> { + try (Statement statement = connection.createStatement()) { + for (int j = 0; j < NUM_INSERTS_PER_THREAD; ++j) { + statement.execute(String.format("INSERT INTO test_table VALUES(%d, 'v-%d')", + threadIndex, j)); + } + } catch (Exception e) { + fail(e.getMessage()); + } + }); + futures.add(future); + } + for (Future future : futures) { + future.get(); + } + ecs.shutdown(); + ecs.awaitTermination(30, TimeUnit.SECONDS); + try (Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT sample_time, wait_event_component, " + + "count(*) FROM " + ASH_VIEW + " GROUP BY sample_time, wait_event_component"); + while (rs.next()) { + assertLessThanOrEqualTo(rs.getLong("count"), Long.valueOf(sample_size)); + } + rs = statement.executeQuery("SELECT sample_weight FROM " + ASH_VIEW); + while (rs.next()) { + assertGreaterThanOrEqualTo(rs.getDouble("sample_weight"), Double.valueOf(1.0)); + } + } + } } diff --git a/src/postgres/src/backend/storage/ipc/procarray.c b/src/postgres/src/backend/storage/ipc/procarray.c index ea6214193c89..add7411c5b5f 100644 --- a/src/postgres/src/backend/storage/ipc/procarray.c +++ b/src/postgres/src/backend/storage/ipc/procarray.c @@ -4140,16 +4140,12 @@ void YbStorePgAshSamples(TimestampTz sample_time) { int i; - int samples_stored = 0; + int samples_considered = 0; ProcArrayStruct *arrayP = procArray; LWLockAcquire(ProcArrayLock, LW_SHARED); - /* - * TODO: Add sampling logic to take random samples instead of - * the first 'N'. 
- */ for (i = 0; i < arrayP->numProcs; ++i) { int pgprocno = arrayP->pgprocnos[i]; @@ -4172,10 +4168,11 @@ YbStorePgAshSamples(TimestampTz sample_time) YbAshShouldIgnoreWaitEvent(proc->wait_event_info)) continue; - if (YbAshStoreSample(proc, arrayP->numProcs, sample_time, - &samples_stored) == 0) - break; + YbAshMaybeIncludeSample(proc, arrayP->numProcs, sample_time, + &samples_considered); } LWLockRelease(ProcArrayLock); + + YbAshFillSampleWeight(samples_considered); } diff --git a/src/postgres/src/backend/utils/misc/yb_ash.c b/src/postgres/src/backend/utils/misc/yb_ash.c index c3b633714969..5e64580a29fc 100644 --- a/src/postgres/src/backend/utils/misc/yb_ash.c +++ b/src/postgres/src/backend/utils/misc/yb_ash.c @@ -123,8 +123,10 @@ static void yb_ash_ProcessUtility(PlannedStmt *pstmt, const char *queryString, char *completionTag); static const unsigned char *get_yql_endpoint_tserver_uuid(); -static void copy_pgproc_sample_fields(PGPROC *proc); -static void copy_non_pgproc_sample_fields(float8 sample_weight, TimestampTz sample_time); +static void YbAshMaybeReplaceSample(PGPROC *proc, int num_procs, TimestampTz sample_time, + int samples_considered); +static void copy_pgproc_sample_fields(PGPROC *proc, int index); +static void copy_non_pgproc_sample_fields(TimestampTz sample_time, int index); static void YbAshIncrementCircularBufferIndex(void); static YBCAshSample *YbAshGetNextCircularBufferSlot(void); @@ -668,34 +670,52 @@ YbAshIncrementCircularBufferIndex(void) yb_ash->index = 0; } -/* - * Returns true if another sample should be stored in the circular buffer. 
- */ -bool -YbAshStoreSample(PGPROC *proc, int num_procs, TimestampTz sample_time, - int *samples_stored) +static void +YbAshMaybeReplaceSample(PGPROC *proc, int num_procs, TimestampTz sample_time, + int samples_considered) { + int random_index; + int replace_index; + + random_index = YBCGetRandomUniformInt(1, samples_considered); + + if (random_index > yb_ash_sample_size) + return; + /* - * If there are less samples available than the sample size, the sample - * weight must be 1. + * -1 because yb_ash->index points to where the next sample should + * be stored. */ - float8 sample_weight = Max(num_procs, yb_ash_sample_size) * 1.0 / yb_ash_sample_size; + replace_index = yb_ash->index - (yb_ash_sample_size - random_index) - 1; - copy_pgproc_sample_fields(proc); - copy_non_pgproc_sample_fields(sample_weight, sample_time); + if (replace_index < 0) + replace_index += yb_ash->max_entries; - YbAshIncrementCircularBufferIndex(); + YbAshStoreSample(proc, num_procs, sample_time, replace_index); +} - if (++(*samples_stored) == yb_ash_sample_size) - return false; +void +YbAshMaybeIncludeSample(PGPROC *proc, int num_procs, TimestampTz sample_time, + int *samples_considered) +{ + if (++(*samples_considered) <= yb_ash_sample_size) + YbAshStoreSample(proc, num_procs, sample_time, yb_ash->index); + else + YbAshMaybeReplaceSample(proc, num_procs, sample_time, *samples_considered); +} - return true; +void +YbAshStoreSample(PGPROC *proc, int num_procs, TimestampTz sample_time, int index) +{ + copy_pgproc_sample_fields(proc, index); + copy_non_pgproc_sample_fields(sample_time, index); + YbAshIncrementCircularBufferIndex(); } static void -copy_pgproc_sample_fields(PGPROC *proc) +copy_pgproc_sample_fields(PGPROC *proc, int index) { - YBCAshSample *cb_sample = &yb_ash->circular_buffer[yb_ash->index]; + YBCAshSample *cb_sample = &yb_ash->circular_buffer[index]; LWLockAcquire(&proc->yb_ash_metadata_lock, LW_SHARED); memcpy(&cb_sample->metadata, &proc->yb_ash_metadata, 
sizeof(YBCAshMetadata)); @@ -704,10 +724,11 @@ copy_pgproc_sample_fields(PGPROC *proc) cb_sample->encoded_wait_event_code = proc->wait_event_info; } +/* We don't fill the sample weight here. Check YbAshFillSampleWeight */ static void -copy_non_pgproc_sample_fields(float8 sample_weight, TimestampTz sample_time) +copy_non_pgproc_sample_fields(TimestampTz sample_time, int index) { - YBCAshSample *cb_sample = &yb_ash->circular_buffer[yb_ash->index]; + YBCAshSample *cb_sample = &yb_ash->circular_buffer[index]; /* yql_endpoint_tserver_uuid is constant for all PG samples */ if (get_yql_endpoint_tserver_uuid()) @@ -719,10 +740,34 @@ copy_non_pgproc_sample_fields(float8 sample_weight, TimestampTz sample_time) cb_sample->rpc_request_id = 0; /* TODO(asaha): Add aux info to circular buffer once it's available */ cb_sample->aux_info[0] = '\0'; - cb_sample->sample_weight = sample_weight; cb_sample->sample_time = sample_time; } +/* + * While inserting samples into the circular buffer, we don't know the actual + * number of samples considered. So after inserting all the samples, we go back + * and update the sample weight + */ +void +YbAshFillSampleWeight(int samples_considered) +{ + int samples_inserted; + float sample_weight; + int index; + + samples_inserted = Min(samples_considered, yb_ash_sample_size); + sample_weight = Max(samples_considered, yb_ash_sample_size) * 1.0 / yb_ash_sample_size; + index = yb_ash->index - 1; + + while (samples_inserted--) + { + if (index < 0) + index += yb_ash->max_entries; + + yb_ash->circular_buffer[index--].sample_weight = sample_weight; + } +} + /* * Returns a pointer to the circular buffer slot where the sample should be * inserted and increments the index. 
diff --git a/src/postgres/src/include/yb_ash.h b/src/postgres/src/include/yb_ash.h index 2240d6675b0b..3bda5ddcf4d8 100644 --- a/src/postgres/src/include/yb_ash.h +++ b/src/postgres/src/include/yb_ash.h @@ -54,9 +54,13 @@ extern void YbAshSetSessionId(uint64 session_id); extern void YbAshSetDatabaseId(Oid database_id); extern bool YbAshShouldIgnoreWaitEvent(uint32 wait_event_info); -extern bool YbAshStoreSample(PGPROC *proc, int num_procs, +extern void YbAshMaybeIncludeSample(PGPROC *proc, int num_procs, + TimestampTz sample_time, + int *samples_considered); +extern void YbAshStoreSample(PGPROC *proc, int num_procs, TimestampTz sample_time, - int *samples_stored); + int index); +extern void YbAshFillSampleWeight(int samples_considered); extern bool yb_enable_ash_check_hook(bool *newval, void **extra, diff --git a/src/yb/ash/wait_state.cc b/src/yb/ash/wait_state.cc index 2b2cd56734fe..9a493c91ff31 100644 --- a/src/yb/ash/wait_state.cc +++ b/src/yb/ash/wait_state.cc @@ -53,6 +53,16 @@ DEFINE_RUNTIME_PG_PREVIEW_FLAG(bool, yb_enable_ash, false, "and various background activities. 
This does nothing if " "ysql_yb_enable_ash_infra is disabled."); +DEFINE_NON_RUNTIME_PG_FLAG(int32, yb_ash_circular_buffer_size, 16 * 1024, + "Size (in KiBs) of ASH circular buffer that stores the samples"); + +DEFINE_RUNTIME_PG_FLAG(int32, yb_ash_sampling_interval_ms, 1000, + "Time (in milliseconds) between two consecutive sampling events"); +DEPRECATE_FLAG(int32, ysql_yb_ash_sampling_interval, "2024_03"); + +DEFINE_RUNTIME_PG_FLAG(int32, yb_ash_sample_size, 500, + "Number of samples captured from each component per sampling event"); + DEFINE_test_flag(bool, export_wait_state_names, yb::kIsDebug, "Exports wait-state name as a human understandable string."); DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::kIsDebug, diff --git a/src/yb/integration-tests/wait_states-itest.cc b/src/yb/integration-tests/wait_states-itest.cc index 91e286d3260b..c86d3a4c1871 100644 --- a/src/yb/integration-tests/wait_states-itest.cc +++ b/src/yb/integration-tests/wait_states-itest.cc @@ -49,6 +49,7 @@ using namespace std::literals; DECLARE_bool(ysql_yb_ash_enable_infra); DECLARE_bool(ysql_yb_enable_ash); +DECLARE_int32(ysql_yb_ash_sample_size); DECLARE_bool(allow_index_table_read_write); DECLARE_int32(client_read_write_timeout_ms); @@ -329,8 +330,7 @@ void WaitStateTestCheckMethodCounts::UpdateCounts( std::lock_guard lock(mutex_); VLOG(1) << "Received " << resp.ShortDebugString(); for (auto& container : - {resp.tserver_wait_states(), resp.cql_wait_states(), resp.flush_and_compaction_wait_states(), - resp.raft_log_appender_wait_states()}) { + {resp.tserver_wait_states(), resp.cql_wait_states()}) { for (auto& entry : container.wait_states()) { VLOG(2) << "Entry " << ++idx << " : " << yb::ToString(entry); const auto& method = @@ -363,6 +363,7 @@ void WaitStateTestCheckMethodCounts::DoAshCalls(std::atomic& stop) { req.set_fetch_flush_and_compaction_states(true); req.set_fetch_raft_log_appender_states(true); req.set_fetch_cql_states(true); + 
req.set_sample_size(FLAGS_ysql_yb_ash_sample_size); tserver::PgActiveSessionHistoryResponsePB resp; rpc::RpcController controller; while (!stop) { diff --git a/src/yb/tserver/pg_client.proto b/src/yb/tserver/pg_client.proto index da594418cac6..ad04f8f60381 100644 --- a/src/yb/tserver/pg_client.proto +++ b/src/yb/tserver/pg_client.proto @@ -865,6 +865,7 @@ message PgCancelTransactionResponsePB { } message PgActiveSessionHistoryRequestPB { + uint32 sample_size = 7; bool fetch_tserver_states = 1; bool fetch_flush_and_compaction_states = 2; bool fetch_cql_states = 3; @@ -876,13 +877,16 @@ message PgActiveSessionHistoryRequestPB { message WaitStatesPB { repeated WaitStateInfoPB wait_states = 1; uint32 component = 2; + float sample_weight = 3; } message PgActiveSessionHistoryResponsePB { AppStatusPB status = 1; WaitStatesPB tserver_wait_states = 2; - WaitStatesPB flush_and_compaction_wait_states = 3; - WaitStatesPB raft_log_appender_wait_states = 4; + // flush_and_compaction_wait_states and raft_log_appender_wait_states + // are included in tserver_wait_states + WaitStatesPB DEPRECATED_flush_and_compaction_wait_states = 3; + WaitStatesPB DEPRECATED_raft_log_appender_wait_states = 4; WaitStatesPB cql_wait_states = 5; } diff --git a/src/yb/tserver/pg_client_service.cc b/src/yb/tserver/pg_client_service.cc index 8c963bfe4aba..ea7874b08693 100644 --- a/src/yb/tserver/pg_client_service.cc +++ b/src/yb/tserver/pg_client_service.cc @@ -1389,22 +1389,34 @@ class PgClientServiceImpl::Impl { call.wait_state().aux_info().method() == "Perform"))); } + void MaybeIncludeSample( + tserver::WaitStatesPB* resp, const WaitStateInfoPB& wait_state_pb, int sample_size, + int& samples_considered) { + if (++samples_considered <= sample_size) { + resp->add_wait_states()->CopyFrom(wait_state_pb); + } else { + int random_index = RandomUniformInt(1, samples_considered); + if (random_index <= sample_size) { + resp->mutable_wait_states(random_index - 1)->CopyFrom(wait_state_pb); + } + } + } + void 
PopulateWaitStates( const PgActiveSessionHistoryRequestPB& req, const yb::rpc::RpcConnectionPB& conn, - tserver::WaitStatesPB* resp) { + tserver::WaitStatesPB* resp, int sample_size, int& samples_considered) { for (const auto& call : conn.calls_in_flight()) { if (ShouldIgnoreCall(req, call)) { VLOG(3) << "Ignoring " << call.wait_state().DebugString(); continue; } - auto* wait_state = resp->add_wait_states(); - wait_state->CopyFrom(call.wait_state()); + MaybeIncludeSample(resp, call.wait_state(), sample_size, samples_considered); } } void GetRpcsWaitStates( const PgActiveSessionHistoryRequestPB& req, ash::Component component, - tserver::WaitStatesPB* resp) { + tserver::WaitStatesPB* resp, int sample_size, int& samples_considered) { auto* messenger = tablet_server_.GetMessenger(component); if (!messenger) { LOG_WITH_FUNC(ERROR) << "got no messenger for " << yb::ToString(component); @@ -1424,11 +1436,11 @@ class PgClientServiceImpl::Impl { WARN_NOT_OK(messenger->DumpRunningRpcs(dump_req, &dump_resp), "DumpRunningRpcs failed"); for (const auto& conn : dump_resp.inbound_connections()) { - PopulateWaitStates(req, conn, resp); + PopulateWaitStates(req, conn, resp, sample_size, samples_considered); } if (dump_resp.has_local_calls()) { - PopulateWaitStates(req, dump_resp.local_calls(), resp); + PopulateWaitStates(req, dump_resp.local_calls(), resp, sample_size, samples_considered); } VLOG(3) << __PRETTY_FUNCTION__ << " wait-states: " << yb::ToString(resp->wait_states()); @@ -1436,7 +1448,7 @@ class PgClientServiceImpl::Impl { void AddWaitStatesToResponse( const ash::WaitStateTracker& tracker, bool export_wait_state_names, - tserver::WaitStatesPB* resp) { + tserver::WaitStatesPB* resp, int sample_size, int& samples_considered) { Result local_uuid = Uuid::FromHexStringBigEndian(instance_id_); DCHECK_OK(local_uuid); resp->set_component(yb::to_underlying(ash::Component::kTServer)); @@ -1452,7 +1464,7 @@ class PgClientServiceImpl::Impl { if (local_uuid) { 
local_uuid->ToBytes(wait_state_pb.mutable_metadata()->mutable_yql_endpoint_tserver_uuid()); } - resp->add_wait_states()->CopyFrom(wait_state_pb); + MaybeIncludeSample(resp, wait_state_pb, sample_size, samples_considered); } VLOG(2) << "Tracker call sending " << resp->DebugString(); } @@ -1460,25 +1472,35 @@ class PgClientServiceImpl::Impl { Status ActiveSessionHistory( const PgActiveSessionHistoryRequestPB& req, PgActiveSessionHistoryResponsePB* resp, rpc::RpcContext* context) { + int tserver_samples_considered = 0; + int cql_samples_considered = 0; + int sample_size = req.sample_size(); if (req.fetch_tserver_states()) { - GetRpcsWaitStates(req, ash::Component::kTServer, resp->mutable_tserver_wait_states()); + GetRpcsWaitStates(req, ash::Component::kTServer, resp->mutable_tserver_wait_states(), + sample_size, tserver_samples_considered); AddWaitStatesToResponse( ash::SharedMemoryPgPerformTracker(), req.export_wait_state_code_as_string(), - resp->mutable_tserver_wait_states()); + resp->mutable_tserver_wait_states(), sample_size, tserver_samples_considered); } if (req.fetch_flush_and_compaction_states()) { AddWaitStatesToResponse( ash::FlushAndCompactionWaitStatesTracker(), req.export_wait_state_code_as_string(), - resp->mutable_flush_and_compaction_wait_states()); + resp->mutable_tserver_wait_states(), sample_size, tserver_samples_considered); } if (req.fetch_raft_log_appender_states()) { AddWaitStatesToResponse( ash::RaftLogWaitStatesTracker(), req.export_wait_state_code_as_string(), - resp->mutable_raft_log_appender_wait_states()); + resp->mutable_tserver_wait_states(), sample_size, tserver_samples_considered); } if (req.fetch_cql_states()) { - GetRpcsWaitStates(req, ash::Component::kYCQL, resp->mutable_cql_wait_states()); - } + GetRpcsWaitStates(req, ash::Component::kYCQL, resp->mutable_cql_wait_states(), + sample_size, cql_samples_considered); + } + float tserver_sample_weight = + std::max(tserver_samples_considered, sample_size) * 1.0 / sample_size; + float 
cql_sample_weight = std::max(cql_samples_considered, sample_size) * 1.0 / sample_size; + resp->mutable_tserver_wait_states()->set_sample_weight(tserver_sample_weight); + resp->mutable_cql_wait_states()->set_sample_weight(cql_sample_weight); return Status::OK(); } diff --git a/src/yb/yql/pggate/pg_client.cc b/src/yb/yql/pggate/pg_client.cc index a47de76a14f4..20f33cd91c4a 100644 --- a/src/yb/yql/pggate/pg_client.cc +++ b/src/yb/yql/pggate/pg_client.cc @@ -65,6 +65,7 @@ DECLARE_bool(TEST_ash_fetch_wait_states_for_raft_log); DECLARE_bool(TEST_ash_fetch_wait_states_for_rocksdb_flush_and_compaction); DECLARE_bool(TEST_export_wait_state_names); DECLARE_bool(ysql_enable_db_catalog_version_mode); +DECLARE_int32(ysql_yb_ash_sample_size); extern int yb_locks_min_txn_age; extern int yb_locks_max_transactions; @@ -1134,6 +1135,7 @@ class PgClient::Impl : public BigDataFetcher { req.set_fetch_cql_states(true); req.set_ignore_ash_and_perform_calls(true); req.set_export_wait_state_code_as_string(FLAGS_TEST_export_wait_state_names); + req.set_sample_size(FLAGS_ysql_yb_ash_sample_size); tserver::PgActiveSessionHistoryResponsePB resp; RETURN_NOT_OK(proxy_->ActiveSessionHistory(req, &resp, PrepareController())); diff --git a/src/yb/yql/pggate/util/ybc_util.cc b/src/yb/yql/pggate/util/ybc_util.cc index 416783f17879..2cc47c05e48e 100644 --- a/src/yb/yql/pggate/util/ybc_util.cc +++ b/src/yb/yql/pggate/util/ybc_util.cc @@ -487,6 +487,11 @@ uint8_t YBCGetQueryIdForCatalogRequests() { return static_cast(ash::FixedQueryId::kQueryIdForCatalogRequests); } +// Get a random integer between a and b +int YBCGetRandomUniformInt(int a, int b) { + return RandomUniformInt(a, b); +} + int YBCGetCallStackFrames(void** result, int max_depth, int skip_count) { return google::GetStackTrace(result, max_depth, skip_count); } diff --git a/src/yb/yql/pggate/util/ybc_util.h b/src/yb/yql/pggate/util/ybc_util.h index 6379a9681524..a26dcc0e683a 100644 --- a/src/yb/yql/pggate/util/ybc_util.h +++ 
b/src/yb/yql/pggate/util/ybc_util.h @@ -336,6 +336,7 @@ const char* YBCGetWaitEventClass(uint32_t wait_event_info); const char* YBCGetWaitEventComponent(uint32_t wait_event_info); const char* YBCGetWaitEventType(uint32_t wait_event_info); uint8_t YBCGetQueryIdForCatalogRequests(); +int YBCGetRandomUniformInt(int a, int b); int YBCGetCallStackFrames(void** result, int max_depth, int skip_count); diff --git a/src/yb/yql/pggate/ybc_pg_typedefs.h b/src/yb/yql/pggate/ybc_pg_typedefs.h index 2451c0ea1c40..420fec7a68cd 100644 --- a/src/yb/yql/pggate/ybc_pg_typedefs.h +++ b/src/yb/yql/pggate/ybc_pg_typedefs.h @@ -727,7 +727,7 @@ typedef struct AshSample { // If a certain number of samples are available and we capture a portion of // them, the sample weight is the reciprocal of the captured portion or 1, // whichever is maximum. - double sample_weight; + float sample_weight; // Timestamp when the sample was captured. uint64_t sample_time; diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index e374ef08a970..52deda0c60b4 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -348,7 +348,7 @@ void AshCopyAuxInfo( void AshCopyTServerSample( YBCAshSample* cb_sample, uint32_t component, const WaitStateInfoPB& tserver_sample, - uint64_t sample_time) { + uint64_t sample_time, float sample_weight) { auto* cb_metadata = &cb_sample->metadata; const auto& tserver_metadata = tserver_sample.metadata(); @@ -358,7 +358,7 @@ void AshCopyTServerSample( cb_sample->rpc_request_id = tserver_metadata.rpc_request_id(); cb_sample->encoded_wait_event_code = AshEncodeWaitStateCodeWithComponent(component, tserver_sample.wait_state_code()); - cb_sample->sample_weight = 1; // TODO: Change this once sampling is done at tserver side + cb_sample->sample_weight = sample_weight; cb_sample->sample_time = sample_time; std::memcpy(cb_metadata->root_request_id, @@ -381,7 +381,8 @@ void AshCopyTServerSamples( YBCAshGetNextCircularBufferSlot 
get_cb_slot_fn, const tserver::WaitStatesPB& samples, uint64_t sample_time) { for (const auto& sample : samples.wait_states()) { - AshCopyTServerSample(get_cb_slot_fn(), samples.component(), sample, sample_time); + AshCopyTServerSample(get_cb_slot_fn(), samples.component(), sample, sample_time, + samples.sample_weight()); } } @@ -2325,8 +2326,6 @@ void YBCStoreTServerAshSamples( LOG(ERROR) << result.status(); } else { AshCopyTServerSamples(get_cb_slot_fn, result->tserver_wait_states(), sample_time); - AshCopyTServerSamples(get_cb_slot_fn, result->flush_and_compaction_wait_states(), sample_time); - AshCopyTServerSamples(get_cb_slot_fn, result->raft_log_appender_wait_states(), sample_time); AshCopyTServerSamples(get_cb_slot_fn, result->cql_wait_states(), sample_time); } } diff --git a/src/yb/yql/pgwrapper/pg_mini-test.cc b/src/yb/yql/pgwrapper/pg_mini-test.cc index 22081b8f6692..46f26a5f4745 100644 --- a/src/yb/yql/pgwrapper/pg_mini-test.cc +++ b/src/yb/yql/pgwrapper/pg_mini-test.cc @@ -122,6 +122,7 @@ DECLARE_uint64(pg_client_heartbeat_interval_ms); DECLARE_bool(ysql_yb_ash_enable_infra); DECLARE_bool(ysql_yb_enable_ash); +DECLARE_int32(ysql_yb_ash_sample_size); METRIC_DECLARE_entity(tablet); METRIC_DECLARE_gauge_uint64(aborted_transactions_pending_cleanup); @@ -455,18 +456,19 @@ TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(Ash), PgMiniAshTest) { req.set_fetch_tserver_states(true); req.set_fetch_flush_and_compaction_states(true); req.set_fetch_cql_states(true); + req.set_sample_size(FLAGS_ysql_yb_ash_sample_size); tserver::PgActiveSessionHistoryResponsePB resp; rpc::RpcController controller; std::unordered_map method_counts; int calls_without_aux_info_details = 0; for (int i = 0; i < kNumCalls; ++i) { ASSERT_OK(pg_proxy->ActiveSessionHistory(req, &resp, &controller)); - VLOG(1) << "Call " << i << " got " << yb::ToString(resp); + VLOG(0) << "Call " << i << " got " << yb::ToString(resp); controller.Reset(); SleepFor(10ms); int idx = 0; for (auto& entry : 
resp.tserver_wait_states().wait_states()) { - VLOG(2) << "Entry " << ++idx << " : " << yb::ToString(entry); + VLOG(0) << "Entry " << ++idx << " : " << yb::ToString(entry); if (entry.has_aux_info() && entry.aux_info().has_method()) { ++method_counts[entry.aux_info().method()]; } else { diff --git a/src/yb/yql/pgwrapper/pg_wrapper.cc b/src/yb/yql/pgwrapper/pg_wrapper.cc index 75b30a12617f..2ff1b33c4afc 100644 --- a/src/yb/yql/pgwrapper/pg_wrapper.cc +++ b/src/yb/yql/pgwrapper/pg_wrapper.cc @@ -305,16 +305,6 @@ DEFINE_validator(ysql_yb_xcluster_consistency_level, &ValidateXclusterConsistenc DEFINE_NON_RUNTIME_string(ysql_conn_mgr_warmup_db, "yugabyte", "Database for which warmup needs to be done."); -DEFINE_NON_RUNTIME_PG_FLAG(int32, yb_ash_circular_buffer_size, 16 * 1024, - "Size (in KiBs) of ASH circular buffer that stores the samples"); - -DEFINE_RUNTIME_PG_FLAG(int32, yb_ash_sampling_interval_ms, 1000, - "Time (in milliseconds) between two consecutive sampling events"); -DEPRECATE_FLAG(int32, ysql_yb_ash_sampling_interval, "2024_03"); - -DEFINE_RUNTIME_PG_FLAG(int32, yb_ash_sample_size, 500, - "Number of samples captured from each component per sampling event"); - DEFINE_NON_RUNTIME_string(ysql_cron_database_name, "yugabyte", "Database in which pg_cron metadata is kept.");