Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#96893

96109: metrics: introduce CounterFloat64 and use for tenant RUs r=RaduBerinde,tbg a=dhartunian

Previously, we were prevented from using float64s directly in metric counters since counters were limited to ints. This led to the use of Gauges in situations where Counters would be preferable since we didn't have code to help manage a monotonically increasing float64 value.

This commit introduces some helpers around atomically adding float64s together and increasing one monotonically. Those primitives are composed into a `CounterFloat64` type further used to construct `AggCounterFloat64` which can be used in place of `AggCounter` to track per-tenant metrics.

The two `GaugeFloat64` types used for tenant `totalRU` and `totalKVRU` metrics are replaced with the new `CounterFloat64` type to properly reflect the fact that these are monotonically increasing values. This helps Prometheus when scraping these metrics to correctly account for missing data if necessary.

Resolves cockroachdb#68291

Epic: CRDB0-14536

Release note: None

96685: storage/pebbleiter: mangle unsafe buffers during positioning r=jbowens a=jbowens

**storage/pebbleiter: mangle unsafe buffers between positioning methods**

Randomly clobber the key and value buffers in order to ensure lifetimes are respected. This commit extends the `pebbleiter.assertionIter` type used in `crdb_test` builds that sits between pkg/storage and the pebble.Iterator type. It now will randomly zero the buffers used to return the previous iterator position's key and value, to ensure code isn't improperly relying on the stability of the these keys.

**storage: fix UnsafeKey() usage of invalid iterator**

Previously, CheckSSTConflicts would improperly call UnsafeKey on an exhausted
sstIter. This could result in undefined, inconsistent behavior as UnsafeKey
may point to arbitrary data once the iterator is exhausted.

**kv/kvserver/spanset: fix read of invalid iterator's UnsafeKey**

Previously, the spanset.MVCCIterator implementation would retrieve the
contained iterator's UsafeKey() on an invalid iterator in Next, NextKey and
Prev. This retrieval was harmless because it always checked the validity of the
iterator before using it, but recent assertions in test builds prevent this
usage.

Epic: None
Close cockroachdb#86657.
Informs cockroachdb#94141.
Release note: None

96832: ui: txn insights details fixes r=maryliag a=xinhaoz

See individual commits.

<img width="1115" alt="image" src="https://user-images.githubusercontent.com/20136951/217676969-94de02ba-1499-43c3-a542-ece3e5829a6c.png">
<img width="1911" alt="image" src="https://user-images.githubusercontent.com/20136951/217677020-c0a20035-f80b-472d-943d-a2bfd0c92b13.png">


96893: roachtest: test with sqlalchemy 2.0 r=rafiss a=rafiss

fixes cockroachdb#96880
backports for cockroachdb#96859 and cockroachdb#96856

