diff --git a/AUTHORS b/AUTHORS index bceeb96563f2..efa630dd6216 100644 --- a/AUTHORS +++ b/AUTHORS @@ -274,6 +274,7 @@ Lasantha Pambagoda Lasse Nordahl Lauren Hirata Lauren lhirata Lee Reilly +Leon Fattakhov Levon Lloyd Liam Gillies Lidor Carmel diff --git a/docs/generated/eventlog.md b/docs/generated/eventlog.md index 06f4087563fd..e43d665e45d6 100644 --- a/docs/generated/eventlog.md +++ b/docs/generated/eventlog.md @@ -2607,6 +2607,12 @@ contains common SQL event/execution details. | `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no | | `ContentionNanos` | The duration of time in nanoseconds that the query experienced contention. | no | | `Regions` | The regions of the nodes where SQL processors ran. | no | +| `NetworkBytesSent` | The number of network bytes sent by nodes for this query. | no | +| `MaxMemUsage` | The maximum amount of memory usage by nodes for this query. | no | +| `MaxDiskUsage` | The maximum amount of disk usage by nodes for this query. | no | +| `KVBytesRead` | The number of bytes read at the KV layer for this query. | no | +| `KVRowsRead` | The number of rows read at the KV layer for this query. | no | +| `NetworkMessages` | The number of network messages sent by nodes for this query. | no | #### Common fields diff --git a/pkg/keys/BUILD.bazel b/pkg/keys/BUILD.bazel index 81e5d179559b..0e499ef75f7a 100644 --- a/pkg/keys/BUILD.bazel +++ b/pkg/keys/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/util/encoding", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", ], ) @@ -45,6 +46,7 @@ go_test( "//pkg/util/uuid", "@com_github_cockroachdb_apd_v3//:apd", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index a4782cba27da..f2a83de88488 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -21,12 +21,26 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" ) // PrettyPrintTimeseriesKey is a hook for pretty printing a timeseries key. The // timeseries key prefix will already have been stripped off. var PrettyPrintTimeseriesKey func(key roachpb.Key) string +// QuoteOpt is a flag option used when pretty-printing keys to indicate whether +// to quote raw key values. +type QuoteOpt bool + +const ( + // QuoteRaw is the QuoteOpt used to indicate that we should use quotes when + // printing raw keys. + QuoteRaw QuoteOpt = true + // DontQuoteRaw is the QuoteOpt used to indicate that we shouldn't use quotes + // when printing raw keys. + DontQuoteRaw QuoteOpt = false +) + // DictEntry contains info on pretty-printing and pretty-scanning keys in a // region of the key space. type DictEntry struct { @@ -34,6 +48,8 @@ type DictEntry struct { prefix roachpb.Key // print the key's pretty value, key has been removed prefix data ppFunc func(valDirs []encoding.Direction, key roachpb.Key) string + // safe format the key's pretty value into a RedactableString + sfFunc func(valDirs []encoding.Direction, key roachpb.Key) redact.RedactableString // PSFunc parses the relevant prefix of the input into a roachpb.Key, // returning the remainder and the key corresponding to the consumed prefix of // 'input'. Allowed to panic on errors. @@ -56,9 +72,14 @@ type KeyComprehensionTable []struct { Entries []DictEntry } +// KeyDict drives the pretty-printing and pretty-scanning of the key space. +// This is initialized in init(). +var KeyDict KeyComprehensionTable + var ( - // ConstKeyDict translates some pretty-printed keys. - ConstKeyDict = []struct { + // ConstKeyOverrides provides overrides that define how to translate specific + // pretty-printed keys. + ConstKeyOverrides = []struct { Name string Value roachpb.Key }{ @@ -68,82 +89,6 @@ var ( {"/Meta2/Max", Meta2KeyMax}, } - // KeyDict drives the pretty-printing and pretty-scanning of the key space. - KeyDict = KeyComprehensionTable{ - {Name: "/Local", start: LocalPrefix, end: LocalMax, Entries: []DictEntry{ - {Name: "/Store", prefix: roachpb.Key(LocalStorePrefix), - ppFunc: localStoreKeyPrint, PSFunc: localStoreKeyParse}, - {Name: "/RangeID", prefix: roachpb.Key(LocalRangeIDPrefix), - ppFunc: localRangeIDKeyPrint, PSFunc: localRangeIDKeyParse}, - {Name: "/Range", prefix: LocalRangePrefix, ppFunc: localRangeKeyPrint, - PSFunc: parseUnsupported}, - {Name: "/Lock", prefix: LocalRangeLockTablePrefix, ppFunc: localRangeLockTablePrint, - PSFunc: parseUnsupported}, - }}, - {Name: "/Meta1", start: Meta1Prefix, end: Meta1KeyMax, Entries: []DictEntry{ - {Name: "", prefix: Meta1Prefix, ppFunc: print, - PSFunc: func(input string) (string, roachpb.Key) { - input = mustShiftSlash(input) - unq, err := strconv.Unquote(input) - if err != nil { - panic(err) - } - if len(unq) == 0 { - return "", Meta1Prefix - } - return "", RangeMetaKey(RangeMetaKey(MustAddr( - roachpb.Key(unq)))).AsRawKey() - }, - }}, - }, - {Name: "/Meta2", start: Meta2Prefix, end: Meta2KeyMax, Entries: []DictEntry{ - {Name: "", prefix: Meta2Prefix, ppFunc: print, - PSFunc: func(input string) (string, roachpb.Key) { - input = mustShiftSlash(input) - unq, err := strconv.Unquote(input) - if err != nil { - panic(&ErrUglifyUnsupported{err}) - } - if len(unq) == 0 { - return "", Meta2Prefix - } - return "", RangeMetaKey(MustAddr(roachpb.Key(unq))).AsRawKey() - }, - }}, - }, - {Name: "/System", start: SystemPrefix, end: SystemMax, Entries: []DictEntry{ - {Name: "/NodeLiveness", prefix: NodeLivenessPrefix, - ppFunc: decodeKeyPrint, - PSFunc: parseUnsupported, - }, - {Name: "/NodeLivenessMax", prefix: NodeLivenessKeyMax, - ppFunc: decodeKeyPrint, - PSFunc: parseUnsupported, - }, - {Name: "/StatusNode", prefix: StatusNodePrefix, - ppFunc: decodeKeyPrint, - PSFunc: parseUnsupported, - }, - {Name: "/tsd", prefix: TimeseriesPrefix, - ppFunc: timeseriesKeyPrint, - PSFunc: parseUnsupported, - }, - {Name: "/SystemSpanConfigKeys", prefix: SystemSpanConfigPrefix, - ppFunc: decodeKeyPrint, - PSFunc: parseUnsupported, - }, - }}, - {Name: "/NamespaceTable", start: NamespaceTableMin, end: NamespaceTableMax, Entries: []DictEntry{ - {Name: "", prefix: nil, ppFunc: decodeKeyPrint, PSFunc: parseUnsupported}, - }}, - {Name: "/Table", start: TableDataMin, end: TableDataMax, Entries: []DictEntry{ - {Name: "", prefix: nil, ppFunc: decodeKeyPrint, PSFunc: tableKeyParse}, - }}, - {Name: "/Tenant", start: TenantTableDataMin, end: TenantTableDataMax, Entries: []DictEntry{ - {Name: "", prefix: nil, ppFunc: tenantKeyPrint, PSFunc: tenantKeyParse}, - }}, - } - // keyofKeyDict means the key of suffix which is itself a key, // should recursively pretty print it, see issue #3228 keyOfKeyDict = []struct { @@ -541,7 +486,9 @@ func localRangeKeyPrint(valDirs []encoding.Direction, key roachpb.Key) string { // lockTablePrintLockedKey is initialized to prettyPrintInternal in init() to break an // initialization loop. -var lockTablePrintLockedKey func(valDirs []encoding.Direction, key roachpb.Key, quoteRawKeys bool) string +var lockTablePrintLockedKey func( + valDirs []encoding.Direction, key roachpb.Key, quoteRawKeys QuoteOpt, skipOverrides bool, +) string func localRangeLockTablePrint(valDirs []encoding.Direction, key roachpb.Key) string { var buf bytes.Buffer @@ -556,7 +503,7 @@ func localRangeLockTablePrint(valDirs []encoding.Direction, key roachpb.Key) str fmt.Fprintf(&buf, "/\"%x\"", key) return buf.String() } - buf.WriteString(lockTablePrintLockedKey(valDirs, lockedKey, true)) + buf.WriteString(lockTablePrintLockedKey(valDirs, lockedKey, QuoteRaw, false /*skipOverrides*/)) return buf.String() } @@ -618,7 +565,7 @@ func tenantKeyPrint(valDirs []encoding.Direction, key roachpb.Key) string { if len(key) == 0 { return fmt.Sprintf("/%s", tID) } - return fmt.Sprintf("/%s%s", tID, key.StringWithDirs(valDirs, 0)) + return fmt.Sprintf("/%s%s", tID, key.StringWithDirs(valDirs)) } // prettyPrintInternal parse key with prefix in KeyDict. @@ -628,10 +575,22 @@ func tenantKeyPrint(valDirs []encoding.Direction, key roachpb.Key) string { // type is used (see encoding.go:prettyPrintFirstValue). // If the key doesn't match any prefix in KeyDict, return its byte value with // quotation and false, or else return its human readable value and true. -func prettyPrintInternal(valDirs []encoding.Direction, key roachpb.Key, quoteRawKeys bool) string { - for _, k := range ConstKeyDict { - if key.Equal(k.Value) { - return k.Name +// +// skipOverrides provides a way to skip the usage of ConstKeyOverrides. This is +// configurable to enable recursive printing of keys (e.g. from SafeFormat) a way +// to avoid treating the remainder as a potential match for the overrides in +// ConstKeyOverrides, which in general, should not apply to the remainder of any key. +// For example: The key `/Meta1/""` would have `/Meta1` trimmed from the key, and +// prettyPrintInternal may be called to print the remainder of `""`. This would +// incorrectly be interpreted as `/Min` if we didn't skip the usage of ConstKeyOverrides. +func prettyPrintInternal( + valDirs []encoding.Direction, key roachpb.Key, quoteRawKeys QuoteOpt, skipOverrides bool, +) string { + if !skipOverrides { + for _, k := range ConstKeyOverrides { + if key.Equal(k.Value) { + return k.Name + } } } @@ -702,14 +661,210 @@ func prettyPrintInternal(valDirs []encoding.Direction, key roachpb.Key, quoteRaw // type is used (see encoding.go:prettyPrintFirstValue). // // See keysutil.UglyPrint() for an inverse. +// +// See SafeFormat for a redaction-safe implementation. func PrettyPrint(valDirs []encoding.Direction, key roachpb.Key) string { - return prettyPrintInternal(valDirs, key, true /* quoteRawKeys */) + return prettyPrintInternal(valDirs, key, QuoteRaw, false /*skipOverrides*/) +} + +// formatTableKey formats the given key in the system tenant table keyspace & redacts any +// sensitive information from the result. Sensitive information is considered any value other +// than the table ID or index ID (e.g. any index-key/value-literal). +// +// NB: It's the responsibility of the caller to prefix the printed key values with the relevant +// keyspace identifier (e.g. `/Table`). +// +// For example: +// - `/42/‹"index key"›` +// - `/42/122/‹"index key"›` +// - `/42/122/‹"index key"›/‹"some value"›` +// - `/42/122/‹"index key"›/‹"some value"›/‹"some other value"›` +func formatTableKey(valDirs []encoding.Direction, key roachpb.Key) redact.RedactableString { + buf := redact.StringBuilder{} + vals, types := encoding.PrettyPrintValuesWithTypes(valDirs, key) + prefixLength := 1 + + if len(vals) > 0 && types[0] != encoding.Int { + buf.Printf("/err:ExpectedTableID-FoundType%v", redact.Safe(types[0])) + return buf.RedactableString() + } + + // Accommodate cases where the table key contains a primary index field in + // the prefix. ex: `//` + if len(vals) > 1 && types[1] == encoding.Int { + prefixLength++ + } + + for i := 0; i < prefixLength; i++ { + buf.Printf("/%v", redact.Safe(vals[i])) + } + for _, val := range vals[prefixLength:] { + buf.Printf("/%s", val) + } + return buf.RedactableString() +} + +// formatTenantKey formats the given key for a tenant table & redacts any sensitive information +// from the result. Sensitive information is considered any value other than the TenantID, +// table ID, or index ID (e.g. any index-key/value-literal). +// +// NB: It's the responsibility of the caller to prefix the printed key values with the relevant +// keyspace identifier (e.g. `/Tenant`). +// +// For example: +// - `/5/Table/42/‹"index key"›` +// - `/5/Table/42/122/‹"index key"›` +func formatTenantKey(valDirs []encoding.Direction, key roachpb.Key) redact.RedactableString { + buf := redact.StringBuilder{} + key, tID, err := DecodeTenantPrefix(key) + if err != nil { + buf.Printf("/err:%v", err) + return buf.RedactableString() + } + + buf.Printf("/%s", tID) + if len(key) != 0 { + buf.Print(safeFormatInternal(valDirs, key)) + } + return buf.RedactableString() +} + +// SafeFormat is the generalized redaction function used to redact pretty-printed keys. +func SafeFormat(w redact.SafeWriter, valDirs []encoding.Direction, key roachpb.Key) { + w.Print(safeFormatInternal(valDirs, key)) +} + +func safeFormatInternal(valDirs []encoding.Direction, key roachpb.Key) redact.RedactableString { + for _, k := range ConstKeyOverrides { + if key.Equal(k.Value) { + return redact.Sprint(redact.Safe(k.Name)) + } + } + + helper := func(key roachpb.Key, isRemainder bool) redact.RedactableString { + var b redact.StringBuilder + for _, k := range KeyDict { + if key.Compare(k.start) >= 0 && (k.end == nil || key.Compare(k.end) <= 0) { + if k.end != nil && k.end.Compare(key) == 0 { + b.Print(redact.Safe(k.Name)) + b.Print(redact.Safe("/Max")) + return b.RedactableString() + } + + for _, e := range k.Entries { + if bytes.HasPrefix(key, e.prefix) && e.sfFunc != nil { + b.Print(redact.Safe(k.Name)) + key = key[len(e.prefix):] + b.Print(redact.Safe(e.Name)) + b.Print(e.sfFunc(valDirs, key)) + return b.RedactableString() + } + } + } + } + // If we reach this point, the key is not recognized based on KeyDict, or no `sfFunc` + // is defined for the keyspace. Therefore, we fall back to the standard pretty print + // functionality and avoid marking safe from a redaction perspective. + // NB: This will lead to the entirety of the pretty-printed key to be redactable, e.g: + // Unredacted: `‹/SomeKeyspace/42›` + // Redacted: `‹x›` + return redact.Sprint(prettyPrintInternal(valDirs, key, QuoteRaw, isRemainder /*skipOverrides*/)) + } + + for _, k := range keyOfKeyDict { + if bytes.HasPrefix(key, k.prefix) { + key = key[len(k.prefix):] + str := helper(key, true /* isRemainder */) + if len(str) > 0 && strings.Index(str.StripMarkers(), "/") != 0 { + return redact.Sprintf("%v/%v", redact.Sprint(k.name), str) + } + return redact.Sprintf("%v%v", redact.Sprint(k.name), str) + } + } + return helper(key, false /* isRemainder */) } func init() { roachpb.PrettyPrintKey = PrettyPrint + roachpb.SafeFormatKey = SafeFormat roachpb.PrettyPrintRange = PrettyPrintRange lockTablePrintLockedKey = prettyPrintInternal + + // KeyDict drives the pretty-printing and pretty-scanning of the key space. + KeyDict = KeyComprehensionTable{ + {Name: "/Local", start: LocalPrefix, end: LocalMax, Entries: []DictEntry{ + {Name: "/Store", prefix: roachpb.Key(LocalStorePrefix), + ppFunc: localStoreKeyPrint, PSFunc: localStoreKeyParse}, + {Name: "/RangeID", prefix: roachpb.Key(LocalRangeIDPrefix), + ppFunc: localRangeIDKeyPrint, PSFunc: localRangeIDKeyParse}, + {Name: "/Range", prefix: LocalRangePrefix, ppFunc: localRangeKeyPrint, + PSFunc: parseUnsupported}, + {Name: "/Lock", prefix: LocalRangeLockTablePrefix, ppFunc: localRangeLockTablePrint, + PSFunc: parseUnsupported}, + }}, + {Name: "/Meta1", start: Meta1Prefix, end: Meta1KeyMax, Entries: []DictEntry{ + {Name: "", prefix: Meta1Prefix, ppFunc: print, + PSFunc: func(input string) (string, roachpb.Key) { + input = mustShiftSlash(input) + unq, err := strconv.Unquote(input) + if err != nil { + panic(err) + } + if len(unq) == 0 { + return "", Meta1Prefix + } + return "", RangeMetaKey(RangeMetaKey(MustAddr( + roachpb.Key(unq)))).AsRawKey() + }, + }}, + }, + {Name: "/Meta2", start: Meta2Prefix, end: Meta2KeyMax, Entries: []DictEntry{ + {Name: "", prefix: Meta2Prefix, ppFunc: print, + PSFunc: func(input string) (string, roachpb.Key) { + input = mustShiftSlash(input) + unq, err := strconv.Unquote(input) + if err != nil { + panic(&ErrUglifyUnsupported{err}) + } + if len(unq) == 0 { + return "", Meta2Prefix + } + return "", RangeMetaKey(MustAddr(roachpb.Key(unq))).AsRawKey() + }, + }}, + }, + {Name: "/System", start: SystemPrefix, end: SystemMax, Entries: []DictEntry{ + {Name: "/NodeLiveness", prefix: NodeLivenessPrefix, + ppFunc: decodeKeyPrint, + PSFunc: parseUnsupported, + }, + {Name: "/NodeLivenessMax", prefix: NodeLivenessKeyMax, + ppFunc: decodeKeyPrint, + PSFunc: parseUnsupported, + }, + {Name: "/StatusNode", prefix: StatusNodePrefix, + ppFunc: decodeKeyPrint, + PSFunc: parseUnsupported, + }, + {Name: "/tsd", prefix: TimeseriesPrefix, + ppFunc: timeseriesKeyPrint, + PSFunc: parseUnsupported, + }, + {Name: "/SystemSpanConfigKeys", prefix: SystemSpanConfigPrefix, + ppFunc: decodeKeyPrint, + PSFunc: parseUnsupported, + }, + }}, + {Name: "/NamespaceTable", start: NamespaceTableMin, end: NamespaceTableMax, Entries: []DictEntry{ + {Name: "", prefix: nil, ppFunc: decodeKeyPrint, PSFunc: parseUnsupported}, + }}, + {Name: "/Table", start: TableDataMin, end: TableDataMax, Entries: []DictEntry{ + {Name: "", prefix: nil, ppFunc: decodeKeyPrint, PSFunc: tableKeyParse, sfFunc: formatTableKey}, + }}, + {Name: "/Tenant", start: TenantTableDataMin, end: TenantTableDataMax, Entries: []DictEntry{ + {Name: "", prefix: nil, ppFunc: tenantKeyPrint, PSFunc: tenantKeyParse, sfFunc: formatTenantKey}, + }}, + } } // PrettyPrintRange pretty prints a compact representation of a key range. The @@ -728,7 +883,7 @@ func PrettyPrintRange(start, end roachpb.Key, maxChars int) string { if maxChars < 8 { maxChars = 8 } - prettyStart := prettyPrintInternal(nil /* valDirs */, start, false /* quoteRawKeys */) + prettyStart := prettyPrintInternal(nil /* valDirs */, start, DontQuoteRaw, false /*skipOverrides*/) if len(end) == 0 { if len(prettyStart) <= maxChars { return prettyStart @@ -737,7 +892,7 @@ func PrettyPrintRange(start, end roachpb.Key, maxChars int) string { b.WriteRune('…') return b.String() } - prettyEnd := prettyPrintInternal(nil /* valDirs */, end, false /* quoteRawKeys */) + prettyEnd := prettyPrintInternal(nil /* valDirs */, end, DontQuoteRaw, false /*skipOverrides*/) i := 0 // Find the common prefix. for ; i < len(prettyStart) && i < len(prettyEnd) && prettyStart[i] == prettyEnd[i]; i++ { diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index 0c4d7efcda12..fa4bdb576ba1 100644 --- a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -30,6 +30,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/keysutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" + "github.com/stretchr/testify/require" ) func lockTableKey(key roachpb.Key) roachpb.Key { @@ -37,6 +39,166 @@ func lockTableKey(key roachpb.Key) roachpb.Key { return k } +func TestSafeFormatKey_SystemTenant(t *testing.T) { + tenSysCodec := keys.SystemSQLCodec + testCases := []struct { + name string + key roachpb.Key + exp string + }{ + { + "table with string index key", + roachpb.Key(makeKey(tenSysCodec.TablePrefix(42), + encoding.EncodeVarintAscending(nil, 1222), + encoding.EncodeStringAscending(nil, "handsome man"))), + `/Table/42/1222/‹"handsome man"›`, + }, + { + "multi-column value", + roachpb.Key(makeKey(tenSysCodec.TablePrefix(42), + encoding.EncodeStringAscending(nil, "California"), + encoding.EncodeStringAscending(nil, "Los Angeles"))), + `/Table/42/‹"California"›/‹"Los Angeles"›`, + }, + { + "table with decimal index key", + roachpb.Key(makeKey(tenSysCodec.IndexPrefix(84, 2), + encoding.EncodeDecimalAscending(nil, apd.New(1234, -2)))), + `/Table/84/2/‹12.34›`, + }, + { + "namespace table handled as standard system tenant table", + keys.NamespaceTableMin, + "/Table/30", + }, + { + "table index without index key", + tenSysCodec.IndexPrefix(42, 5), + "/Table/42/5", + }, + { + "handles infinity values", + makeKey(tenSysCodec.TablePrefix(42), + encoding.EncodeFloatAscending(nil, math.Inf(1))), + "/Table/42/‹+Inf›", + }, + { + "handles null values", + makeKey(tenSysCodec.TablePrefix(42), + encoding.EncodeNullAscending(nil)), + "/Table/42/‹NULL›", + }, + { + "handles PrefixEnd", + roachpb.Key(makeKey(tenSysCodec.TablePrefix(42), + encoding.EncodeBitArrayAscending(nil, bitarray.MakeZeroBitArray(64)), + )).PrefixEnd(), + "/Table/42/‹B0000000000000000000000000000000000000000000000000000000000000000›/‹PrefixEnd›", + }, + { + "handles /Table/Max", + keys.TableDataMax, + "/Table/Max", + }, + { + "handles unknowns", + makeKey(tenSysCodec.TablePrefix(42), []byte{0x12, 'a', 0x00, 0x03}), + `/Table/42/‹???›`, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + require.Equal(t, redact.RedactableString(test.exp), redact.Sprint(test.key)) + }) + } +} + +func TestSafeFormatKey_UnsupportedKeyspace(t *testing.T) { + ten5Codec := keys.MakeSQLCodec(roachpb.MakeTenantID(5)) + testCases := []struct { + name string + key roachpb.Key + exp string + }{ + { + "key-spaces without a safe format function implementation are fully redacted", + keys.MakeRangeKeyPrefix(roachpb.RKey(ten5Codec.TablePrefix(42))), + `‹/Local/Range/Tenant/5/Table/42›`, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + require.Equal(t, redact.RedactableString(test.exp), redact.Sprint(test.key)) + }) + } +} + +func TestSafeFormatKey_AppTenant(t *testing.T) { + ten5Codec := keys.MakeSQLCodec(roachpb.MakeTenantID(5)) + testCases := []struct { + name string + key roachpb.Key + exp string + }{ + { + "table with string index key", + roachpb.Key(makeKey(ten5Codec.IndexPrefix(42, 122), + encoding.EncodeStringAscending(nil, "handsome man"))), + `/Tenant/5/Table/42/122/‹"handsome man"›`, + }, + { + "table with decimal index key", + roachpb.Key(makeKey(ten5Codec.IndexPrefix(84, 2), + encoding.EncodeDecimalAscending(nil, apd.New(1234, -2)))), + `/Tenant/5/Table/84/2/‹12.34›`, + }, + { + "multi-column value", + roachpb.Key(makeKey(ten5Codec.TablePrefix(42), + encoding.EncodeStringAscending(nil, "California"), + encoding.EncodeStringAscending(nil, "Los Angeles"))), + `/Tenant/5/Table/42/‹"California"›/‹"Los Angeles"›`, + }, + { + "table index without index key", + ten5Codec.IndexPrefix(42, 5), + "/Tenant/5/Table/42/5", + }, + { + "handles infinity values", + makeKey(ten5Codec.TablePrefix(42), + encoding.EncodeFloatAscending(nil, math.Inf(1))), + "/Tenant/5/Table/42/‹+Inf›", + }, + { + "handles null values", + makeKey(ten5Codec.IndexPrefix(42, 2), + encoding.EncodeNullAscending(nil)), + "/Tenant/5/Table/42/2/‹NULL›", + }, + { + "handles PrefixEnd", + roachpb.Key(makeKey(ten5Codec.TablePrefix(42), + encoding.EncodeBitArrayAscending(nil, bitarray.MakeZeroBitArray(64)), + )).PrefixEnd(), + "/Tenant/5/Table/42/‹B0000000000000000000000000000000000000000000000000000000000000000›/‹PrefixEnd›", + }, + { + "handles unknowns", + makeKey(ten5Codec.TablePrefix(42), []byte{0x12, 'a', 0x00, 0x03}), + `/Tenant/5/Table/42/‹???›`, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + require.Equal(t, redact.RedactableString(test.exp), redact.Sprint(test.key)) + }) + } +} + func TestPrettyPrint(t *testing.T) { tenSysCodec := keys.SystemSQLCodec ten5Codec := keys.MakeSQLCodec(roachpb.MakeTenantID(5)) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 96a63e9eb9cf..05e3dcf83999 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -4396,7 +4396,7 @@ func TestDistSenderSlowLogMessage(t *testing.T) { br.Error = roachpb.NewError(errors.New("boom")) desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")} { - exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,‹/Min›) to` + + exp := `have been waiting 8.16s (120 attempts) for RPC Get [‹"a"›,/Min) to` + ` r9:‹{x-z}› [, next=0, gen=0]; resp: ‹(err: boom)›` var s redact.StringBuilder slowRangeRPCWarningStr(&s, ba, dur, attempts, desc, nil /* err */, br) diff --git a/pkg/kv/kvserver/gc/BUILD.bazel b/pkg/kv/kvserver/gc/BUILD.bazel index 8f04fb16495a..53bcb80bd59c 100644 --- a/pkg/kv/kvserver/gc/BUILD.bazel +++ b/pkg/kv/kvserver/gc/BUILD.bazel @@ -35,6 +35,7 @@ go_test( size = "large", srcs = [ "data_distribution_test.go", + "gc_int_test.go", "gc_iterator_test.go", "gc_old_test.go", "gc_random_test.go", @@ -44,11 +45,20 @@ go_test( args = ["-test.timeout=895s"], embed = [":gc"], deps = [ + "//pkg/base", "//pkg/keys", + "//pkg/kv", "//pkg/kv/kvserver/rditer", "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/server/serverpb", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/gc/gc_int_test.go b/pkg/kv/kvserver/gc/gc_int_test.go new file mode 100644 index 000000000000..368b7ccb78f3 --- /dev/null +++ b/pkg/kv/kvserver/gc/gc_int_test.go @@ -0,0 +1,220 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc_test + +import ( + "context" + gosql "database/sql" + "fmt" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func init() { + randutil.SeedForTests() + securityassets.SetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) +} + +func TestEndToEndGC(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + manualClock := hlc.NewHybridManualClock() + tc := testcluster.NewTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + WallClock: manualClock, + }, + }, + DisableDefaultTestTenant: true, + }, + }) + tc.Start(t) + defer tc.Stopper().Stop(context.Background()) + require.NoError(t, tc.WaitForFullReplication()) + + sqlDb := tc.ServerConn(0) + kvDb := tc.Server(0).DB() + statusServer := tc.Server(0).StatusServer().(serverpb.StatusServer) + + execOrFatal := func(t *testing.T, db *gosql.DB, stmt string, args ...interface{}) { + t.Helper() + _, err := db.Exec(stmt, args...) + require.NoError(t, err, "failed to execute %s", stmt) + } + + getTableRangeIDs := func(t *testing.T, db *gosql.DB) ids { + t.Helper() + rows, err := db.Query("with r as (show ranges from table kv) select range_id from r order by start_key") + require.NoError(t, err, "failed to query ranges") + var rangeIDs []int64 + for rows.Next() { + var rangeID int64 + require.NoError(t, rows.Scan(&rangeID), "failed to read row with range id") + rangeIDs = append(rangeIDs, rangeID) + } + return rangeIDs + } + + readSomeKeys := func(t *testing.T, db *gosql.DB) []int64 { + t.Helper() + var ids []int64 + rows, err := db.Query("select k from kv limit 5") + require.NoError(t, err, "failed to query kv data") + for rows.Next() { + var id int64 + require.NoError(t, rows.Scan(&id), "failed to scan value") + ids = append(ids, id) + } + return ids + } + + getRangeInfo := func(t *testing.T, rangeID int64, db *gosql.DB) (startKey, endKey []byte) { + t.Helper() + row := db.QueryRow("select start_key, end_key from crdb_internal.ranges_no_leases where range_id=$1", + rangeID) + require.NoError(t, row.Err(), "failed to query range info") + require.NoError(t, row.Scan(&startKey, &endKey), "failed to scan range info") + return startKey, endKey + } + + deleteRangeDataWithRangeTombstone := func(t *testing.T, rangeIDs ids, kvDb *kv.DB, db *gosql.DB) { + t.Helper() + for _, id := range rangeIDs { + start, end := getRangeInfo(t, id, db) + require.NoError(t, kvDb.DelRangeUsingTombstone(context.Background(), start, end), + "failed to delete range with tombstone") + } + } + + getRangeStats := func(t *testing.T, rangeID int64) enginepb.MVCCStats { + t.Helper() + rr := &serverpb.RangesRequest{ + NodeId: "1", + RangeIDs: []roachpb.RangeID{roachpb.RangeID(rangeID)}, + } + infos, err := statusServer.Ranges(ctx, rr) + require.NoError(t, err, "failed to query range info") + return *infos.Ranges[0].State.Stats + } + + findNonEmptyRanges := func(t *testing.T, rangeIDs ids) (nonEmptyRangeIDs ids) { + t.Helper() + for _, id := range rangeIDs { + stats := getRangeStats(t, id) + t.Logf("range %d stats: %s", id, &stats) + // Test can't give meaningful results if stats contain estimates. + // Test also doesn't perform any operations that result in estimated stats + // being created, so it is a failure in the environment if that happens. + require.Zerof(t, stats.ContainsEstimates, "we must not have estimates") + if stats.RangeKeyCount > 0 || stats.KeyCount > 0 { + nonEmptyRangeIDs = append(nonEmptyRangeIDs, id) + } + } + return nonEmptyRangeIDs + } + + rng, _ := randutil.NewTestRand() + + // Set closed timestamp duration, this is needed to avoid waiting for default + // 2 min interval for protected timestamp to get bumped and letting GC collect + // old values. + execOrFatal(t, sqlDb, `SET CLUSTER SETTING kv.protectedts.poll_interval = '5s'`) + + execOrFatal(t, sqlDb, `create table kv (k BIGINT NOT NULL PRIMARY KEY, v BYTES NOT NULL)`) + + for i := 0; i < 1000; i++ { + execOrFatal(t, sqlDb, "upsert into kv values ($1, $2)", rng.Int63(), "hello") + } + + require.NotEmptyf(t, readSomeKeys(t, sqlDb), "found no keys in table") + + rangeIDs := getTableRangeIDs(t, sqlDb) + require.NotEmpty(t, rangeIDs, "failed to query ranges belonging to table") + + nonEmptyRangeIDs := findNonEmptyRanges(t, rangeIDs) + require.NotEmptyf(t, nonEmptyRangeIDs, "all table ranges are empty according to MVCCStats") + + deleteRangeDataWithRangeTombstone(t, rangeIDs, kvDb, sqlDb) + + require.Empty(t, readSomeKeys(t, sqlDb), "table still contains data after range deletion") + + // Push clock forward to make all data eligible for GC. Mind that this is not + // enough just to push the clock, we need to wait for protected timestamp to + // be pushed by periodic task. + manualClock.Increment((time.Hour * 50).Nanoseconds()) + + // Keep pushing replicas through the queue and checking that ranges were + // cleared up. We do both operations in the retry loop because we are dealing + // with two async processes: 1 - protected timestamp update, 2 - queue + // processing as we could only enqueue, but not force GC op. + enqueueSucceeded := false + testutils.SucceedsSoon(t, func() error { + tableRangeIDs := getTableRangeIDs(t, sqlDb) + t.Logf("pushing kv table ranges through mvcc gc queue: %s", tableRangeIDs) + + for _, id := range tableRangeIDs { + _, err := sqlDb.Exec(`SELECT crdb_internal.kv_enqueue_replica($1, 'mvccGC', true)`, id) + if err != nil { + t.Logf("failed to enqueue range to mvcc gc queue: %s", err) + } + enqueueSucceeded = enqueueSucceeded || err == nil + } + + // Enqueue operations could fail if ranges change underneath, test will + // report different error if we didn't enqueue replicas at least once. + // This is giving us a better visibility if failure is because of GC queue + // misbehaving and not actual GC behaviour test is checking. + if !enqueueSucceeded { + return errors.New("failed to enqueue replicas to GC queue") + } + + nonEmptyRangeIDs := findNonEmptyRanges(t, tableRangeIDs) + if len(nonEmptyRangeIDs) > 0 { + return errors.New("not all ranges were cleared") + } + return nil + }) +} + +type ids []int64 + +func (r ids) String() string { + s := make([]string, len(r)) + for i, r := range r { + s[i] = fmt.Sprintf("%d", r) + } + return fmt.Sprintf("[%s]", strings.Join(s, ",")) +} diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index a40758f343eb..d0dfdbea5c1f 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -413,8 +413,8 @@ func (r *Replica) adminSplitWithDescriptor( } extra += splitSnapshotWarningStr(r.RangeID, r.RaftStatus()) - log.Infof(ctx, "initiating a split of this range at key %s [r%d] (%s)%s", - splitKey.StringWithDirs(nil /* valDirs */, 50 /* maxLen */), rightRangeID, reason, extra) + log.Infof(ctx, "initiating a split of this range at key %v [r%d] (%s)%s", + splitKey, rightRangeID, reason, extra) if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return splitTxnAttempt(ctx, r.store, txn, rightRangeID, splitKey, args.ExpirationTime, desc, reason) diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index a287fe22cb84..36af61d49520 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -75,13 +75,24 @@ var ( // PrettyPrintKey prints a key in human readable format. It's // implemented in package git.com/cockroachdb/cockroach/keys to avoid - // package circle import. + // circular package import dependencies (see keys.PrettyPrint for + // implementation). // valDirs correspond to the encoding direction of each encoded value // in the key (if known). If left unspecified, the default encoding // direction for each value type is used (see // encoding.go:prettyPrintFirstValue). + // See SafeFormatKey for a redaction-safe implementation. PrettyPrintKey func(valDirs []encoding.Direction, key Key) string + // SafeFormatKey is the generalized redaction function used to redact pretty + // printed keys. It's implemented in git.com/cockroachdb/cockroach/keys to + // avoid circular package import dependencies (see keys.SafeFormat for + // implementation). + // valDirs correspond to the encoding direction of each encoded value + // in the key (if known). If left unspecified, the default encoding + // direction for each value type is used (see encoding.go:prettyPrintFirstValue). + SafeFormatKey func(w redact.SafeWriter, valDirs []encoding.Direction, key Key) + // PrettyPrintRange prints a key range in human readable format. It's // implemented in package git.com/cockroachdb/cockroach/keys to avoid // package circle import. @@ -133,13 +144,18 @@ func (rk RKey) PrefixEnd() RKey { return RKey(keysbase.PrefixEnd(rk)) } +// SafeFormat - see Key.SafeFormat. +func (rk RKey) SafeFormat(w redact.SafePrinter, r rune) { + rk.AsRawKey().SafeFormat(w, r) +} + func (rk RKey) String() string { return Key(rk).String() } -// StringWithDirs - see Key.String.WithDirs. -func (rk RKey) StringWithDirs(valDirs []encoding.Direction, maxLen int) string { - return Key(rk).StringWithDirs(valDirs, maxLen) +// StringWithDirs - see Key.StringWithDirs. +func (rk RKey) StringWithDirs(valDirs []encoding.Direction) string { + return Key(rk).StringWithDirs(valDirs) } // Key is a custom type for a byte string in proto @@ -202,32 +218,32 @@ func (k Key) Compare(b Key) int { return bytes.Compare(k, b) } +// SafeFormat implements the redact.SafeFormatter interface. +func (k Key) SafeFormat(w redact.SafePrinter, _ rune) { + SafeFormatKey(w, nil /* valDirs */, k) +} + // String returns a string-formatted version of the key. func (k Key) String() string { - return k.StringWithDirs(nil /* valDirs */, 0 /* maxLen */) + return redact.StringWithoutMarkers(k) } // StringWithDirs is the value encoding direction-aware version of String. // // Args: -// valDirs: The direction for the key's components, generally needed for correct // -// decoding. If nil, the values are pretty-printed with default encoding -// direction. -// -// maxLen: If not 0, only the first maxLen chars from the decoded key are +// valDirs: The direction for the key's components, generally needed for +// correct decoding. If nil, the values are pretty-printed with default +// encoding direction. // // returned, plus a "..." suffix. -func (k Key) StringWithDirs(valDirs []encoding.Direction, maxLen int) string { +func (k Key) StringWithDirs(valDirs []encoding.Direction) string { var s string if PrettyPrintKey != nil { s = PrettyPrintKey(valDirs, k) } else { s = fmt.Sprintf("%q", []byte(k)) } - if maxLen != 0 && len(s) > maxLen { - return s[0:maxLen] + "..." - } return s } diff --git a/pkg/roachpb/string_test.go b/pkg/roachpb/string_test.go index c85b2825e1f8..680c7e4e24cc 100644 --- a/pkg/roachpb/string_test.go +++ b/pkg/roachpb/string_test.go @@ -94,12 +94,6 @@ func TestBatchRequestString(t *testing.T) { act := ba.String() require.Equal(t, exp, act) } - - { - exp := `Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›),... 76 skipped ..., Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), Get [‹/Min›,‹/Min›), EndTxn(abort) [‹/Min›], [txn: 6ba7b810], [wait-policy: Error], [can-forward-ts], [bounded-staleness, min_ts_bound: 0.000000001,0, min_ts_bound_strict, max_ts_bound: 0.000000002,0]` - act := redact.Sprint(ba) - require.EqualValues(t, exp, act) - } } func TestKeyString(t *testing.T) { diff --git a/pkg/sql/catalog/catalogkeys/keys.go b/pkg/sql/catalog/catalogkeys/keys.go index 43de53126393..62d66e12885d 100644 --- a/pkg/sql/catalog/catalogkeys/keys.go +++ b/pkg/sql/catalog/catalogkeys/keys.go @@ -84,7 +84,7 @@ func IndexKeyValDirs(index catalog.Index) []encoding.Direction { // currently true for the fields we care about stripping (the table and index // ID). func PrettyKey(valDirs []encoding.Direction, key roachpb.Key, skip int) string { - p := key.StringWithDirs(valDirs, 0 /* maxLen */) + p := key.StringWithDirs(valDirs) for i := 0; i <= skip; i++ { n := strings.IndexByte(p[1:], '/') if n == -1 { diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go index f65762aa550c..6b0d5a527978 100644 --- a/pkg/sql/exec_log.go +++ b/pkg/sql/exec_log.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" @@ -395,12 +396,12 @@ func (p *planner) maybeLogStatementInternal( requiredTimeElapsed = 0 } if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) { - var contentionNanos int64 + var stats execstats.QueryLevelStats if queryLevelStats, ok := p.instrumentation.GetQueryLevelStats(); ok { - contentionNanos = queryLevelStats.ContentionTime.Nanoseconds() + stats = *queryLevelStats } - contentionNanos = telemetryMetrics.getContentionTime(contentionNanos) + stats = telemetryMetrics.getQueryLevelStats(stats) skippedQueries := telemetryMetrics.resetSkippedQueryCount() sampledQuery := eventpb.SampledQuery{ @@ -437,8 +438,14 @@ func (p *planner) maybeLogStatementInternal( InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), - ContentionNanos: contentionNanos, + ContentionNanos: stats.ContentionTime.Nanoseconds(), Regions: p.curPlan.instrumentation.regions, + NetworkBytesSent: stats.NetworkBytesSent, + MaxMemUsage: stats.MaxMemUsage, + MaxDiskUsage: stats.MaxDiskUsage, + KVBytesRead: stats.KVBytesRead, + KVRowsRead: stats.KVRowsRead, + NetworkMessages: stats.NetworkMessages, } p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) } else { diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index 26360e9ef49b..10141aa2dd9d 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -54,10 +55,8 @@ type TelemetryLoggingTestingKnobs struct { // getTimeNow allows tests to override the timeutil.Now() function used // when updating rolling query counts. getTimeNow func() time.Time - // getContentionNanos allows tests to override the recorded contention time - // for the query. Used to stub non-zero values to populate the log's contention - // time field. - getContentionNanos func() int64 + // getQueryLevelMetrics allows tests to override the recorded query level stats. + getQueryLevelStats func() execstats.QueryLevelStats // getTracingStatus allows tests to override whether the current query has tracing // enabled or not. Queries with tracing enabled are always sampled to telemetry. getTracingStatus func() bool @@ -91,11 +90,13 @@ func (t *TelemetryLoggingMetrics) maybeUpdateLastEmittedTime( return false } -func (t *TelemetryLoggingMetrics) getContentionTime(contentionTimeInNanoseconds int64) int64 { - if t.Knobs != nil && t.Knobs.getContentionNanos != nil { - return t.Knobs.getContentionNanos() +func (t *TelemetryLoggingMetrics) getQueryLevelStats( + queryLevelStats execstats.QueryLevelStats, +) execstats.QueryLevelStats { + if t.Knobs != nil && t.Knobs.getQueryLevelStats != nil { + return t.Knobs.getQueryLevelStats() } - return contentionTimeInNanoseconds + return queryLevelStats } func (t *TelemetryLoggingMetrics) isTracing(_ *tracing.Span, tracingEnabled bool) bool { diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index a8367eaa4669..9e58752fa0f6 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -22,68 +22,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) -type stubTime struct { - syncutil.RWMutex - t time.Time -} - -func (s *stubTime) setTime(t time.Time) { - s.RWMutex.Lock() - defer s.RWMutex.Unlock() - s.t = t -} - -func (s *stubTime) TimeNow() time.Time { - s.RWMutex.RLock() - defer s.RWMutex.RUnlock() - return s.t -} - -type stubQueryMetrics struct { - syncutil.RWMutex - contentionNanos int64 -} - -func (s *stubQueryMetrics) setContentionNanos(t int64) { - s.RWMutex.Lock() - defer s.RWMutex.Unlock() - s.contentionNanos = t -} - -func (s *stubQueryMetrics) ContentionNanos() int64 { - s.RWMutex.RLock() - defer s.RWMutex.RUnlock() - return s.contentionNanos -} - -type stubTracingStatus struct { - syncutil.RWMutex - isTracing bool -} - -func (s *stubTracingStatus) setTracingStatus(t bool) { - s.RWMutex.Lock() - defer s.RWMutex.Unlock() - s.isTracing = t -} - -func (s *stubTracingStatus) TracingStatus() bool { - s.RWMutex.RLock() - defer s.RWMutex.RUnlock() - return s.isTracing -} - // TestTelemetryLogging verifies that telemetry events are logged to the telemetry log // and are sampled according to the configured sample rate. func TestTelemetryLogging(t *testing.T) { @@ -94,9 +43,9 @@ func TestTelemetryLogging(t *testing.T) { cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) defer cleanup() - st := stubTime{} - sqm := stubQueryMetrics{} - sts := stubTracingStatus{} + st := logtestutils.StubTime{} + sqm := logtestutils.StubQueryStats{} + sts := logtestutils.StubTracingStatus{} s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -107,7 +56,7 @@ func TestTelemetryLogging(t *testing.T) { }, TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ getTimeNow: st.TimeNow, - getContentionNanos: sqm.ContentionNanos, + getQueryLevelStats: sqm.QueryLevelStats, getTracingStatus: sts.TracingStatus, }, }, @@ -159,7 +108,7 @@ func TestTelemetryLogging(t *testing.T) { expectedRead bool expectedWrite bool expectedErr string // Empty string means no error is expected. - contentionNanos int64 + queryLevelStats execstats.QueryLevelStats enableTracing bool }{ { @@ -180,8 +129,16 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, - contentionNanos: 0, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 0 * time.Nanosecond, + NetworkBytesSent: 1, + MaxMemUsage: 2, + MaxDiskUsage: 3, + KVBytesRead: 4, + KVRowsRead: 5, + NetworkMessages: 6, + }, + enableTracing: false, }, { // Test case with statement that is of type DML. @@ -199,8 +156,10 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: false, expectedRead: false, expectedWrite: false, - contentionNanos: 1, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 1 * time.Nanosecond, + }, + enableTracing: false, }, { // Test case with statement that is of type DML. @@ -219,8 +178,13 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 2, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 2 * time.Nanosecond, + NetworkBytesSent: 1, + MaxMemUsage: 2, + NetworkMessages: 6, + }, + enableTracing: false, }, { // Test case with statement that is of type DML. @@ -238,8 +202,16 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 3, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 3 * time.Nanosecond, + NetworkBytesSent: 1124, + MaxMemUsage: 132, + MaxDiskUsage: 3, + KVBytesRead: 4, + KVRowsRead: 2345, + NetworkMessages: 36, + }, + enableTracing: false, }, { // Test case with a full scan. @@ -257,8 +229,15 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 0, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 0 * time.Nanosecond, + NetworkBytesSent: 124235, + MaxMemUsage: 12412, + MaxDiskUsage: 3, + KVRowsRead: 5, + NetworkMessages: 6235, + }, + enableTracing: false, }, { // Test case with a write. @@ -276,8 +255,14 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: true, - contentionNanos: 0, - enableTracing: false, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 0 * time.Nanosecond, + NetworkBytesSent: 1, + KVBytesRead: 4, + KVRowsRead: 5, + NetworkMessages: 6, + }, + enableTracing: false, }, // Not of type DML so not sampled { @@ -314,8 +299,16 @@ func TestTelemetryLogging(t *testing.T) { expectedStatsAvailable: true, expectedRead: true, expectedWrite: false, - contentionNanos: 2, - enableTracing: true, + queryLevelStats: execstats.QueryLevelStats{ + ContentionTime: 2 * time.Nanosecond, + NetworkBytesSent: 10, + MaxMemUsage: 20, + MaxDiskUsage: 33, + KVBytesRead: 24, + KVRowsRead: 55, + NetworkMessages: 66, + }, + enableTracing: true, }, } @@ -323,9 +316,9 @@ func TestTelemetryLogging(t *testing.T) { telemetryMaxEventFrequency.Override(context.Background(), &s.ClusterSettings().SV, tc.stubMaxEventFrequency) for _, execTimestamp := range tc.execTimestampsSeconds { stubTime := timeutil.FromUnixMicros(int64(execTimestamp * 1e6)) - st.setTime(stubTime) - sqm.setContentionNanos(tc.contentionNanos) - sts.setTracingStatus(tc.enableTracing) + st.SetTime(stubTime) + sqm.SetQueryLevelStats(tc.queryLevelStats) + sts.SetTracingStatus(tc.enableTracing) _, err := db.DB.ExecContext(context.Background(), tc.query) if err != nil && tc.expectedErr == "" { t.Errorf("unexpected error executing query `%s`: %v", tc.query, err) @@ -490,13 +483,61 @@ func TestTelemetryLogging(t *testing.T) { } } contentionNanos := regexp.MustCompile("\"ContentionNanos\":[0-9]*") - if tc.contentionNanos > 0 && !contentionNanos.MatchString(e.Message) { + if tc.queryLevelStats.ContentionTime.Nanoseconds() > 0 && !contentionNanos.MatchString(e.Message) { // If we have contention, we expect the ContentionNanos field to be populated. t.Errorf("expected to find ContentionNanos but none was found") - } else if tc.contentionNanos == 0 && contentionNanos.MatchString(e.Message) { + } else if tc.queryLevelStats.ContentionTime.Nanoseconds() == 0 && contentionNanos.MatchString(e.Message) { // If we do not have contention, expect no ContentionNanos field. t.Errorf("expected no ContentionNanos field, but was found") } + networkBytesSent := regexp.MustCompile("\"NetworkBytesSent\":[0-9]*") + if tc.queryLevelStats.NetworkBytesSent > 0 && !networkBytesSent.MatchString(e.Message) { + // If we have sent network bytes, we expect the NetworkBytesSent field to be populated. + t.Errorf("expected to find NetworkBytesSent but none was found") + } else if tc.queryLevelStats.NetworkBytesSent == 0 && networkBytesSent.MatchString(e.Message) { + // If we have not sent network bytes, expect no NetworkBytesSent field. + t.Errorf("expected no NetworkBytesSent field, but was found") + } + maxMemUsage := regexp.MustCompile("\"MaxMemUsage\":[0-9]*") + if tc.queryLevelStats.MaxMemUsage > 0 && !maxMemUsage.MatchString(e.Message) { + // If we have a max memory usage, we expect the MaxMemUsage field to be populated. + t.Errorf("expected to find MaxMemUsage but none was found") + } else if tc.queryLevelStats.MaxMemUsage == 0 && maxMemUsage.MatchString(e.Message) { + // If we do not have a max memory usage, expect no MaxMemUsage field. + t.Errorf("expected no MaxMemUsage field, but was found") + } + maxDiskUsage := regexp.MustCompile("\"MaxDiskUsage\":[0-9]*") + if tc.queryLevelStats.MaxDiskUsage > 0 && !maxDiskUsage.MatchString(e.Message) { + // If we have a max disk usage, we expect the MaxDiskUsage field to be populated. + t.Errorf("expected to find MaxDiskUsage but none was found") + } else if tc.queryLevelStats.MaxDiskUsage == 0 && maxDiskUsage.MatchString(e.Message) { + // If we do not a max disk usage, expect no MaxDiskUsage field. + t.Errorf("expected no MaxDiskUsage field, but was found") + } + kvBytesRead := regexp.MustCompile("\"KVBytesRead\":[0-9]*") + if tc.queryLevelStats.KVBytesRead > 0 && !kvBytesRead.MatchString(e.Message) { + // If we have read bytes from KV, we expect the KVBytesRead field to be populated. + t.Errorf("expected to find KVBytesRead but none was found") + } else if tc.queryLevelStats.KVBytesRead == 0 && kvBytesRead.MatchString(e.Message) { + // If we have not read bytes from KV, expect no KVBytesRead field. + t.Errorf("expected no KVBytesRead field, but was found") + } + kvRowsRead := regexp.MustCompile("\"KVRowsRead\":[0-9]*") + if tc.queryLevelStats.KVRowsRead > 0 && !kvRowsRead.MatchString(e.Message) { + // If we have read rows from KV, we expect the KVRowsRead field to be populated. + t.Errorf("expected to find KVRowsRead but none was found") + } else if tc.queryLevelStats.KVRowsRead == 0 && kvRowsRead.MatchString(e.Message) { + // If we have not read rows from KV, expect no KVRowsRead field. + t.Errorf("expected no KVRowsRead field, but was found") + } + networkMessages := regexp.MustCompile("\"NetworkMessages\":[0-9]*") + if tc.queryLevelStats.NetworkMessages > 0 && !networkMessages.MatchString(e.Message) { + // If we have network messages, we expect the NetworkMessages field to be populated. + t.Errorf("expected to find NetworkMessages but none was found") + } else if tc.queryLevelStats.NetworkMessages == 0 && networkMessages.MatchString(e.Message) { + // If we do not have network messages, expect no NetworkMessages field. + t.Errorf("expected no NetworkMessages field, but was found") + } if tc.expectedErr != "" { if !strings.Contains(e.Message, tc.expectedErr) { t.Errorf("%s: missing error %s in message %s", tc.name, tc.expectedErr, e.Message) @@ -520,7 +561,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t) defer cleanup() - st := stubTime{} + st := logtestutils.StubTime{} s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -572,7 +613,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { // Set the time for when we issue a query to enable/disable // troubleshooting mode. setTroubleshootModeTime := timeutil.FromUnixMicros(int64(idx * 1e6)) - st.setTime(setTroubleshootModeTime) + st.SetTime(setTroubleshootModeTime) if tc.enableTroubleshootingMode { db.Exec(t, `SET troubleshooting_mode = true;`) } else { @@ -581,7 +622,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) { // Advance time 1 second from previous query. Ensure enough time has passed // from when we set troubleshooting mode for this query to be sampled. setQueryTime := timeutil.FromUnixMicros(int64((idx + 1) * 1e6)) - st.setTime(setQueryTime) + st.SetTime(setQueryTime) db.Exec(t, tc.query) } diff --git a/pkg/util/encoding/encoding.go b/pkg/util/encoding/encoding.go index 39a4583f1fd2..b717bc419226 100644 --- a/pkg/util/encoding/encoding.go +++ b/pkg/util/encoding/encoding.go @@ -1884,6 +1884,9 @@ func PrettyPrintValue(valDirs []Direction, b []byte, sep string) string { if allDecoded { return s1 } + // If we failed to decoded everything above, assume the key was the result of a + // `PrefixEnd()`. Attempt to undo PrefixEnd & retry the process, otherwise return + // what we were able to decode. if undoPrefixEnd, ok := UndoPrefixEnd(b); ok { // When we UndoPrefixEnd, we may have lost a tail of 0xFFs. Try to add // enough of them to get something decoded. This is best-effort, we have to stop @@ -1902,6 +1905,66 @@ func PrettyPrintValue(valDirs []Direction, b []byte, sep string) string { return s1 } +// PrettyPrintValuesWithTypes returns a slice containing each contiguous decodable value +// in the provided byte slice along with a slice containing the type of each value. +// The directions each value is encoded may be provided. If valDirs is nil, +// all values are decoded and printed with the default direction (ascending). +func PrettyPrintValuesWithTypes(valDirs []Direction, b []byte) (vals []string, types []Type) { + vals1, types1, allDecoded := prettyPrintValuesWithTypesImpl(valDirs, b) + if allDecoded { + return vals1, types1 + } + // If we failed to decoded everything above, assume the key was the result of a + // `PrefixEnd()`. Attempt to undo PrefixEnd & retry the process, otherwise return + // what we were able to decode. + if undoPrefixEnd, ok := UndoPrefixEnd(b); ok { + // When we UndoPrefixEnd, we may have lost a tail of 0xFFs. Try to add + // enough of them to get something decoded. This is best-effort, we have to stop + // somewhere. + cap := 20 + if len(valDirs) > len(b) { + cap = len(valDirs) - len(b) + } + for i := 0; i < cap; i++ { + if vals2, types2, allDecoded := prettyPrintValuesWithTypesImpl(valDirs, undoPrefixEnd); allDecoded { + vals2 = append(vals2, "PrefixEnd") + types2 = append(types2, Bytes) + return vals2, types2 + } + undoPrefixEnd = append(undoPrefixEnd, 0xFF) + } + } + return vals1, types1 +} + +func prettyPrintValuesWithTypesImpl( + valDirs []Direction, b []byte, +) (vals []string, types []Type, allDecoded bool) { + allDecoded = true + for len(b) > 0 { + var valDir Direction + if len(valDirs) > 0 { + valDir = valDirs[0] + valDirs = valDirs[1:] + } + + bb, s, err := prettyPrintFirstValue(valDir, b) + if err != nil { + // If we fail to decode, mark as unknown and attempt + // to continue - it's possible we can still decode the + // remainder of the key bytes. + allDecoded = false + vals = append(vals, "???") + types = append(types, Unknown) + } else { + vals = append(vals, s) + types = append(types, PeekType(b)) + } + b = bb + } + return vals, types, allDecoded +} + func prettyPrintValueImpl(valDirs []Direction, b []byte, sep string) (string, bool) { allDecoded := true var buf strings.Builder @@ -1918,6 +1981,9 @@ func prettyPrintValueImpl(valDirs []Direction, b []byte, sep string) (string, bo bb, s, err := prettyPrintFirstValue(valDir, b) if err != nil { + // If we fail to decode, mark as unknown and attempt + // to continue - it's possible we can still decode the + // remainder of the key bytes. allDecoded = false buf.WriteString(sep) buf.WriteByte('?') diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go index b7cc112cff4d..dc7beed28137 100644 --- a/pkg/util/hlc/timestamp.go +++ b/pkg/util/hlc/timestamp.go @@ -71,6 +71,8 @@ func (t Timestamp) Compare(s Timestamp) int { } // String implements the fmt.Stringer interface. +// Outputs: seconds.nanos,logical[?] +// ? is added if synthetic is set. func (t Timestamp) String() string { // The following code was originally written as // fmt.Sprintf("%d.%09d,%d", t.WallTime/1e9, t.WallTime%1e9, t.Logical). diff --git a/pkg/util/keysutil/keys.go b/pkg/util/keysutil/keys.go index 40489cbb536b..efbb6c38e35d 100644 --- a/pkg/util/keysutil/keys.go +++ b/pkg/util/keysutil/keys.go @@ -133,7 +133,7 @@ outer: Wrapped: errors.New("known key, but unsupported subtype"), } } - for _, v := range keys.ConstKeyDict { + for _, v := range keys.ConstKeyOverrides { if strings.HasPrefix(input, v.Name) { output = append(output, v.Value...) input = input[len(v.Name):] diff --git a/pkg/util/log/eventpb/json_encode_generated.go b/pkg/util/log/eventpb/json_encode_generated.go index 0543e93b2278..e5ccee19e33d 100644 --- a/pkg/util/log/eventpb/json_encode_generated.go +++ b/pkg/util/log/eventpb/json_encode_generated.go @@ -3904,6 +3904,60 @@ func (m *SampledQuery) AppendJSONFields(printComma bool, b redact.RedactableByte b = append(b, ']') } + if m.NetworkBytesSent != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"NetworkBytesSent\":"...) + b = strconv.AppendInt(b, int64(m.NetworkBytesSent), 10) + } + + if m.MaxMemUsage != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"MaxMemUsage\":"...) + b = strconv.AppendInt(b, int64(m.MaxMemUsage), 10) + } + + if m.MaxDiskUsage != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"MaxDiskUsage\":"...) + b = strconv.AppendInt(b, int64(m.MaxDiskUsage), 10) + } + + if m.KVBytesRead != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"KVBytesRead\":"...) + b = strconv.AppendInt(b, int64(m.KVBytesRead), 10) + } + + if m.KVRowsRead != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"KVRowsRead\":"...) + b = strconv.AppendInt(b, int64(m.KVRowsRead), 10) + } + + if m.NetworkMessages != 0 { + if printComma { + b = append(b, ',') + } + printComma = true + b = append(b, "\"NetworkMessages\":"...) + b = strconv.AppendInt(b, int64(m.NetworkMessages), 10) + } + return printComma, b } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 9b703200d17e..5355e44bb47c 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -143,6 +143,24 @@ message SampledQuery { // The regions of the nodes where SQL processors ran. repeated string regions = 38 [(gogoproto.jsontag) = ',omitempty', (gogoproto.moretags) = "redact:\"nonsensitive\""]; + // The number of network bytes sent by nodes for this query. + int64 network_bytes_sent = 39 [(gogoproto.jsontag) = ',omitempty']; + + // The maximum amount of memory usage by nodes for this query. + int64 max_mem_usage = 40 [(gogoproto.jsontag) = ',omitempty']; + + // The maximum amount of disk usage by nodes for this query. + int64 max_disk_usage = 41 [(gogoproto.jsontag) = ',omitempty']; + + // The number of bytes read at the KV layer for this query. + int64 kv_bytes_read = 42 [(gogoproto.customname) = "KVBytesRead", (gogoproto.jsontag) = ',omitempty']; + + // The number of rows read at the KV layer for this query. + int64 kv_rows_read = 43 [(gogoproto.customname) = "KVRowsRead", (gogoproto.jsontag) = ',omitempty']; + + // The number of network messages sent by nodes for this query. + int64 network_messages = 44 [(gogoproto.jsontag) = ',omitempty']; + reserved 12; } diff --git a/pkg/util/log/logtestutils/BUILD.bazel b/pkg/util/log/logtestutils/BUILD.bazel index b51df9587bb8..805a166be975 100644 --- a/pkg/util/log/logtestutils/BUILD.bazel +++ b/pkg/util/log/logtestutils/BUILD.bazel @@ -3,13 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "logtestutils", - srcs = ["log_test_utils.go"], + srcs = [ + "log_test_utils.go", + "telemetry_logging_test_utils.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils", visibility = ["//visibility:public"], deps = [ + "//pkg/sql/execstats", "//pkg/util/log", "//pkg/util/log/channel", "//pkg/util/log/logconfig", + "//pkg/util/syncutil", ], ) diff --git a/pkg/util/log/logtestutils/telemetry_logging_test_utils.go b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go new file mode 100644 index 000000000000..5873a9f6142d --- /dev/null +++ b/pkg/util/log/logtestutils/telemetry_logging_test_utils.go @@ -0,0 +1,79 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package logtestutils + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// StubTime is a helper struct to stub the current time. +type StubTime struct { + syncutil.RWMutex + t time.Time +} + +// SetTime sets the current time. +func (s *StubTime) SetTime(t time.Time) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.t = t +} + +// TimeNow returns the current stubbed time. +func (s *StubTime) TimeNow() time.Time { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.t +} + +// StubQueryStats is a helper struct to stub query level stats. +type StubQueryStats struct { + syncutil.RWMutex + stats execstats.QueryLevelStats +} + +// SetQueryLevelStats sets the stubbed query level stats. +func (s *StubQueryStats) SetQueryLevelStats(stats execstats.QueryLevelStats) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.stats = stats +} + +// QueryLevelStats returns the current stubbed query level stats. +func (s *StubQueryStats) QueryLevelStats() execstats.QueryLevelStats { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.stats +} + +// StubTracingStatus is a helper struct to stub whether a query is being +// traced. +type StubTracingStatus struct { + syncutil.RWMutex + isTracing bool +} + +// SetTracingStatus sets the stubbed status for tracing (true/false). +func (s *StubTracingStatus) SetTracingStatus(t bool) { + s.RWMutex.Lock() + defer s.RWMutex.Unlock() + s.isTracing = t +} + +// TracingStatus returns the stubbed status for tracing. +func (s *StubTracingStatus) TracingStatus() bool { + s.RWMutex.RLock() + defer s.RWMutex.RUnlock() + return s.isTracing +} diff --git a/pkg/workload/schemachange/schemachange.go b/pkg/workload/schemachange/schemachange.go index 93f3d1240e3c..da709d835bca 100644 --- a/pkg/workload/schemachange/schemachange.go +++ b/pkg/workload/schemachange/schemachange.go @@ -331,7 +331,7 @@ func (w *schemaChangeWorker) getErrorState() string { } func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error { - w.logger.startLog() + w.logger.startLog(w.id) w.logger.writeLog("BEGIN") opsNum := 1 + w.opGen.randIntn(w.maxOpsPerWorker) @@ -534,13 +534,14 @@ func (w *schemaChangeWorker) releaseLocksIfHeld() { // startLog initializes the currentLogEntry of the schemaChangeWorker. It is a noop // if l.verbose < 1. -func (l *logger) startLog() { +func (l *logger) startLog(workerID int) { if l.verbose < 1 { return } l.currentLogEntry.mu.Lock() defer l.currentLogEntry.mu.Unlock() l.currentLogEntry.mu.entry = &LogEntry{ + WorkerID: workerID, ClientTimestamp: timeutil.Now().Format("15:04:05.999999"), } }