From 17bde242f86aba8dc64b0b5bb52c8c513e0d1cdc Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Mon, 24 Apr 2023 14:19:41 -0700 Subject: [PATCH 1/5] workload: fix performance regresion from accessing query execution mode sync.RWMutex RLock() became a hot path for essentially a read-only value. The execution mode of a pool and query doesn't change after program initialization. Release note: None --- pkg/workload/pgx_helpers.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index cd99512f97f4..0d883dbaf0d7 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -36,8 +36,11 @@ type MultiConnPool struct { // preparedStatements is a map from name to SQL. The statements in the map // are prepared whenever a new connection is acquired from the pool. preparedStatements map[string]string - method pgx.QueryExecMode } + + // NOTE(seanc@): method is on the hot-path, therefore make all reads to the + // query exec mode a dirty read. + method pgx.QueryExecMode } // MultiConnPoolCfg encapsulates the knobs passed to NewMultiConnPool. @@ -203,7 +206,7 @@ func NewMultiConnPool( if !ok { return nil, errors.Errorf("unknown method %s", cfg.Method) } - m.mu.method = queryMode + m.method = queryMode for i := range urls { connsPerPool := distributeMax(connsPerURL[i], maxConnsPerPool) @@ -287,9 +290,7 @@ func (m *MultiConnPool) Close() { // Method returns the query execution mode of the connection pool. func (m *MultiConnPool) Method() pgx.QueryExecMode { - m.mu.Lock() - defer m.mu.Unlock() - return m.mu.method + return m.method } // WarmupConns warms up numConns connections across all pools contained within From ee93d72ab145eb7e8697b4dc9edd1f30dca1856e Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 25 Apr 2023 10:51:09 -0700 Subject: [PATCH 2/5] storage: fix setting shared creator ID The `AtLeast` version check was incorrect and we weren't setting the Creator ID. The code is improved to also set the Creator ID when the version is bumped. We are missing unit tests for shared storage but there is currently no way to use a test / in-memory shared store; we will add tests when we address this. Epic: none Release note: None --- pkg/roachpb/version.go | 4 ++-- pkg/roachpb/version_test.go | 8 +++++++- pkg/storage/pebble.go | 15 ++++++++++----- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/roachpb/version.go b/pkg/roachpb/version.go index 782b4ca3523e..6fb5baaf773c 100644 --- a/pkg/roachpb/version.go +++ b/pkg/roachpb/version.go @@ -49,9 +49,9 @@ func (v Version) LessEq(otherV Version) bool { return v.Equal(otherV) || v.Less(otherV) } -// AtLeast returns true if the receiver is greater than or requal to the parameter. +// AtLeast returns true if the receiver is greater than or equal to the parameter. func (v Version) AtLeast(otherV Version) bool { - return !otherV.Less(v) + return !v.Less(otherV) } // String implements the fmt.Stringer interface. diff --git a/pkg/roachpb/version_test.go b/pkg/roachpb/version_test.go index 29328e88b70b..162443cb5971 100644 --- a/pkg/roachpb/version_test.go +++ b/pkg/roachpb/version_test.go @@ -16,7 +16,7 @@ import ( "github.com/kr/pretty" ) -func TestVersionLess(t *testing.T) { +func TestVersionCmp(t *testing.T) { v := func(major, minor, patch, internal int32) Version { return Version{ Major: major, @@ -51,6 +51,12 @@ func TestVersionLess(t *testing.T) { if a, e := test.v1.Less(test.v2), test.less; a != e { t.Errorf("expected %s < %s? 
%t; got %t", pretty.Sprint(test.v1), pretty.Sprint(test.v2), e, a) } + if a, e := test.v1.Equal(test.v2), test.v1 == test.v2; a != e { + t.Errorf("expected %s = %s? %t; got %t", pretty.Sprint(test.v1), pretty.Sprint(test.v2), e, a) + } + if a, e := test.v1.AtLeast(test.v2), test.v1 == test.v2 || !test.less; a != e { + t.Errorf("expected %s >= %s? %t; got %t", pretty.Sprint(test.v1), pretty.Sprint(test.v2), e, a) + } }) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 41ee3d36a79d..e714f8982821 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -843,14 +843,10 @@ func (p *Pebble) SetStoreID(ctx context.Context, storeID int32) error { if p == nil { return nil } - if p.storeIDPebbleLog != nil { - p.storeIDPebbleLog.Set(ctx, storeID) - } + p.storeIDPebbleLog.Set(ctx, storeID) // Note that SetCreatorID only does something if shared storage is configured // in the pebble options. The version gate protects against accidentally // setting the creator ID on an older store. - // TODO(radu): we don't yet have a complete story about how we will transition - // an existing store to use shared storage. if storeID != base.TempStoreID && p.minVersion.AtLeast(clusterversion.ByKey(clusterversion.V23_1SetPebbleCreatorID)) { if err := p.db.SetCreatorID(uint64(storeID)); err != nil { return err @@ -1999,6 +1995,15 @@ func (p *Pebble) SetMinVersion(version roachpb.Version) error { return err } + // Set the shared object creator ID if the version is high enough. See SetStoreID(). + if version.AtLeast(clusterversion.ByKey(clusterversion.V23_1SetPebbleCreatorID)) { + if storeID := p.storeIDPebbleLog.Get(); storeID != 0 && storeID != base.TempStoreID { + if err := p.db.SetCreatorID(uint64(storeID)); err != nil { + return err + } + } + } + // Pebble has a concept of format major versions, similar to cluster // versions. Backwards incompatible changes to Pebble's on-disk // format are gated behind new format major versions. Bumping the From 17128ce119239333feed0432b2ad19ba96f6a51b Mon Sep 17 00:00:00 2001 From: maryliag Date: Thu, 20 Apr 2023 16:15:34 -0400 Subject: [PATCH 3/5] server: use stats activity tables on sql stats endpoint With the new sql stats tables that contain the top 500 rows based on most used column, this PR updates the calls on the sql stats endpoint to use the new flow: ```mermaid flowchart TD; A[Compare the TS on ACTIVITY Table with the Requested TS] --> B{Is the requested time period completely on the table?} B -- Yes --> C[SELECT on ACTIVITY table] C --> D{Had results?} D -- Yes --> E[Return RESULTS] D -- No --> F[SELECT on PERSISTED table] B -- No ----> F F --> G{Had results?} G -- Yes --> E G -- No --> H[SELECT on COMBINED table] H --> E ``` Part Of: #101948 A following PR will deal when selecting a column that is not one of the ones selected to generate the activity tables. Release note (performance improvement): SQL Activity endpoints now use first a table with the top data for the most used cases. If there is no data available, it used the previous flow with persisted data and if that is also empty, uses in-memory. 
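For reviewers, the fallback in the flowchart above reduces to the shape below. This is a minimal sketch, not code from this patch: the `queryFn` type and the three helpers are stand-ins for the internal-executor queries against the activity, persisted, and combined virtual tables, while the real change threads an `activityHasAllData` flag through the existing collect functions instead.

```go
package example

import "context"

// statsRow stands in for the proto rows the endpoint returns.
type statsRow struct{}

// queryFn is a placeholder for one internal-executor query against a
// crdb_internal virtual table (activity, persisted, or combined).
type queryFn func(ctx context.Context) ([]statsRow, error)

// collectWithFallback mirrors the flowchart: use the activity table only when
// it covers the requested period, otherwise (or on an empty result) fall back
// to the persisted table, and finally to the combined in-memory view.
func collectWithFallback(
	ctx context.Context, activityHasAllData bool, activity, persisted, combined queryFn,
) ([]statsRow, error) {
	if activityHasAllData {
		rows, err := activity(ctx)
		if err != nil {
			return nil, err
		}
		if len(rows) > 0 {
			return rows, nil
		}
	}
	rows, err := persisted(ctx)
	if err != nil {
		return nil, err
	}
	if len(rows) > 0 {
		return rows, nil
	}
	return combined(ctx)
}
```

In the patch itself this cascade is repeated for each endpoint (statements, transactions, total runtime, details), with only the table name substituted into each query.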
--- pkg/server/combined_statement_stats.go | 551 +++++++++++++----- .../src/statementsTable/statementsTable.tsx | 8 +- 2 files changed, 420 insertions(+), 139 deletions(-) diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index b8652baca123..0e394d7162d4 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -78,6 +79,7 @@ func getCombinedStatementStats( settings *cluster.Settings, testingKnobs *sqlstats.TestingKnobs, ) (*serverpb.StatementsResponse, error) { + var err error showInternal := SQLStatsShowInternal.Get(&settings.SV) whereClause, orderAndLimit, args := getCombinedStatementsQueryClausesAndArgs( req, testingKnobs, showInternal, settings) @@ -87,13 +89,29 @@ func getCombinedStatementStats( if !settings.Version.IsActive(ctx, clusterversion.V23_1AddSQLStatsComputedIndexes) { tableSuffix = "_v22_2" } + // Check if the activity tables contains all the data required for the selected period from the request. + activityHasAllData := false + reqStartTime := getTimeFromSeconds(req.Start) + if settings.Version.IsActive(ctx, clusterversion.V23_1AddSystemActivityTables) { + activityHasAllData, err = activityTablesHaveFullData(ctx, ie, testingKnobs, reqStartTime) + if err != nil { + log.Errorf(ctx, "Error on activityTablesHaveFullData: %s", err) + } + } var statements []serverpb.StatementsResponse_CollectedStatementStatistics var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics - var err error if req.FetchMode == nil || req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly { - transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit, testingKnobs, tableSuffix) + transactions, err = collectCombinedTransactions( + ctx, + ie, + whereClause, + args, + orderAndLimit, + testingKnobs, + activityHasAllData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) } @@ -102,16 +120,37 @@ func getCombinedStatementStats( if req.FetchMode != nil && req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly { // If we're fetching for txns, the client still expects statement stats for // stmts in the txns response. 
- statements, err = collectStmtsForTxns(ctx, ie, req, transactions, testingKnobs, tableSuffix) + statements, err = collectStmtsForTxns( + ctx, + ie, + req, + transactions, + testingKnobs, + activityHasAllData, + tableSuffix) } else { - statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, testingKnobs, tableSuffix) + statements, err = collectCombinedStatements( + ctx, + ie, + whereClause, + args, + orderAndLimit, + testingKnobs, + activityHasAllData, + tableSuffix) } if err != nil { return nil, serverError(ctx, err) } - stmtsRunTime, txnsRunTime, err := getTotalRuntimeSecs(ctx, req, ie, testingKnobs, tableSuffix) + stmtsRunTime, txnsRunTime, err := getTotalRuntimeSecs( + ctx, + req, + ie, + testingKnobs, + activityHasAllData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) @@ -129,11 +168,62 @@ func getCombinedStatementStats( return response, nil } +func activityTablesHaveFullData( + ctx context.Context, + ie *sql.InternalExecutor, + testingKnobs *sqlstats.TestingKnobs, + reqStartTime *time.Time, +) (result bool, err error) { + var auxDate time.Time + dateFormat := "2006-01-02 15:04:05.00" + auxDate, err = time.Parse(dateFormat, timeutil.Now().String()) + + queryWithPlaceholders := ` +SELECT + COALESCE(min(aggregated_ts), '%s') +FROM crdb_internal.statement_activity +%s +` + + it, err := ie.QueryIteratorEx( + ctx, + "activity-min-ts", + nil, + sessiondata.NodeUserSessionDataOverride, + fmt.Sprintf(queryWithPlaceholders, auxDate.Format(dateFormat), testingKnobs.GetAOSTClause())) + + if err != nil { + return false, err + } + ok, err := it.Next(ctx) + if err != nil { + return false, err + } + if !ok { + return false, errors.New("expected one row but got none on activityTablesHaveFullData") + } + + var row tree.Datums + if row = it.Cur(); row == nil { + return false, errors.New("unexpected null row on activityTablesHaveFullData") + } + + defer func() { + err = closeIterator(it, err) + }() + + minAggregatedTs := tree.MustBeDTimestampTZ(row[0]).Time + hasData := !minAggregatedTs.Equal(auxDate) && (reqStartTime.After(minAggregatedTs) || reqStartTime.Equal(minAggregatedTs)) + + return hasData, nil +} + func getTotalRuntimeSecs( ctx context.Context, req *serverpb.CombinedStatementsStatsRequest, ie *sql.InternalExecutor, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) (stmtsRuntime float32, txnsRuntime float32, err error) { var buffer strings.Builder @@ -164,7 +254,7 @@ COALESCE( (statistics -> 'statistics' ->> 'cnt')::FLOAT ) , 0) -FROM crdb_internal.%s_statistics%s%s +FROM %s %s ` @@ -174,7 +264,7 @@ FROM crdb_internal.%s_statistics%s%s fmt.Sprintf(`%s-total-runtime`, table), nil, sessiondata.NodeUserSessionDataOverride, - fmt.Sprintf(queryWithPlaceholders, table, `_persisted`, tableSuffix, whereClause), + fmt.Sprintf(queryWithPlaceholders, table, whereClause), args...) if err != nil { @@ -193,56 +283,60 @@ FROM crdb_internal.%s_statistics%s%s return 0, errors.New("unexpected null row on getTotalRuntimeSecs") } - // If the total runtime is 0 there were no results from the persisted table, - // so we retrieve the data from the combined view with data in-memory. - if tree.MustBeDFloat(row[0]) == 0 { - err = closeIterator(it, err) - if err != nil { - return 0, err - } - it, err = ie.QueryIteratorEx( - ctx, - fmt.Sprintf(`%s-total-runtime-with-memory`, table), - nil, - sessiondata.NodeUserSessionDataOverride, - fmt.Sprintf(queryWithPlaceholders, table, ``, ``, whereClause), - args...) 
- - if err != nil { - return 0, err - } - ok, err = it.Next(ctx) - if err != nil { - return 0, err - } - if !ok { - return 0, errors.New("expected one row but got none on getTotalRuntimeSecs") - } - - if row = it.Cur(); row == nil { - return 0, errors.New("unexpected null row on getTotalRuntimeSecs") - } - } - defer func() { err = closeIterator(it, err) }() return float32(tree.MustBeDFloat(row[0])), nil - } + stmtsRuntime = 0 if req.FetchMode == nil || req.FetchMode.StatsType != serverpb.CombinedStatementsStatsRequest_TxnStatsOnly { - stmtsRuntime, err = getRuntime("statement") - if err != nil { - return 0, 0, err + if activityTableHasAllData { + stmtsRuntime, err = getRuntime("crdb_internal.statement_activity") + if err != nil { + return 0, 0, err + } + } + // If there are no results from the activity table, retrieve the data from the persisted table. + if stmtsRuntime == 0 { + stmtsRuntime, err = getRuntime(fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix)) + if err != nil { + return 0, 0, err + } + } + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if stmtsRuntime == 0 { + stmtsRuntime, err = getRuntime("crdb_internal.statement_statistics") + if err != nil { + return 0, 0, err + } } } + txnsRuntime = 0 if req.FetchMode == nil || req.FetchMode.StatsType != serverpb.CombinedStatementsStatsRequest_StmtStatsOnly { - txnsRuntime, err = getRuntime("transaction") - if err != nil { - return 0, 0, err + if activityTableHasAllData { + txnsRuntime, err = getRuntime("crdb_internal.transaction_activity") + if err != nil { + return 0, 0, err + } + } + // If there are no results from the activity table, retrieve the data from the persisted table. + if txnsRuntime == 0 { + txnsRuntime, err = getRuntime(fmt.Sprintf("crdb_internal.transaction_statistics_persisted%s", tableSuffix)) + if err != nil { + return 0, 0, err + } + } + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if txnsRuntime == 0 { + txnsRuntime, err = getRuntime("crdb_internal.transaction_statistics") + if err != nil { + return 0, 0, err + } } } @@ -398,6 +492,7 @@ func collectCombinedStatements( args []interface{}, orderAndLimit string, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() @@ -411,45 +506,69 @@ SELECT max(aggregated_ts) as aggregated_ts, crdb_internal.merge_stats_metadata(array_agg(metadata)) as metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics -FROM %s%s %s +FROM %s %s GROUP BY fingerprint_id, app_name ) %s %s` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.statement_statistics_persisted`, - tableSuffix, - whereClause, - aostClause, - orderAndLimit) - it, err := ie.QueryIteratorEx(ctx, "combined-stmts-by-interval", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) 
- - if err != nil { - return nil, serverError(ctx, err) - } - + var it isql.Rows + var err error defer func() { err = closeIterator(it, err) }() + if activityTableHasAllData { + it, err = getIterator( + ctx, + ie, + queryFormat, + "crdb_internal.statement_activity", + "combined-stmts-activity-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } + + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + it, err = getIterator( + ctx, + ie, + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + "combined-stmts-persisted-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } + // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - - query = fmt.Sprintf( + it, err = getIterator( + ctx, + ie, queryFormat, - `crdb_internal.statement_statistics`, - "", + "crdb_internal.statement_statistics", + "combined-stmts-with-memory-by-interval", whereClause, + args, aostClause, orderAndLimit) - it, err = ie.QueryIteratorEx(ctx, "combined-stmts-by-interval-with-memory", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { return nil, serverError(ctx, err) } @@ -520,6 +639,34 @@ GROUP BY return statements, nil } +func getIterator( + ctx context.Context, + ie *sql.InternalExecutor, + queryFormat string, + table string, + queryInfo string, + whereClause string, + args []interface{}, + aostClause string, + orderAndLimit string, +) (isql.Rows, error) { + + query := fmt.Sprintf( + queryFormat, + table, + whereClause, + aostClause, + orderAndLimit) + + it, err := ie.QueryIteratorEx(ctx, queryInfo, nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return it, serverError(ctx, err) + } + + return it, nil +} + func collectCombinedTransactions( ctx context.Context, ie *sql.InternalExecutor, @@ -527,6 +674,7 @@ func collectCombinedTransactions( args []interface{}, orderAndLimit string, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) { aostClause := testingKnobs.GetAOSTClause() @@ -539,46 +687,68 @@ SELECT fingerprint_id, max(metadata), crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics -FROM %s%s %s +FROM %s %s GROUP BY app_name, fingerprint_id ) %s %s` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.transaction_statistics_persisted`, - tableSuffix, - whereClause, - aostClause, - orderAndLimit) - - it, err := ie.QueryIteratorEx(ctx, "combined-txns-by-interval", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) - - if err != nil { - return nil, serverError(ctx, err) - } - + var it isql.Rows + var err error defer func() { err = closeIterator(it, err) }() + if activityTableHasAllData { + it, err = getIterator( + ctx, + ie, + queryFormat, + "crdb_internal.transaction_activity", + "combined-txns-activity-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } + + // If there are no results from the activity table, retrieve the data from the persisted table. 
+ if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + it, err = getIterator( + ctx, + ie, + queryFormat, + fmt.Sprintf("crdb_internal.transaction_statistics_persisted%s", tableSuffix), + "combined-txns-persisted-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - - query = fmt.Sprintf( + it, err = getIterator( + ctx, + ie, queryFormat, - `crdb_internal.transaction_statistics`, - "", + "crdb_internal.transaction_statistics", + "combined-txns-with-memory-by-interval", whereClause, + args, aostClause, orderAndLimit) - it, err = ie.QueryIteratorEx(ctx, "combined-txn-by-interval-with-memory", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { return nil, serverError(ctx, err) } @@ -641,6 +811,7 @@ func collectStmtsForTxns( req *serverpb.CombinedStatementsStatsRequest, transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { @@ -653,7 +824,7 @@ SELECT crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, app_name -FROM %s%s %s +FROM %s %s GROUP BY fingerprint_id, transaction_fingerprint_id, @@ -661,24 +832,43 @@ GROUP BY ` const expectedNumDatums = 5 + var it isql.Rows + var err error + var query string - query := fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics_persisted", tableSuffix, whereClause) + if activityTableHasAllData { + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_activity", whereClause) + it, err = ie.QueryIteratorEx(ctx, "stmts-activity-for-txn", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) + } + } - it, err := ie.QueryIteratorEx(ctx, "stmts-for-txn", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause) + it, err = ie.QueryIteratorEx(ctx, "stmts-persisted-for-txn", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) - if err != nil { - return nil, serverError(ctx, err) + if err != nil { + return nil, serverError(ctx, err) + } } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause) - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause) - - it, err = ie.QueryIteratorEx(ctx, "stmts-for-txn-with-memory", nil, + it, err = ie.QueryIteratorEx(ctx, "stmts-with-memory-for-txn", nil, sessiondata.NodeUserSessionDataOverride, query, args...) 
if err != nil { @@ -787,16 +977,39 @@ func getStatementDetails( if !settings.Version.IsActive(ctx, clusterversion.V23_1AddSQLStatsComputedIndexes) { tableSuffix = "_v22_2" } + // Check if the activity tables have data within the selected period. + activityHasData := false + reqStartTime := getTimeFromSeconds(req.Start) + if settings.Version.IsActive(ctx, clusterversion.V23_1AddSystemActivityTables) { + activityHasData, err = activityTablesHaveFullData(ctx, ie, testingKnobs, reqStartTime) + if err != nil { + return nil, serverError(ctx, err) + } + } - statementTotal, err := getTotalStatementDetails(ctx, ie, whereClause, args, tableSuffix) + statementTotal, err := getTotalStatementDetails(ctx, ie, whereClause, args, activityHasData, tableSuffix) if err != nil { return nil, serverError(ctx, err) } - statementStatisticsPerAggregatedTs, err := getStatementDetailsPerAggregatedTs(ctx, ie, whereClause, args, limit, tableSuffix) + statementStatisticsPerAggregatedTs, err := getStatementDetailsPerAggregatedTs( + ctx, + ie, + whereClause, + args, + limit, + activityHasData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) } - statementStatisticsPerPlanHash, err := getStatementDetailsPerPlanHash(ctx, ie, whereClause, args, limit, tableSuffix) + statementStatisticsPerPlanHash, err := getStatementDetailsPerPlanHash( + ctx, + ie, + whereClause, + args, + limit, + activityHasData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) } @@ -900,6 +1113,7 @@ func getTotalStatementDetails( ie *sql.InternalExecutor, whereClause string, args []interface{}, + activityTableHasAllData bool, tableSuffix string, ) (serverpb.StatementDetailsResponse_CollectedStatementSummary, error) { const expectedNumDatums = 4 @@ -909,22 +1123,39 @@ func getTotalStatementDetails( array_agg(app_name) as app_names, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, encode(fingerprint_id, 'hex') as fingerprint_id - FROM %s%s %s + FROM %s %s GROUP BY fingerprint_id LIMIT 1` - query := fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics_persisted`, tableSuffix, whereClause) + var row tree.Datums + var err error + var query string - row, err := ie.QueryRowEx(ctx, "combined-stmts-details-total", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) - if err != nil { - return statement, serverError(ctx, err) + if activityTableHasAllData { + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_activity", whereClause) + row, err = ie.QueryRowEx(ctx, "combined-stmts-activity-details-total", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return statement, serverError(ctx, err) + } + } + // If there are no results from the activity table, retrieve the data from the persisted table. + if row == nil || row.Len() == 0 { + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause) + row, err = ie.QueryRowEx(ctx, "combined-stmts-persisted-details-total", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return statement, serverError(ctx, err) + } } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. 
if row.Len() == 0 { - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause) row, err = ie.QueryRowEx(ctx, "combined-stmts-details-total-with-memory", nil, sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { @@ -981,6 +1212,7 @@ func getStatementDetailsPerAggregatedTs( whereClause string, args []interface{}, limit int64, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementDetailsResponse_CollectedStatementGroupedByAggregatedTs, error) { const expectedNumDatums = 3 @@ -988,35 +1220,58 @@ func getStatementDetailsPerAggregatedTs( aggregated_ts, crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics - FROM %s%s %s + FROM %s %s GROUP BY aggregated_ts ORDER BY aggregated_ts ASC LIMIT $%d` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.statement_statistics_persisted`, - tableSuffix, - whereClause, - len(args)+1) + var it isql.Rows + var err error + var query string + defer func() { + err = closeIterator(it, err) + }() args = append(args, limit) - it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) + if activityTableHasAllData { + query = fmt.Sprintf( + queryFormat, + "crdb_internal.statement_activity", + whereClause, + len(args)) - if err != nil { - return nil, serverError(ctx, err) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-activity-details-by-aggregated-timestamp", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + + if err != nil { + return nil, serverError(ctx, err) + } } - defer func() { - err = closeIterator(it, err) - }() + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause, + len(args)) + + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-persisted-details-by-aggregated-timestamp", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + + if err != nil { + return nil, serverError(ctx, err) + } + } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause, len(args)) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause, len(args)) it, err = ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp-with-memory", nil, sessiondata.NodeUserSessionDataOverride, query, args...) 
if err != nil { @@ -1139,6 +1394,7 @@ func getStatementDetailsPerPlanHash( whereClause string, args []interface{}, limit int64, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementDetailsResponse_CollectedStatementGroupedByPlanHash, error) { expectedNumDatums := 5 @@ -1148,36 +1404,55 @@ func getStatementDetailsPerPlanHash( crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, index_recommendations - FROM %s%s %s + FROM %s %s GROUP BY plan_hash, plan_gist, index_recommendations LIMIT $%d` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.statement_statistics_persisted`, - tableSuffix, - whereClause, - len(args)+1) args = append(args, limit) - - it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-plan-hash", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) - - if err != nil { - return nil, serverError(ctx, err) - } - + var it isql.Rows + var err error + var query string defer func() { err = closeIterator(it, err) }() + if activityTableHasAllData { + query = fmt.Sprintf( + queryFormat, + "crdb_internal.statement_activity", + whereClause, + len(args)) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-activity-details-by-plan-hash", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) + } + } + + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause, + len(args)) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-persisted-details-by-plan-hash", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) + } + } + // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause, len(args)) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause, len(args)) it, err = ie.QueryIteratorEx(ctx, "combined-stmts-details-by-plan-hash-with-memory", nil, sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { diff --git a/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx index b7f4e111e006..972923f03d64 100644 --- a/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx @@ -104,7 +104,13 @@ export function shortStatement( function formatStringArray(databases: string): string { try { // Case where the database is returned as an array in a string form. - return JSON.parse(databases).join(", "); + const d = JSON.parse(databases); + try { + // Case where the database is returned as an array of array in a string form. + return JSON.parse(d).join(", "); + } catch (e) { + return d.join(", "); + } } catch (e) { // Case where the database is a single value as a string. 
return databases; From 4dcfcf1d8c345c7edb7d7b214bac336ac18525d7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 25 Apr 2023 18:03:44 -0700 Subject: [PATCH 4/5] copy: fix vectorized copy for INT2 and INT4 This commit fixes a recently introduced bug where we forgot to implement the special behavior in the vec handler for INT2 and INT4 types which would then could lead to a runtime crash if such a type is used in the schema. Int types are special because the vectorized engine handles them precisely (i.e. it uses int16, int32, and int64 accordingly) whereas the row engine always internally defaults to int64. Release note (bug fix): In alpha and beta 23.1.0 releases CockroachDB could crash when evaluating COPY command in some cases when the schema had INT2 and / or INT4 type, and this is now fixed. --- pkg/col/coldataext/vec_handler.go | 34 ++++++++++++++++++++------- pkg/sql/copy/copy_in_test.go | 6 ++++- pkg/sql/sem/tree/parse_string.go | 25 ++++++++++++++++---- pkg/sql/sem/tree/parse_string_test.go | 2 ++ 4 files changed, 54 insertions(+), 13 deletions(-) diff --git a/pkg/col/coldataext/vec_handler.go b/pkg/col/coldataext/vec_handler.go index 27990ddfa1b0..247af5c56de3 100644 --- a/pkg/col/coldataext/vec_handler.go +++ b/pkg/col/coldataext/vec_handler.go @@ -37,7 +37,14 @@ func MakeVecHandler(vec coldata.Vec) tree.ValueHandler { case types.DecimalFamily: v.decimals = vec.Decimal() case types.IntFamily: - v.ints = vec.Int64() + switch vec.Type().Width() { + case 16: + v.int16s = vec.Int16() + case 32: + v.int32s = vec.Int32() + default: + v.ints = vec.Int64() + } case types.FloatFamily: v.floats = vec.Float64() case types.TimestampTZFamily: @@ -55,13 +62,12 @@ func MakeVecHandler(vec coldata.Vec) tree.ValueHandler { } type vecHandler struct { - nulls *coldata.Nulls - bools coldata.Bools - bytes *coldata.Bytes - decimals coldata.Decimals - // TODO(cucaroach): implement small int types - //int16s coldata.Int16s - //int32s coldata.Int32s + nulls *coldata.Nulls + bools coldata.Bools + bytes *coldata.Bytes + decimals coldata.Decimals + int16s coldata.Int16s + int32s coldata.Int32s ints coldata.Int64s floats coldata.Float64s timestamps coldata.Times @@ -134,6 +140,18 @@ func (v *vecHandler) Float(f float64) { v.row++ } +// Int16 is part of the tree.ValueHandler interface. +func (v *vecHandler) Int16(i int16) { + v.int16s[v.row] = i + v.row++ +} + +// Int32 is part of the tree.ValueHandler interface. +func (v *vecHandler) Int32(i int32) { + v.int32s[v.row] = i + v.row++ +} + // Int is part of the tree.ValueHandler interface. 
func (v *vecHandler) Int(i int64) { v.ints[v.row] = i diff --git a/pkg/sql/copy/copy_in_test.go b/pkg/sql/copy/copy_in_test.go index bc7f5a53807e..524f3a3447aa 100644 --- a/pkg/sql/copy/copy_in_test.go +++ b/pkg/sql/copy/copy_in_test.go @@ -160,6 +160,8 @@ func TestCopyFromRandom(t *testing.T) { cs TEXT COLLATE en_us_u_ks_level2, o BOOL, i INT, + i2 INT2, + i4 INT4, f FLOAT, e DECIMAL, t TIME, @@ -184,7 +186,7 @@ func TestCopyFromRandom(t *testing.T) { t.Fatal(err) } - stmt, err := txn.Prepare(pq.CopyInSchema("d", "t", "id", "n", "cs", "o", "i", "f", "e", "t", "ttz", "ts", "s", "b", "u", "ip", "tz", "geography", "geometry", "box2d")) + stmt, err := txn.Prepare(pq.CopyInSchema("d", "t", "id", "n", "cs", "o", "i", "i2", "i4", "f", "e", "t", "ttz", "ts", "s", "b", "u", "ip", "tz", "geography", "geometry", "box2d")) if err != nil { t.Fatal(err) } @@ -196,6 +198,8 @@ func TestCopyFromRandom(t *testing.T) { types.MakeCollatedString(types.String, "en_us_u_ks_level2"), types.Bool, types.Int, + types.Int2, + types.Int4, types.Float, types.Decimal, types.Time, diff --git a/pkg/sql/sem/tree/parse_string.go b/pkg/sql/sem/tree/parse_string.go index b11076d54269..a1507d0fa66c 100644 --- a/pkg/sql/sem/tree/parse_string.go +++ b/pkg/sql/sem/tree/parse_string.go @@ -170,6 +170,8 @@ type ValueHandler interface { // Decimal returns a pointer into the vec for in place construction. Decimal() *apd.Decimal Float(f float64) + Int16(i int16) + Int32(i int32) Int(i int64) Duration(d duration.Duration) JSON(j json.JSON) @@ -219,10 +221,25 @@ func ParseAndRequireStringHandler( } case types.IntFamily: var i int64 - if i, err = strconv.ParseInt(s, 0, 64); err == nil { - vh.Int(i) - } else { - err = MakeParseError(s, types.Int, err) + switch t.Width() { + case 16: + if i, err = strconv.ParseInt(s, 0, 16); err == nil { + vh.Int16(int16(i)) + } else { + err = MakeParseError(s, t, err) + } + case 32: + if i, err = strconv.ParseInt(s, 0, 32); err == nil { + vh.Int32(int32(i)) + } else { + err = MakeParseError(s, t, err) + } + default: + if i, err = strconv.ParseInt(s, 0, 64); err == nil { + vh.Int(i) + } else { + err = MakeParseError(s, t, err) + } } case types.JsonFamily: var j json.JSON diff --git a/pkg/sql/sem/tree/parse_string_test.go b/pkg/sql/sem/tree/parse_string_test.go index f89c26a106ad..e351319b8913 100644 --- a/pkg/sql/sem/tree/parse_string_test.go +++ b/pkg/sql/sem/tree/parse_string_test.go @@ -151,6 +151,8 @@ func (a *anyHandler) Bool(b bool) { a.val = b } func (a *anyHandler) Bytes(b []byte) { a.val = b } func (a *anyHandler) Decimal() *apd.Decimal { return &a.dec } func (a *anyHandler) Float(f float64) { a.val = f } +func (a *anyHandler) Int16(i int16) { a.val = i } +func (a *anyHandler) Int32(i int32) { a.val = i } func (a *anyHandler) Int(i int64) { a.val = i } func (a *anyHandler) Duration(d duration.Duration) { a.val = d } func (a *anyHandler) JSON(j json.JSON) { a.val = j } From f13f93a43af5e0473369e3b4fc461db8ad849408 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Mon, 24 Apr 2023 14:26:42 -0700 Subject: [PATCH 5/5] workload: remove use of pgx v5's tracelogger In benchmarking workloads, pgx v5's tracelog incurs too much CPU overhead and GC pressure and wasn't being used. 
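If per-statement logging is ever needed again for a one-off investigation, it can be attached at the call site rather than carried by every workload run. A rough sketch using the same pgx v5 tracelog API that this patch removes (the connection URL and logger body are placeholders, not part of this change):

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/jackc/pgx/v5/tracelog"
)

// stderrLogger adapts the standard library logger to tracelog.Logger, much as
// the deleted pgxLogger type adapted CockroachDB's logger.
type stderrLogger struct{}

func (stderrLogger) Log(
	ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{},
) {
	log.Printf("pgx [%s]: %s %v", level.String(), msg, data)
}

func main() {
	ctx := context.Background()
	// Placeholder URL; point this at the cluster under test.
	cfg, err := pgxpool.ParseConfig("postgres://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	cfg.ConnConfig.Tracer = &tracelog.TraceLog{
		Logger:   stderrLogger{},
		LogLevel: tracelog.LogLevelWarn,
	}
	pool, err := pgxpool.NewWithConfig(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()
	if err := pool.Ping(ctx); err != nil {
		log.Fatal(err)
	}
}
```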
Release note: None Epic: none --- pkg/workload/BUILD.bazel | 1 - pkg/workload/pgx_helpers.go | 41 ------------------------------------- 2 files changed, 42 deletions(-) diff --git a/pkg/workload/BUILD.bazel b/pkg/workload/BUILD.bazel index 9f498cbf37b0..2061f1ff4c5e 100644 --- a/pkg/workload/BUILD.bazel +++ b/pkg/workload/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "@com_github_jackc_pgx_v5//:pgx", "@com_github_jackc_pgx_v5//pgconn", "@com_github_jackc_pgx_v5//pgxpool", - "@com_github_jackc_pgx_v5//tracelog", "@com_github_lib_pq//:pq", "@com_github_spf13_pflag//:pflag", "@org_golang_x_sync//errgroup", diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index 0d883dbaf0d7..16d04725e7ee 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/errors" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/jackc/pgx/v5/tracelog" "golang.org/x/sync/errgroup" ) @@ -87,9 +86,6 @@ type MultiConnPoolCfg struct { // max number of connections per pool. A value less than 0 skips the // connection warmup phase. WarmupConns int - - // LogLevel specifies the log level (default: warn) - LogLevel tracelog.LogLevel } // NewMultiConnPoolCfgFromFlags constructs a new MultiConnPoolCfg object based @@ -126,35 +122,6 @@ var stringToMethod = map[string]pgx.QueryExecMode{ "simple": pgx.QueryExecModeSimpleProtocol, } -// pgxLogger implements the pgx.Logger interface. -type pgxLogger struct{} - -var _ tracelog.Logger = pgxLogger{} - -// Log implements the pgx.Logger interface. -func (p pgxLogger) Log( - ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{}, -) { - if ctx.Err() != nil { - // Don't log anything from pgx if the context was canceled by the workload - // runner. It would result in spam at the end of every workload. - return - } - if strings.Contains(msg, "restart transaction") { - // Our workloads have a lot of contention, so "restart transaction" messages - // are expected and noisy. - return - } - // data may contain error with "restart transaction" -- skip those as well. - if data != nil { - ev := data["err"] - if err, ok := ev.(error); ok && strings.Contains(err.Error(), "restart transaction") { - return - } - } - log.VInfof(ctx, log.Level(level), "pgx logger [%s]: %s logParams=%v", level.String(), msg, data) -} - // NewMultiConnPool creates a new MultiConnPool. // // Each URL gets one or more pools, and each pool has at most MaxConnsPerPool @@ -168,10 +135,6 @@ func NewMultiConnPool( m := &MultiConnPool{} m.mu.preparedStatements = map[string]string{} - logLevel := tracelog.LogLevelWarn - if cfg.LogLevel != 0 { - logLevel = cfg.LogLevel - } maxConnLifetime := 300 * time.Second if cfg.MaxConnLifetime > 0 { maxConnLifetime = cfg.MaxConnLifetime @@ -241,10 +204,6 @@ func NewMultiConnPool( connCfg := poolCfg.ConnConfig connCfg.DefaultQueryExecMode = queryMode - connCfg.Tracer = &tracelog.TraceLog{ - Logger: &pgxLogger{}, - LogLevel: logLevel, - } p, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return nil, err