Release note: None

Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
5 people committed Feb 9, 2023
5 parents 732c3a7 + f96b21f + a80f29c + e350956 + a3e760f commit faeebee
Show file tree
Hide file tree
Showing 23 changed files with 582 additions and 67 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/multitenantccl/tenantcostserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
// aggregated value for a metric is not useful (it sums up the consumption for
// each tenant, as last reported to this node).
type Metrics struct {
TotalRU *aggmetric.AggGaugeFloat64
TotalKVRU *aggmetric.AggGaugeFloat64
TotalRU *aggmetric.AggCounterFloat64
TotalKVRU *aggmetric.AggCounterFloat64
TotalReadBatches *aggmetric.AggGauge
TotalReadRequests *aggmetric.AggGauge
TotalReadBytes *aggmetric.AggGauge
Expand Down Expand Up @@ -130,8 +130,8 @@ var (
func (m *Metrics) init() {
b := aggmetric.MakeBuilder(multitenant.TenantIDLabel)
*m = Metrics{
TotalRU: b.GaugeFloat64(metaTotalRU),
TotalKVRU: b.GaugeFloat64(metaTotalKVRU),
TotalRU: b.CounterFloat64(metaTotalRU),
TotalKVRU: b.CounterFloat64(metaTotalKVRU),
TotalReadBatches: b.Gauge(metaTotalReadBatches),
TotalReadRequests: b.Gauge(metaTotalReadRequests),
TotalReadBytes: b.Gauge(metaTotalReadBytes),
Expand All @@ -148,8 +148,8 @@ func (m *Metrics) init() {

// tenantMetrics represent metrics for an individual tenant.
type tenantMetrics struct {
totalRU *aggmetric.GaugeFloat64
totalKVRU *aggmetric.GaugeFloat64
totalRU *aggmetric.CounterFloat64
totalKVRU *aggmetric.CounterFloat64
totalReadBatches *aggmetric.Gauge
totalReadRequests *aggmetric.Gauge
totalReadBytes *aggmetric.Gauge
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func (s *instance) TokenBucketRequest(
}

// Report current consumption.
metrics.totalRU.Update(consumption.RU)
metrics.totalKVRU.Update(consumption.KVRU)
metrics.totalRU.UpdateIfHigher(consumption.RU)
metrics.totalKVRU.UpdateIfHigher(consumption.KVRU)
metrics.totalReadBatches.Update(int64(consumption.ReadBatches))
metrics.totalReadRequests.Update(int64(consumption.ReadRequests))
metrics.totalReadBytes.Update(int64(consumption.ReadBytes))
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/sqlalchemy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
var sqlAlchemyResultRegex = regexp.MustCompile(`^(?P<test>test.*::.*::[^ \[\]]*(?:\[.*])?) (?P<result>\w+)\s+\[.+]$`)
var sqlAlchemyReleaseTagRegex = regexp.MustCompile(`^rel_(?P<major>\d+)_(?P<minor>\d+)_(?P<point>\d+)$`)

var supportedSQLAlchemyTag = "1.4.46"
var supportedSQLAlchemyTag = "2.0.2"

// This test runs the SQLAlchemy dialect test suite against a single Cockroach
// node.
Expand Down Expand Up @@ -96,7 +96,7 @@ func runSQLAlchemy(ctx context.Context, t test.Test, c cluster.Cluster) {

if err := repeatRunE(ctx, t, c, node, "install pytest", fmt.Sprintf(`
source venv/bin/activate &&
pip3 install --upgrade --force-reinstall setuptools pytest==6.0.1 pytest-xdist psycopg2 alembic sqlalchemy==%s`,
pip3 install --upgrade --force-reinstall setuptools pytest==7.2.1 pytest-xdist psycopg2 alembic sqlalchemy==%s`,
supportedSQLAlchemyTag)); err != nil {
t.Fatal(err)
}
Expand Down
26 changes: 23 additions & 3 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,37 @@ func (i *MVCCIterator) SeekLT(key storage.MVCCKey) {
// Next is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) Next() {
i.i.Next()
i.checkAllowed(roachpb.Span{Key: i.UnsafeKey().Key}, false)
i.checkAllowedCurrPosForward(false)
}

// Prev is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) Prev() {
i.i.Prev()
i.checkAllowed(roachpb.Span{Key: i.UnsafeKey().Key}, false)
i.checkAllowedCurrPosForward(false)
}

// NextKey is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) NextKey() {
i.i.NextKey()
i.checkAllowed(roachpb.Span{Key: i.UnsafeKey().Key}, false)
i.checkAllowedCurrPosForward(false)
}

// checkAllowedCurrPosForward checks the span starting at the current iterator
// position, if the current iterator position is valid.
func (i *MVCCIterator) checkAllowedCurrPosForward(errIfDisallowed bool) {
i.invalid = false
i.err = nil
if ok, _ := i.i.Valid(); !ok {
// If the iterator is invalid after the operation, there's nothing to
// check. We allow uses of iterators to exceed the declared span bounds
// as long as the iterator itself is configured with proper boundaries.
return
}
i.checkAllowedValidPos(roachpb.Span{Key: i.UnsafeKey().Key}, errIfDisallowed)
}

// checkAllowed checks the provided span if the current iterator position is
// valid.
func (i *MVCCIterator) checkAllowed(span roachpb.Span, errIfDisallowed bool) {
i.invalid = false
i.err = nil
Expand All @@ -126,6 +142,10 @@ func (i *MVCCIterator) checkAllowed(span roachpb.Span, errIfDisallowed bool) {
// as long as the iterator itself is configured with proper boundaries.
return
}
i.checkAllowedValidPos(span, errIfDisallowed)
}

func (i *MVCCIterator) checkAllowedValidPos(span roachpb.Span, errIfDisallowed bool) {
var err error
if i.spansOnly {
err = i.spans.CheckAllowed(SpanReadOnly, span)
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/pebbleiter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ go_library(
}),
importpath = "github.com/cockroachdb/cockroach/pkg/storage/pebbleiter",
visibility = ["//visibility:public"],
deps = ["@com_github_cockroachdb_pebble//:pebble"],
deps = [
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
],
)

REMOVE_GO_BUILD_CONSTRAINTS = "cat $< | grep -v '//go:build' | grep -v '// +build' > $@"
Expand Down
129 changes: 127 additions & 2 deletions pkg/storage/pebbleiter/crdb_test_on.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@

package pebbleiter

import "github.com/cockroachdb/pebble"
import (
"math/rand"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)

// Iterator wraps the *pebble.Iterator in crdb_test builds with an assertionIter
// that detects when Close is called on the iterator twice. Double closes are
Expand All @@ -31,6 +36,23 @@ func MaybeWrap(iter *pebble.Iterator) Iterator {
type assertionIter struct {
*pebble.Iterator
closed bool
// unsafeBufs hold buffers used for returning values with short lifetimes to
// the caller. To assert that the client is respecting the lifetimes,
// assertionIter mangles the buffers as soon as the associated lifetime
// expires. This is the same technique applied by the unsafeMVCCIterator in
// pkg/storage, but this time applied at the API boundary between
// pkg/storage and Pebble.
//
// unsafeBufs holds two buffers per-key type and an index indicating which
// are currently in use. This is used to randomly switch to a different
// buffer, ensuring that the buffer(s) returned to the caller for the
// previous iterator position are garbage (as opposed to just state
// corresponding to the current iterator position).
unsafeBufs struct {
idx int
key [2][]byte
val [2][]byte
}
}

func (i *assertionIter) Clone(cloneOpts pebble.CloneOptions) (Iterator, error) {
Expand All @@ -43,8 +65,111 @@ func (i *assertionIter) Clone(cloneOpts pebble.CloneOptions) (Iterator, error) {

func (i *assertionIter) Close() error {
if i.closed {
panic("pebble.Iterator already closed")
panic(errors.AssertionFailedf("pebble.Iterator already closed"))
}
i.closed = true
return i.Iterator.Close()
}

func (i *assertionIter) Key() []byte {
if !i.Valid() {
panic(errors.AssertionFailedf("Key() called on !Valid() pebble.Iterator"))
}
idx := i.unsafeBufs.idx
i.unsafeBufs.key[idx] = append(i.unsafeBufs.key[idx][:0], i.Iterator.Key()...)
return i.unsafeBufs.key[idx]
}

func (i *assertionIter) Value() []byte {
if !i.Valid() {
panic(errors.AssertionFailedf("Value() called on !Valid() pebble.Iterator"))
}
idx := i.unsafeBufs.idx
i.unsafeBufs.val[idx] = append(i.unsafeBufs.val[idx][:0], i.Iterator.Value()...)
return i.unsafeBufs.val[idx]
}

func (i *assertionIter) LazyValue() pebble.LazyValue {
if !i.Valid() {
panic(errors.AssertionFailedf("LazyValue() called on !Valid() pebble.Iterator"))
}
return i.Iterator.LazyValue()
}

func (i *assertionIter) First() bool {
i.maybeMangleBufs()
return i.Iterator.First()
}

func (i *assertionIter) SeekGE(key []byte) bool {
i.maybeMangleBufs()
return i.Iterator.SeekGE(key)
}

func (i *assertionIter) SeekGEWithLimit(key []byte, limit []byte) pebble.IterValidityState {
i.maybeMangleBufs()
return i.Iterator.SeekGEWithLimit(key, limit)
}

func (i *assertionIter) SeekPrefixGE(key []byte) bool {
i.maybeMangleBufs()
return i.Iterator.SeekPrefixGE(key)
}

func (i *assertionIter) Next() bool {
i.maybeMangleBufs()
return i.Iterator.Next()
}

func (i *assertionIter) NextWithLimit(limit []byte) pebble.IterValidityState {
i.maybeMangleBufs()
return i.Iterator.NextWithLimit(limit)
}

func (i *assertionIter) NextPrefix() bool {
i.maybeMangleBufs()
return i.Iterator.NextPrefix()
}

func (i *assertionIter) Last() bool {
i.maybeMangleBufs()
return i.Iterator.Last()
}

func (i *assertionIter) SeekLT(key []byte) bool {
i.maybeMangleBufs()
return i.Iterator.SeekLT(key)
}

func (i *assertionIter) SeekLTWithLimit(key []byte, limit []byte) pebble.IterValidityState {
i.maybeMangleBufs()
return i.Iterator.SeekLTWithLimit(key, limit)
}

func (i *assertionIter) Prev() bool {
i.maybeMangleBufs()
return i.Iterator.Prev()
}

func (i *assertionIter) PrevWithLimit(limit []byte) pebble.IterValidityState {
i.maybeMangleBufs()
return i.Iterator.PrevWithLimit(limit)
}

// maybeMangleBufs trashes the contents of buffers used to return unsafe values
// to the caller. This is used to ensure that the client respects the Pebble
// iterator interface and the lifetimes of buffers it returns.
func (i *assertionIter) maybeMangleBufs() {
if rand.Intn(2) == 0 {
idx := i.unsafeBufs.idx
for _, b := range [...][]byte{i.unsafeBufs.key[idx], i.unsafeBufs.val[idx]} {
for i := range b {
b[i] = 0
}
}
if rand.Intn(2) == 0 {
// Switch to a new buffer for the next iterator position.
i.unsafeBufs.idx = (i.unsafeBufs.idx + 1) % 2
}
}
}
2 changes: 1 addition & 1 deletion pkg/storage/sst.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ func CheckSSTConflicts(
// 2) the ext iterator became invalid
// 3) both iterators changed keys and the sst iterator's key is further
// ahead.
if extChangedKeys && (!sstChangedKeys || (!extOK && sstOK) || extIter.UnsafeKey().Key.Compare(sstIter.UnsafeKey().Key) < 0) {
if sstOK && extChangedKeys && (!sstChangedKeys || !extOK || extIter.UnsafeKey().Key.Compare(sstIter.UnsafeKey().Key) < 0) {
extIter.SeekGE(MVCCKey{Key: sstIter.UnsafeKey().Key})
extOK, extErr = extIter.Valid()
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/ui/workspaces/cluster-ui/src/api/sqlApi.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

import { SqlExecutionResponse, sqlResultsAreEmpty } from "./sqlApi";
import {
SqlExecutionResponse,
sqlResultsAreEmpty,
isUpgradeError,
} from "./sqlApi";

describe("sqlApi", () => {
test("sqlResultsAreEmpty should return true when there are no rows in the response", () => {
Expand Down Expand Up @@ -83,4 +87,24 @@ describe("sqlApi", () => {
expect(sqlResultsAreEmpty(tc.response)).toEqual(tc.expected);
});
});

test("isUpgradeError", () => {
const tests = [
{
msg: 'relation "crdb_internal.txn_execution_insights" does not exist',
expected: true,
},
{
msg: 'column "hello" does not exist',
expected: true,
},
{
msg: "not an upgrade error",
expected: false,
},
];
tests.forEach(tc => {
expect(isUpgradeError(tc.msg)).toEqual(tc.expected);
});
});
});
4 changes: 2 additions & 2 deletions pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ const UPGRADE_RELATED_ERRORS = [
/column "(.*)" does not exist/i,
];

function isUpgradeError(message: string): boolean {
return UPGRADE_RELATED_ERRORS.some(err => message.search(err));
export function isUpgradeError(message: string): boolean {
return UPGRADE_RELATED_ERRORS.some(err => message.search(err) !== -1);
}

/**
Expand Down
6 changes: 4 additions & 2 deletions pkg/ui/workspaces/cluster-ui/src/api/txnInsightsApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,9 @@ export async function getTxnInsightDetailsApi(
}

const stmts = result.execution.txn_results[0];
txnInsightDetails.statements = formatStmtInsights(stmts);
if (stmts.rows?.length) {
txnInsightDetails.statements = formatStmtInsights(stmts);
}
} catch (e) {
errors.statementsErr = e;
}
Expand All @@ -636,7 +638,7 @@ export async function getTxnInsightDetailsApi(
if (!req.excludeContention && highContention) {
const contentionInfo = await getTxnInsightsContentionDetailsApi(req);
txnInsightDetails.blockingContentionDetails =
contentionInfo.blockingContentionDetails;
contentionInfo?.blockingContentionDetails;
}
} catch (e) {
errors.contentionErr = e;
Expand Down
Loading

0 comments on commit faeebee

Please sign in to comment.