Skip to content

Commit

Permalink
sql,roachpb: add plan hash correct value to persisted stats
Browse files Browse the repository at this point in the history
Previously, we didn't have the plan hash/gist values, so
a dummy value was being used instead. Now that we have the value,
this commit uses those values to be corrected stored.
The Plan Hash is saved on its own column and is part of a
statement key. A plan gist is a string saved in the metadata
and can later on converted back into a logical plan.
The combined statements groups the value no using the
plan hash as a key, and creates a list of gist executed by
the same fingerprint id.

Partially addresses cockroachdb#72129

Release note (sql change): Saving plan hash/gist to the Statements
persisted stats inside Statistics column.
  • Loading branch information
maryliag committed Feb 4, 2022
1 parent b51c9b4 commit 7f1395a
Show file tree
Hide file tree
Showing 20 changed files with 166 additions and 59 deletions.
1 change: 1 addition & 0 deletions pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (s *StatementStatistics) Add(other *StatementStatistics) {
s.RowsRead.Add(other.RowsRead, s.Count, other.Count)
s.RowsWritten.Add(other.RowsWritten, s.Count, other.Count)
s.Nodes = util.CombineUniqueInt64(s.Nodes, other.Nodes)
s.PlanGists = util.CombineUniqueString(s.PlanGists, other.PlanGists)

s.ExecStats.Add(other.ExecStats)

Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/app_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ message StatementStatistics {
// Nodes is the ordered list of nodes ids on which the statement was executed.
repeated int64 nodes = 24;

// plan_gists is list of a compressed version of plan that can be converted (lossily)
// back into a logical plan.
// Each statement contain only one plan gist, but the same statement fingerprint id
// can contain more than one value.
repeated string plan_gists = 26;

// Note: be sure to update `sql/app_stats.go` when adding/removing fields here!

reserved 13, 14, 17, 18, 19, 20;
Expand Down
47 changes: 32 additions & 15 deletions pkg/server/combined_statement_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func getCombinedStatementStats(
startTime := getTimeFromSeconds(req.Start)
endTime := getTimeFromSeconds(req.End)
limit := SQLStatsResponseMax.Get(&settings.SV)
whereClause, args := getFilterAndParams(startTime, endTime, limit, testingKnobs)
statements, err := collectCombinedStatements(ctx, ie, whereClause, args)
whereClause, args := getFilterAndParams(startTime, endTime)
statements, err := collectCombinedStatements(ctx, ie, whereClause, args, limit, testingKnobs)
if err != nil {
return nil, err
}
Expand All @@ -89,12 +89,9 @@ func getCombinedStatementStats(
return response, nil
}

func getFilterAndParams(
start, end *time.Time, limit int64, testingKnobs *sqlstats.TestingKnobs,
) (string, []interface{}) {
func getFilterAndParams(start, end *time.Time) (string, []interface{}) {
var args []interface{}
var buffer strings.Builder
buffer.WriteString(testingKnobs.GetAOSTClause())

// Filter out internal statements by app name.
buffer.WriteString(" WHERE app_name NOT LIKE '$ internal%'")
Expand All @@ -109,17 +106,16 @@ func getFilterAndParams(
args = append(args, *end)
}

// Retrieve the top rows ordered by aggregation time and service latency.
buffer.WriteString(fmt.Sprintf(`
ORDER BY aggregated_ts DESC,(statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float DESC
LIMIT $%d`, len(args)+1))
args = append(args, limit)

return buffer.String(), args
}

func collectCombinedStatements(
ctx context.Context, ie *sql.InternalExecutor, whereClause string, qargs []interface{},
ctx context.Context,
ie *sql.InternalExecutor,
whereClause string,
qargs []interface{},
limit int64,
testingKnobs *sqlstats.TestingKnobs,
) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) {

query := fmt.Sprintf(
Expand All @@ -132,8 +128,29 @@ func collectCombinedStatements(
statistics,
sampled_plan,
aggregation_interval
FROM crdb_internal.statement_statistics
%s`, whereClause)
FROM (
SELECT
fingerprint_id,
transaction_fingerprint_id,
app_name,
aggregated_ts,
max(metadata) AS metadata,
crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics,
max(sampled_plan) AS sampled_plan,
aggregation_interval
FROM crdb_internal.statement_statistics
%s
GROUP BY
fingerprint_id,
transaction_fingerprint_id,
app_name,
aggregated_ts,
aggregation_interval)
%s
ORDER BY (statistics -> 'statistics' -> 'svcLat' ->> 'mean')::float DESC
LIMIT $%d`, whereClause, testingKnobs.GetAOSTClause(), len(qargs)+1)

qargs = append(qargs, limit)

const expectedNumDatums = 8

Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5118,10 +5118,8 @@ CREATE TABLE crdb_internal.cluster_statement_statistics (
transactionFingerprintID := tree.NewDBytes(
tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(uint64(statistics.Key.TransactionFingerprintID))))

// TODO(azhng): properly update plan_hash value once we can expose it
// from the optimizer.
planHash := tree.NewDBytes(
tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(0)))
tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(statistics.Key.PlanHash)))

metadataJSON, err := sqlstatsutil.BuildStmtMetadataJSON(statistics)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func (ex *connExecutor) recordStatementSummary(
FullScan: flags.IsSet(planFlagContainsFullIndexScan) || flags.IsSet(planFlagContainsFullTableScan),
Failed: stmtErr != nil,
Database: planner.SessionData().Database,
PlanHash: planner.instrumentation.planGist.Hash(),
}

// We only populate the transaction fingerprint ID field if we are in an
Expand Down Expand Up @@ -204,6 +205,7 @@ func (ex *connExecutor) recordStatementSummary(
Nodes: getNodesFromPlanner(planner),
StatementType: stmt.AST.StatementType(),
Plan: planner.instrumentation.PlanForStats(ctx),
PlanGist: planner.instrumentation.planGist.String(),
StatementError: stmtErr,
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (ih *instrumentationHelper) Finish(
ImplicitTxn: ih.implicitTxn,
Database: p.SessionData().Database,
Failed: retErr != nil,
PlanHash: ih.planGist.Hash(),
}
// We populate transaction fingerprint ID if this is an implicit transaction.
// See executor_statement_metrics.go:recordStatementSummary() for further
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/instrumentation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestSampledStatsCollection(t *testing.T) {
},
))
require.NotNil(t, stats)
require.NotZero(t, stats.Key.PlanHash)
return stats
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *PersistedSQLStats) doFlushSingleStmtStats(

serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.ID))
serializedTransactionFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.Key.TransactionFingerprintID))
serializedPlanHash := sqlstatsutil.EncodeUint64ToBytes(uint64(dummyPlanHash))
serializedPlanHash := sqlstatsutil.EncodeUint64ToBytes(scopedStats.Key.PlanHash)

insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) {
rowsAffected, err := s.insertStatementStats(
Expand Down Expand Up @@ -417,7 +417,7 @@ WHERE fingerprint_id = $2
"plan_hash: %d, "+
"node_id: %d",
serializedFingerprintID, serializedTransactionFingerprintID, stats.Key.App,
aggregatedTs, dummyPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
aggregatedTs, serializedPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
}

return nil
Expand Down Expand Up @@ -581,12 +581,12 @@ FOR UPDATE
if row == nil {
return errors.AssertionFailedf(
"statement statistics not found fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d",
serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
}

if len(row) != 1 {
return errors.AssertionFailedf("unexpectedly found %d returning columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash %d, node_id: %d",
len(row), serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash,
len(row), serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash,
s.cfg.SQLIDContainer.SQLInstanceID())
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/sqlstats/persistedsqlstats/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ import (
"github.com/cockroachdb/errors"
)

const (
// TODO(azhng): currently we do not have the ability to compute a hash for
// query plan. This is currently being worked on by the SQL Queries team.
// Once we are able get consistent hash value from a query plan, we should
// update this.
dummyPlanHash = int64(0)
)

// ErrConcurrentSQLStatsCompaction is reported when two sql stats compaction
// jobs are issued concurrently. This is a sentinel error.
var ErrConcurrentSQLStatsCompaction = errors.New("another sql stats compaction job is already running")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
"mean": {{.Float}},
"sqDiff": {{.Float}}
},
"nodes": [{{joinInts .IntArray}}]
"nodes": [{{joinInts .IntArray}}],
"planGists": [{{joinStrings .StringArray}}]
},
"execution_statistics": {
"cnt": {{.Int64}},
Expand Down Expand Up @@ -211,6 +212,7 @@ func TestSQLStatsJsonEncoding(t *testing.T) {
"sqDiff": {{.Float}}
},
"nodes": [{{joinInts .IntArray}}]
"planGists": [{{joinStrings .StringArray}}]
},
"execution_statistics": {
"cnt": {{.Int64}},
Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/json_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,39 @@ func (a *int64Array) encodeJSON() (json.JSON, error) {
return builder.Build(), nil
}

type stringArray []string

func (a *stringArray) decodeJSON(js json.JSON) error {
arrLen := js.Len()
for i := 0; i < arrLen; i++ {
var value jsonString
valJSON, err := js.FetchValIdx(i)
if err != nil {
return err
}
if err := value.decodeJSON(valJSON); err != nil {
return err
}
*a = append(*a, string(value))
}

return nil
}

func (a *stringArray) encodeJSON() (json.JSON, error) {
builder := json.NewArrayBuilder(len(*a))

for _, value := range *a {
jsVal, err := (*jsonString)(&value).encodeJSON()
if err != nil {
return nil, err
}
builder.Add(jsVal)
}

return builder.Build(), nil
}

type stmtFingerprintIDArray []roachpb.StmtFingerprintID

func (s *stmtFingerprintIDArray) decodeJSON(js json.JSON) error {
Expand Down Expand Up @@ -237,6 +270,7 @@ func (s *innerStmtStats) jsonFields() jsonFields {
{"rowsRead", (*numericStats)(&s.RowsRead)},
{"rowsWritten", (*numericStats)(&s.RowsWritten)},
{"nodes", (*int64Array)(&s.Nodes)},
{"planGists", (*stringArray)(&s.PlanGists)},
}
}

Expand Down
48 changes: 38 additions & 10 deletions pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
package sqlstatsutil

import (
"html/template"
"fmt"
"math/rand"
"reflect"
"strconv"
"strings"
"testing"
"text/template"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -36,12 +37,13 @@ func GetRandomizedCollectedStatementStatisticsForTest(
}

type randomData struct {
Bool bool
String string
Int64 int64
Float float64
IntArray []int64
Time time.Time
Bool bool
String string
Int64 int64
Float float64
IntArray []int64
StringArray []string
Time time.Time
}

var alphabet = []rune("abcdefghijklmkopqrstuvwxyz")
Expand All @@ -57,11 +59,22 @@ func genRandomData() randomData {
}
r.String = b.String()

// Generate a randomized array of length 5.
arrLen := 5
r.StringArray = make([]string, arrLen)
for i := 0; i < arrLen; i++ {
// Randomly generating 10-character string.
b := strings.Builder{}
for j := 0; j < 10; j++ {
b.WriteRune(alphabet[rand.Intn(26)])
}
r.StringArray[i] = b.String()
}

r.Int64 = rand.Int63()
r.Float = rand.Float64()

// Generate a randomized array of length 5.
arrLen := 5
r.IntArray = make([]int64, arrLen)
for i := 0; i < arrLen; i++ {
r.IntArray[i] = rand.Int63()
Expand All @@ -79,6 +92,13 @@ func fillTemplate(t *testing.T, tmplStr string, data randomData) string {
}
return strings.Join(strArr, ",")
}
joinStrings := func(arr []string) string {
strArr := make([]string, len(arr))
for i, val := range arr {
strArr[i] = fmt.Sprintf("%q", val)
}
return strings.Join(strArr, ",")
}
stringifyTime := func(tm time.Time) string {
s, err := tm.MarshalText()
require.NoError(t, err)
Expand All @@ -88,6 +108,7 @@ func fillTemplate(t *testing.T, tmplStr string, data randomData) string {
New("").
Funcs(template.FuncMap{
"joinInts": joinInts,
"joinStrings": joinStrings,
"stringifyTime": stringifyTime,
}).
Parse(tmplStr)
Expand Down Expand Up @@ -133,8 +154,15 @@ func fillObject(t *testing.T, val reflect.Value, data *randomData) {
case reflect.Bool:
val.SetBool(data.Bool)
case reflect.Slice:
for _, randInt := range data.IntArray {
val.Set(reflect.Append(val, reflect.ValueOf(randInt)))
switch val.Type().String() {
case "[]string":
for _, randString := range data.StringArray {
val.Set(reflect.Append(val, reflect.ValueOf(randString)))
}
case "[]int64":
for _, randInt := range data.IntArray {
val.Set(reflect.Append(val, reflect.ValueOf(randInt)))
}
}
case reflect.Struct:
switch val.Type().Name() {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/ssmemstorage/ss_mem_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (s *StmtStatsIterator) Next() bool {
Failed: stmtKey.failed,
App: s.container.appName,
Database: database,
PlanHash: stmtKey.planHash,
TransactionFingerprintID: stmtKey.transactionFingerprintID,
},
ID: stmtFingerprintID,
Expand Down
Loading

0 comments on commit 7f1395a

Please sign in to comment.