diff --git a/pkg/col/coldataext/vec_handler.go b/pkg/col/coldataext/vec_handler.go index 27990ddfa1b0..247af5c56de3 100644 --- a/pkg/col/coldataext/vec_handler.go +++ b/pkg/col/coldataext/vec_handler.go @@ -37,7 +37,14 @@ func MakeVecHandler(vec coldata.Vec) tree.ValueHandler { case types.DecimalFamily: v.decimals = vec.Decimal() case types.IntFamily: - v.ints = vec.Int64() + switch vec.Type().Width() { + case 16: + v.int16s = vec.Int16() + case 32: + v.int32s = vec.Int32() + default: + v.ints = vec.Int64() + } case types.FloatFamily: v.floats = vec.Float64() case types.TimestampTZFamily: @@ -55,13 +62,12 @@ func MakeVecHandler(vec coldata.Vec) tree.ValueHandler { } type vecHandler struct { - nulls *coldata.Nulls - bools coldata.Bools - bytes *coldata.Bytes - decimals coldata.Decimals - // TODO(cucaroach): implement small int types - //int16s coldata.Int16s - //int32s coldata.Int32s + nulls *coldata.Nulls + bools coldata.Bools + bytes *coldata.Bytes + decimals coldata.Decimals + int16s coldata.Int16s + int32s coldata.Int32s ints coldata.Int64s floats coldata.Float64s timestamps coldata.Times @@ -134,6 +140,18 @@ func (v *vecHandler) Float(f float64) { v.row++ } +// Int16 is part of the tree.ValueHandler interface. +func (v *vecHandler) Int16(i int16) { + v.int16s[v.row] = i + v.row++ +} + +// Int32 is part of the tree.ValueHandler interface. +func (v *vecHandler) Int32(i int32) { + v.int32s[v.row] = i + v.row++ +} + // Int is part of the tree.ValueHandler interface. func (v *vecHandler) Int(i int64) { v.ints[v.row] = i diff --git a/pkg/roachpb/version.go b/pkg/roachpb/version.go index 782b4ca3523e..6fb5baaf773c 100644 --- a/pkg/roachpb/version.go +++ b/pkg/roachpb/version.go @@ -49,9 +49,9 @@ func (v Version) LessEq(otherV Version) bool { return v.Equal(otherV) || v.Less(otherV) } -// AtLeast returns true if the receiver is greater than or requal to the parameter. +// AtLeast returns true if the receiver is greater than or equal to the parameter. func (v Version) AtLeast(otherV Version) bool { - return !otherV.Less(v) + return !v.Less(otherV) } // String implements the fmt.Stringer interface. diff --git a/pkg/roachpb/version_test.go b/pkg/roachpb/version_test.go index 29328e88b70b..162443cb5971 100644 --- a/pkg/roachpb/version_test.go +++ b/pkg/roachpb/version_test.go @@ -16,7 +16,7 @@ import ( "github.com/kr/pretty" ) -func TestVersionLess(t *testing.T) { +func TestVersionCmp(t *testing.T) { v := func(major, minor, patch, internal int32) Version { return Version{ Major: major, @@ -51,6 +51,12 @@ func TestVersionLess(t *testing.T) { if a, e := test.v1.Less(test.v2), test.less; a != e { t.Errorf("expected %s < %s? %t; got %t", pretty.Sprint(test.v1), pretty.Sprint(test.v2), e, a) } + if a, e := test.v1.Equal(test.v2), test.v1 == test.v2; a != e { + t.Errorf("expected %s = %s? %t; got %t", pretty.Sprint(test.v1), pretty.Sprint(test.v2), e, a) + } + if a, e := test.v1.AtLeast(test.v2), test.v1 == test.v2 || !test.less; a != e { + t.Errorf("expected %s >= %s? %t; got %t", pretty.Sprint(test.v1), pretty.Sprint(test.v2), e, a) + } }) } } diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index b8652baca123..0e394d7162d4 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -78,6 +79,7 @@ func getCombinedStatementStats( settings *cluster.Settings, testingKnobs *sqlstats.TestingKnobs, ) (*serverpb.StatementsResponse, error) { + var err error showInternal := SQLStatsShowInternal.Get(&settings.SV) whereClause, orderAndLimit, args := getCombinedStatementsQueryClausesAndArgs( req, testingKnobs, showInternal, settings) @@ -87,13 +89,29 @@ func getCombinedStatementStats( if !settings.Version.IsActive(ctx, clusterversion.V23_1AddSQLStatsComputedIndexes) { tableSuffix = "_v22_2" } + // Check if the activity tables contains all the data required for the selected period from the request. + activityHasAllData := false + reqStartTime := getTimeFromSeconds(req.Start) + if settings.Version.IsActive(ctx, clusterversion.V23_1AddSystemActivityTables) { + activityHasAllData, err = activityTablesHaveFullData(ctx, ie, testingKnobs, reqStartTime) + if err != nil { + log.Errorf(ctx, "Error on activityTablesHaveFullData: %s", err) + } + } var statements []serverpb.StatementsResponse_CollectedStatementStatistics var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics - var err error if req.FetchMode == nil || req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly { - transactions, err = collectCombinedTransactions(ctx, ie, whereClause, args, orderAndLimit, testingKnobs, tableSuffix) + transactions, err = collectCombinedTransactions( + ctx, + ie, + whereClause, + args, + orderAndLimit, + testingKnobs, + activityHasAllData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) } @@ -102,16 +120,37 @@ func getCombinedStatementStats( if req.FetchMode != nil && req.FetchMode.StatsType == serverpb.CombinedStatementsStatsRequest_TxnStatsOnly { // If we're fetching for txns, the client still expects statement stats for // stmts in the txns response. - statements, err = collectStmtsForTxns(ctx, ie, req, transactions, testingKnobs, tableSuffix) + statements, err = collectStmtsForTxns( + ctx, + ie, + req, + transactions, + testingKnobs, + activityHasAllData, + tableSuffix) } else { - statements, err = collectCombinedStatements(ctx, ie, whereClause, args, orderAndLimit, testingKnobs, tableSuffix) + statements, err = collectCombinedStatements( + ctx, + ie, + whereClause, + args, + orderAndLimit, + testingKnobs, + activityHasAllData, + tableSuffix) } if err != nil { return nil, serverError(ctx, err) } - stmtsRunTime, txnsRunTime, err := getTotalRuntimeSecs(ctx, req, ie, testingKnobs, tableSuffix) + stmtsRunTime, txnsRunTime, err := getTotalRuntimeSecs( + ctx, + req, + ie, + testingKnobs, + activityHasAllData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) @@ -129,11 +168,62 @@ func getCombinedStatementStats( return response, nil } +func activityTablesHaveFullData( + ctx context.Context, + ie *sql.InternalExecutor, + testingKnobs *sqlstats.TestingKnobs, + reqStartTime *time.Time, +) (result bool, err error) { + var auxDate time.Time + dateFormat := "2006-01-02 15:04:05.00" + auxDate, err = time.Parse(dateFormat, timeutil.Now().String()) + + queryWithPlaceholders := ` +SELECT + COALESCE(min(aggregated_ts), '%s') +FROM crdb_internal.statement_activity +%s +` + + it, err := ie.QueryIteratorEx( + ctx, + "activity-min-ts", + nil, + sessiondata.NodeUserSessionDataOverride, + fmt.Sprintf(queryWithPlaceholders, auxDate.Format(dateFormat), testingKnobs.GetAOSTClause())) + + if err != nil { + return false, err + } + ok, err := it.Next(ctx) + if err != nil { + return false, err + } + if !ok { + return false, errors.New("expected one row but got none on activityTablesHaveFullData") + } + + var row tree.Datums + if row = it.Cur(); row == nil { + return false, errors.New("unexpected null row on activityTablesHaveFullData") + } + + defer func() { + err = closeIterator(it, err) + }() + + minAggregatedTs := tree.MustBeDTimestampTZ(row[0]).Time + hasData := !minAggregatedTs.Equal(auxDate) && (reqStartTime.After(minAggregatedTs) || reqStartTime.Equal(minAggregatedTs)) + + return hasData, nil +} + func getTotalRuntimeSecs( ctx context.Context, req *serverpb.CombinedStatementsStatsRequest, ie *sql.InternalExecutor, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) (stmtsRuntime float32, txnsRuntime float32, err error) { var buffer strings.Builder @@ -164,7 +254,7 @@ COALESCE( (statistics -> 'statistics' ->> 'cnt')::FLOAT ) , 0) -FROM crdb_internal.%s_statistics%s%s +FROM %s %s ` @@ -174,7 +264,7 @@ FROM crdb_internal.%s_statistics%s%s fmt.Sprintf(`%s-total-runtime`, table), nil, sessiondata.NodeUserSessionDataOverride, - fmt.Sprintf(queryWithPlaceholders, table, `_persisted`, tableSuffix, whereClause), + fmt.Sprintf(queryWithPlaceholders, table, whereClause), args...) if err != nil { @@ -193,56 +283,60 @@ FROM crdb_internal.%s_statistics%s%s return 0, errors.New("unexpected null row on getTotalRuntimeSecs") } - // If the total runtime is 0 there were no results from the persisted table, - // so we retrieve the data from the combined view with data in-memory. - if tree.MustBeDFloat(row[0]) == 0 { - err = closeIterator(it, err) - if err != nil { - return 0, err - } - it, err = ie.QueryIteratorEx( - ctx, - fmt.Sprintf(`%s-total-runtime-with-memory`, table), - nil, - sessiondata.NodeUserSessionDataOverride, - fmt.Sprintf(queryWithPlaceholders, table, ``, ``, whereClause), - args...) - - if err != nil { - return 0, err - } - ok, err = it.Next(ctx) - if err != nil { - return 0, err - } - if !ok { - return 0, errors.New("expected one row but got none on getTotalRuntimeSecs") - } - - if row = it.Cur(); row == nil { - return 0, errors.New("unexpected null row on getTotalRuntimeSecs") - } - } - defer func() { err = closeIterator(it, err) }() return float32(tree.MustBeDFloat(row[0])), nil - } + stmtsRuntime = 0 if req.FetchMode == nil || req.FetchMode.StatsType != serverpb.CombinedStatementsStatsRequest_TxnStatsOnly { - stmtsRuntime, err = getRuntime("statement") - if err != nil { - return 0, 0, err + if activityTableHasAllData { + stmtsRuntime, err = getRuntime("crdb_internal.statement_activity") + if err != nil { + return 0, 0, err + } + } + // If there are no results from the activity table, retrieve the data from the persisted table. + if stmtsRuntime == 0 { + stmtsRuntime, err = getRuntime(fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix)) + if err != nil { + return 0, 0, err + } + } + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if stmtsRuntime == 0 { + stmtsRuntime, err = getRuntime("crdb_internal.statement_statistics") + if err != nil { + return 0, 0, err + } } } + txnsRuntime = 0 if req.FetchMode == nil || req.FetchMode.StatsType != serverpb.CombinedStatementsStatsRequest_StmtStatsOnly { - txnsRuntime, err = getRuntime("transaction") - if err != nil { - return 0, 0, err + if activityTableHasAllData { + txnsRuntime, err = getRuntime("crdb_internal.transaction_activity") + if err != nil { + return 0, 0, err + } + } + // If there are no results from the activity table, retrieve the data from the persisted table. + if txnsRuntime == 0 { + txnsRuntime, err = getRuntime(fmt.Sprintf("crdb_internal.transaction_statistics_persisted%s", tableSuffix)) + if err != nil { + return 0, 0, err + } + } + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if txnsRuntime == 0 { + txnsRuntime, err = getRuntime("crdb_internal.transaction_statistics") + if err != nil { + return 0, 0, err + } } } @@ -398,6 +492,7 @@ func collectCombinedStatements( args []interface{}, orderAndLimit string, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() @@ -411,45 +506,69 @@ SELECT max(aggregated_ts) as aggregated_ts, crdb_internal.merge_stats_metadata(array_agg(metadata)) as metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics -FROM %s%s %s +FROM %s %s GROUP BY fingerprint_id, app_name ) %s %s` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.statement_statistics_persisted`, - tableSuffix, - whereClause, - aostClause, - orderAndLimit) - it, err := ie.QueryIteratorEx(ctx, "combined-stmts-by-interval", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) - - if err != nil { - return nil, serverError(ctx, err) - } - + var it isql.Rows + var err error defer func() { err = closeIterator(it, err) }() + if activityTableHasAllData { + it, err = getIterator( + ctx, + ie, + queryFormat, + "crdb_internal.statement_activity", + "combined-stmts-activity-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } + + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + it, err = getIterator( + ctx, + ie, + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + "combined-stmts-persisted-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } + // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - - query = fmt.Sprintf( + it, err = getIterator( + ctx, + ie, queryFormat, - `crdb_internal.statement_statistics`, - "", + "crdb_internal.statement_statistics", + "combined-stmts-with-memory-by-interval", whereClause, + args, aostClause, orderAndLimit) - it, err = ie.QueryIteratorEx(ctx, "combined-stmts-by-interval-with-memory", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { return nil, serverError(ctx, err) } @@ -520,6 +639,34 @@ GROUP BY return statements, nil } +func getIterator( + ctx context.Context, + ie *sql.InternalExecutor, + queryFormat string, + table string, + queryInfo string, + whereClause string, + args []interface{}, + aostClause string, + orderAndLimit string, +) (isql.Rows, error) { + + query := fmt.Sprintf( + queryFormat, + table, + whereClause, + aostClause, + orderAndLimit) + + it, err := ie.QueryIteratorEx(ctx, queryInfo, nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return it, serverError(ctx, err) + } + + return it, nil +} + func collectCombinedTransactions( ctx context.Context, ie *sql.InternalExecutor, @@ -527,6 +674,7 @@ func collectCombinedTransactions( args []interface{}, orderAndLimit string, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) { aostClause := testingKnobs.GetAOSTClause() @@ -539,46 +687,68 @@ SELECT fingerprint_id, max(metadata), crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics -FROM %s%s %s +FROM %s %s GROUP BY app_name, fingerprint_id ) %s %s` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.transaction_statistics_persisted`, - tableSuffix, - whereClause, - aostClause, - orderAndLimit) - - it, err := ie.QueryIteratorEx(ctx, "combined-txns-by-interval", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) - - if err != nil { - return nil, serverError(ctx, err) - } - + var it isql.Rows + var err error defer func() { err = closeIterator(it, err) }() + if activityTableHasAllData { + it, err = getIterator( + ctx, + ie, + queryFormat, + "crdb_internal.transaction_activity", + "combined-txns-activity-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } + + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + it, err = getIterator( + ctx, + ie, + queryFormat, + fmt.Sprintf("crdb_internal.transaction_statistics_persisted%s", tableSuffix), + "combined-txns-persisted-by-interval", + whereClause, + args, + aostClause, + orderAndLimit) + if err != nil { + return nil, serverError(ctx, err) + } + } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - - query = fmt.Sprintf( + it, err = getIterator( + ctx, + ie, queryFormat, - `crdb_internal.transaction_statistics`, - "", + "crdb_internal.transaction_statistics", + "combined-txns-with-memory-by-interval", whereClause, + args, aostClause, orderAndLimit) - it, err = ie.QueryIteratorEx(ctx, "combined-txn-by-interval-with-memory", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { return nil, serverError(ctx, err) } @@ -641,6 +811,7 @@ func collectStmtsForTxns( req *serverpb.CombinedStatementsStatsRequest, transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, testingKnobs *sqlstats.TestingKnobs, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { @@ -653,7 +824,7 @@ SELECT crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, app_name -FROM %s%s %s +FROM %s %s GROUP BY fingerprint_id, transaction_fingerprint_id, @@ -661,24 +832,43 @@ GROUP BY ` const expectedNumDatums = 5 + var it isql.Rows + var err error + var query string - query := fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics_persisted", tableSuffix, whereClause) + if activityTableHasAllData { + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_activity", whereClause) + it, err = ie.QueryIteratorEx(ctx, "stmts-activity-for-txn", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) + } + } - it, err := ie.QueryIteratorEx(ctx, "stmts-for-txn", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause) + it, err = ie.QueryIteratorEx(ctx, "stmts-persisted-for-txn", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) - if err != nil { - return nil, serverError(ctx, err) + if err != nil { + return nil, serverError(ctx, err) + } } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause) - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause) - - it, err = ie.QueryIteratorEx(ctx, "stmts-for-txn-with-memory", nil, + it, err = ie.QueryIteratorEx(ctx, "stmts-with-memory-for-txn", nil, sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { @@ -787,16 +977,39 @@ func getStatementDetails( if !settings.Version.IsActive(ctx, clusterversion.V23_1AddSQLStatsComputedIndexes) { tableSuffix = "_v22_2" } + // Check if the activity tables have data within the selected period. + activityHasData := false + reqStartTime := getTimeFromSeconds(req.Start) + if settings.Version.IsActive(ctx, clusterversion.V23_1AddSystemActivityTables) { + activityHasData, err = activityTablesHaveFullData(ctx, ie, testingKnobs, reqStartTime) + if err != nil { + return nil, serverError(ctx, err) + } + } - statementTotal, err := getTotalStatementDetails(ctx, ie, whereClause, args, tableSuffix) + statementTotal, err := getTotalStatementDetails(ctx, ie, whereClause, args, activityHasData, tableSuffix) if err != nil { return nil, serverError(ctx, err) } - statementStatisticsPerAggregatedTs, err := getStatementDetailsPerAggregatedTs(ctx, ie, whereClause, args, limit, tableSuffix) + statementStatisticsPerAggregatedTs, err := getStatementDetailsPerAggregatedTs( + ctx, + ie, + whereClause, + args, + limit, + activityHasData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) } - statementStatisticsPerPlanHash, err := getStatementDetailsPerPlanHash(ctx, ie, whereClause, args, limit, tableSuffix) + statementStatisticsPerPlanHash, err := getStatementDetailsPerPlanHash( + ctx, + ie, + whereClause, + args, + limit, + activityHasData, + tableSuffix) if err != nil { return nil, serverError(ctx, err) } @@ -900,6 +1113,7 @@ func getTotalStatementDetails( ie *sql.InternalExecutor, whereClause string, args []interface{}, + activityTableHasAllData bool, tableSuffix string, ) (serverpb.StatementDetailsResponse_CollectedStatementSummary, error) { const expectedNumDatums = 4 @@ -909,22 +1123,39 @@ func getTotalStatementDetails( array_agg(app_name) as app_names, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, encode(fingerprint_id, 'hex') as fingerprint_id - FROM %s%s %s + FROM %s %s GROUP BY fingerprint_id LIMIT 1` - query := fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics_persisted`, tableSuffix, whereClause) + var row tree.Datums + var err error + var query string - row, err := ie.QueryRowEx(ctx, "combined-stmts-details-total", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) - if err != nil { - return statement, serverError(ctx, err) + if activityTableHasAllData { + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_activity", whereClause) + row, err = ie.QueryRowEx(ctx, "combined-stmts-activity-details-total", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return statement, serverError(ctx, err) + } + } + // If there are no results from the activity table, retrieve the data from the persisted table. + if row == nil || row.Len() == 0 { + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause) + row, err = ie.QueryRowEx(ctx, "combined-stmts-persisted-details-total", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return statement, serverError(ctx, err) + } } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if row.Len() == 0 { - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause) row, err = ie.QueryRowEx(ctx, "combined-stmts-details-total-with-memory", nil, sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { @@ -981,6 +1212,7 @@ func getStatementDetailsPerAggregatedTs( whereClause string, args []interface{}, limit int64, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementDetailsResponse_CollectedStatementGroupedByAggregatedTs, error) { const expectedNumDatums = 3 @@ -988,35 +1220,58 @@ func getStatementDetailsPerAggregatedTs( aggregated_ts, crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics - FROM %s%s %s + FROM %s %s GROUP BY aggregated_ts ORDER BY aggregated_ts ASC LIMIT $%d` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.statement_statistics_persisted`, - tableSuffix, - whereClause, - len(args)+1) + var it isql.Rows + var err error + var query string + defer func() { + err = closeIterator(it, err) + }() args = append(args, limit) - it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) + if activityTableHasAllData { + query = fmt.Sprintf( + queryFormat, + "crdb_internal.statement_activity", + whereClause, + len(args)) - if err != nil { - return nil, serverError(ctx, err) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-activity-details-by-aggregated-timestamp", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + + if err != nil { + return nil, serverError(ctx, err) + } } - defer func() { - err = closeIterator(it, err) - }() + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause, + len(args)) + + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-persisted-details-by-aggregated-timestamp", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + + if err != nil { + return nil, serverError(ctx, err) + } + } // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause, len(args)) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause, len(args)) it, err = ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp-with-memory", nil, sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { @@ -1139,6 +1394,7 @@ func getStatementDetailsPerPlanHash( whereClause string, args []interface{}, limit int64, + activityTableHasAllData bool, tableSuffix string, ) ([]serverpb.StatementDetailsResponse_CollectedStatementGroupedByPlanHash, error) { expectedNumDatums := 5 @@ -1148,36 +1404,55 @@ func getStatementDetailsPerPlanHash( crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, index_recommendations - FROM %s%s %s + FROM %s %s GROUP BY plan_hash, plan_gist, index_recommendations LIMIT $%d` - query := fmt.Sprintf( - queryFormat, - `crdb_internal.statement_statistics_persisted`, - tableSuffix, - whereClause, - len(args)+1) args = append(args, limit) - - it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-plan-hash", nil, - sessiondata.NodeUserSessionDataOverride, query, args...) - - if err != nil { - return nil, serverError(ctx, err) - } - + var it isql.Rows + var err error + var query string defer func() { err = closeIterator(it, err) }() + if activityTableHasAllData { + query = fmt.Sprintf( + queryFormat, + "crdb_internal.statement_activity", + whereClause, + len(args)) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-activity-details-by-plan-hash", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) + } + } + + // If there are no results from the activity table, retrieve the data from the persisted table. + if it == nil || !it.HasResults() { + if it != nil { + err = closeIterator(it, err) + } + query = fmt.Sprintf( + queryFormat, + fmt.Sprintf("crdb_internal.statement_statistics_persisted%s", tableSuffix), + whereClause, + len(args)) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-persisted-details-by-plan-hash", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) + } + } + // If there are no results from the persisted table, retrieve the data from the combined view // with data in-memory. if !it.HasResults() { err = closeIterator(it, err) - query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, "", whereClause, len(args)) + query = fmt.Sprintf(queryFormat, "crdb_internal.statement_statistics", whereClause, len(args)) it, err = ie.QueryIteratorEx(ctx, "combined-stmts-details-by-plan-hash-with-memory", nil, sessiondata.NodeUserSessionDataOverride, query, args...) if err != nil { diff --git a/pkg/sql/copy/copy_in_test.go b/pkg/sql/copy/copy_in_test.go index bc7f5a53807e..524f3a3447aa 100644 --- a/pkg/sql/copy/copy_in_test.go +++ b/pkg/sql/copy/copy_in_test.go @@ -160,6 +160,8 @@ func TestCopyFromRandom(t *testing.T) { cs TEXT COLLATE en_us_u_ks_level2, o BOOL, i INT, + i2 INT2, + i4 INT4, f FLOAT, e DECIMAL, t TIME, @@ -184,7 +186,7 @@ func TestCopyFromRandom(t *testing.T) { t.Fatal(err) } - stmt, err := txn.Prepare(pq.CopyInSchema("d", "t", "id", "n", "cs", "o", "i", "f", "e", "t", "ttz", "ts", "s", "b", "u", "ip", "tz", "geography", "geometry", "box2d")) + stmt, err := txn.Prepare(pq.CopyInSchema("d", "t", "id", "n", "cs", "o", "i", "i2", "i4", "f", "e", "t", "ttz", "ts", "s", "b", "u", "ip", "tz", "geography", "geometry", "box2d")) if err != nil { t.Fatal(err) } @@ -196,6 +198,8 @@ func TestCopyFromRandom(t *testing.T) { types.MakeCollatedString(types.String, "en_us_u_ks_level2"), types.Bool, types.Int, + types.Int2, + types.Int4, types.Float, types.Decimal, types.Time, diff --git a/pkg/sql/sem/tree/parse_string.go b/pkg/sql/sem/tree/parse_string.go index b11076d54269..a1507d0fa66c 100644 --- a/pkg/sql/sem/tree/parse_string.go +++ b/pkg/sql/sem/tree/parse_string.go @@ -170,6 +170,8 @@ type ValueHandler interface { // Decimal returns a pointer into the vec for in place construction. Decimal() *apd.Decimal Float(f float64) + Int16(i int16) + Int32(i int32) Int(i int64) Duration(d duration.Duration) JSON(j json.JSON) @@ -219,10 +221,25 @@ func ParseAndRequireStringHandler( } case types.IntFamily: var i int64 - if i, err = strconv.ParseInt(s, 0, 64); err == nil { - vh.Int(i) - } else { - err = MakeParseError(s, types.Int, err) + switch t.Width() { + case 16: + if i, err = strconv.ParseInt(s, 0, 16); err == nil { + vh.Int16(int16(i)) + } else { + err = MakeParseError(s, t, err) + } + case 32: + if i, err = strconv.ParseInt(s, 0, 32); err == nil { + vh.Int32(int32(i)) + } else { + err = MakeParseError(s, t, err) + } + default: + if i, err = strconv.ParseInt(s, 0, 64); err == nil { + vh.Int(i) + } else { + err = MakeParseError(s, t, err) + } } case types.JsonFamily: var j json.JSON diff --git a/pkg/sql/sem/tree/parse_string_test.go b/pkg/sql/sem/tree/parse_string_test.go index f89c26a106ad..e351319b8913 100644 --- a/pkg/sql/sem/tree/parse_string_test.go +++ b/pkg/sql/sem/tree/parse_string_test.go @@ -151,6 +151,8 @@ func (a *anyHandler) Bool(b bool) { a.val = b } func (a *anyHandler) Bytes(b []byte) { a.val = b } func (a *anyHandler) Decimal() *apd.Decimal { return &a.dec } func (a *anyHandler) Float(f float64) { a.val = f } +func (a *anyHandler) Int16(i int16) { a.val = i } +func (a *anyHandler) Int32(i int32) { a.val = i } func (a *anyHandler) Int(i int64) { a.val = i } func (a *anyHandler) Duration(d duration.Duration) { a.val = d } func (a *anyHandler) JSON(j json.JSON) { a.val = j } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 3864dda99e7a..f3f3243d410a 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -843,14 +843,10 @@ func (p *Pebble) SetStoreID(ctx context.Context, storeID int32) error { if p == nil { return nil } - if p.storeIDPebbleLog != nil { - p.storeIDPebbleLog.Set(ctx, storeID) - } + p.storeIDPebbleLog.Set(ctx, storeID) // Note that SetCreatorID only does something if shared storage is configured // in the pebble options. The version gate protects against accidentally // setting the creator ID on an older store. - // TODO(radu): we don't yet have a complete story about how we will transition - // an existing store to use shared storage. if storeID != base.TempStoreID && p.minVersion.AtLeast(clusterversion.ByKey(clusterversion.V23_1SetPebbleCreatorID)) { if err := p.db.SetCreatorID(uint64(storeID)); err != nil { return err @@ -2004,6 +2000,15 @@ func (p *Pebble) SetMinVersion(version roachpb.Version) error { return err } + // Set the shared object creator ID if the version is high enough. See SetStoreID(). + if version.AtLeast(clusterversion.ByKey(clusterversion.V23_1SetPebbleCreatorID)) { + if storeID := p.storeIDPebbleLog.Get(); storeID != 0 && storeID != base.TempStoreID { + if err := p.db.SetCreatorID(uint64(storeID)); err != nil { + return err + } + } + } + // Pebble has a concept of format major versions, similar to cluster // versions. Backwards incompatible changes to Pebble's on-disk // format are gated behind new format major versions. Bumping the diff --git a/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx b/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx index c5189acd7577..a8cf064ea132 100644 --- a/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/statementsTable/statementsTable.tsx @@ -105,7 +105,13 @@ export function shortStatement( function formatStringArray(databases: string): string { try { // Case where the database is returned as an array in a string form. - return JSON.parse(databases).join(", "); + const d = JSON.parse(databases); + try { + // Case where the database is returned as an array of array in a string form. + return JSON.parse(d).join(", "); + } catch (e) { + return d.join(", "); + } } catch (e) { // Case where the database is a single value as a string. return databases; diff --git a/pkg/workload/BUILD.bazel b/pkg/workload/BUILD.bazel index 9f498cbf37b0..2061f1ff4c5e 100644 --- a/pkg/workload/BUILD.bazel +++ b/pkg/workload/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "@com_github_jackc_pgx_v5//:pgx", "@com_github_jackc_pgx_v5//pgconn", "@com_github_jackc_pgx_v5//pgxpool", - "@com_github_jackc_pgx_v5//tracelog", "@com_github_lib_pq//:pq", "@com_github_spf13_pflag//:pflag", "@org_golang_x_sync//errgroup", diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index cd99512f97f4..16d04725e7ee 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/errors" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/jackc/pgx/v5/tracelog" "golang.org/x/sync/errgroup" ) @@ -36,8 +35,11 @@ type MultiConnPool struct { // preparedStatements is a map from name to SQL. The statements in the map // are prepared whenever a new connection is acquired from the pool. preparedStatements map[string]string - method pgx.QueryExecMode } + + // NOTE(seanc@): method is on the hot-path, therefore make all reads to the + // query exec mode a dirty read. + method pgx.QueryExecMode } // MultiConnPoolCfg encapsulates the knobs passed to NewMultiConnPool. @@ -84,9 +86,6 @@ type MultiConnPoolCfg struct { // max number of connections per pool. A value less than 0 skips the // connection warmup phase. WarmupConns int - - // LogLevel specifies the log level (default: warn) - LogLevel tracelog.LogLevel } // NewMultiConnPoolCfgFromFlags constructs a new MultiConnPoolCfg object based @@ -123,35 +122,6 @@ var stringToMethod = map[string]pgx.QueryExecMode{ "simple": pgx.QueryExecModeSimpleProtocol, } -// pgxLogger implements the pgx.Logger interface. -type pgxLogger struct{} - -var _ tracelog.Logger = pgxLogger{} - -// Log implements the pgx.Logger interface. -func (p pgxLogger) Log( - ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{}, -) { - if ctx.Err() != nil { - // Don't log anything from pgx if the context was canceled by the workload - // runner. It would result in spam at the end of every workload. - return - } - if strings.Contains(msg, "restart transaction") { - // Our workloads have a lot of contention, so "restart transaction" messages - // are expected and noisy. - return - } - // data may contain error with "restart transaction" -- skip those as well. - if data != nil { - ev := data["err"] - if err, ok := ev.(error); ok && strings.Contains(err.Error(), "restart transaction") { - return - } - } - log.VInfof(ctx, log.Level(level), "pgx logger [%s]: %s logParams=%v", level.String(), msg, data) -} - // NewMultiConnPool creates a new MultiConnPool. // // Each URL gets one or more pools, and each pool has at most MaxConnsPerPool @@ -165,10 +135,6 @@ func NewMultiConnPool( m := &MultiConnPool{} m.mu.preparedStatements = map[string]string{} - logLevel := tracelog.LogLevelWarn - if cfg.LogLevel != 0 { - logLevel = cfg.LogLevel - } maxConnLifetime := 300 * time.Second if cfg.MaxConnLifetime > 0 { maxConnLifetime = cfg.MaxConnLifetime @@ -203,7 +169,7 @@ func NewMultiConnPool( if !ok { return nil, errors.Errorf("unknown method %s", cfg.Method) } - m.mu.method = queryMode + m.method = queryMode for i := range urls { connsPerPool := distributeMax(connsPerURL[i], maxConnsPerPool) @@ -238,10 +204,6 @@ func NewMultiConnPool( connCfg := poolCfg.ConnConfig connCfg.DefaultQueryExecMode = queryMode - connCfg.Tracer = &tracelog.TraceLog{ - Logger: &pgxLogger{}, - LogLevel: logLevel, - } p, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return nil, err @@ -287,9 +249,7 @@ func (m *MultiConnPool) Close() { // Method returns the query execution mode of the connection pool. func (m *MultiConnPool) Method() pgx.QueryExecMode { - m.mu.Lock() - defer m.mu.Unlock() - return m.mu.method + return m.method } // WarmupConns warms up numConns connections across all pools contained within