diff --git a/pkg/ccl/multitenantccl/tenantcostserver/metrics.go b/pkg/ccl/multitenantccl/tenantcostserver/metrics.go index f0c28a45be84..edddd38986ea 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/metrics.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/metrics.go @@ -25,8 +25,8 @@ import ( // aggregated value for a metric is not useful (it sums up the consumption for // each tenant, as last reported to this node). type Metrics struct { - TotalRU *aggmetric.AggGaugeFloat64 - TotalKVRU *aggmetric.AggGaugeFloat64 + TotalRU *aggmetric.AggCounterFloat64 + TotalKVRU *aggmetric.AggCounterFloat64 TotalReadBatches *aggmetric.AggGauge TotalReadRequests *aggmetric.AggGauge TotalReadBytes *aggmetric.AggGauge @@ -130,8 +130,8 @@ var ( func (m *Metrics) init() { b := aggmetric.MakeBuilder(multitenant.TenantIDLabel) *m = Metrics{ - TotalRU: b.GaugeFloat64(metaTotalRU), - TotalKVRU: b.GaugeFloat64(metaTotalKVRU), + TotalRU: b.CounterFloat64(metaTotalRU), + TotalKVRU: b.CounterFloat64(metaTotalKVRU), TotalReadBatches: b.Gauge(metaTotalReadBatches), TotalReadRequests: b.Gauge(metaTotalReadRequests), TotalReadBytes: b.Gauge(metaTotalReadBytes), @@ -148,8 +148,8 @@ func (m *Metrics) init() { // tenantMetrics represent metrics for an individual tenant. type tenantMetrics struct { - totalRU *aggmetric.GaugeFloat64 - totalKVRU *aggmetric.GaugeFloat64 + totalRU *aggmetric.CounterFloat64 + totalKVRU *aggmetric.CounterFloat64 totalReadBatches *aggmetric.Gauge totalReadRequests *aggmetric.Gauge totalReadBytes *aggmetric.Gauge diff --git a/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go b/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go index d0fb4c350328..925ed03553c0 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go @@ -122,8 +122,8 @@ func (s *instance) TokenBucketRequest( } // Report current consumption. - metrics.totalRU.Update(consumption.RU) - metrics.totalKVRU.Update(consumption.KVRU) + metrics.totalRU.UpdateIfHigher(consumption.RU) + metrics.totalKVRU.UpdateIfHigher(consumption.KVRU) metrics.totalReadBatches.Update(int64(consumption.ReadBatches)) metrics.totalReadRequests.Update(int64(consumption.ReadRequests)) metrics.totalReadBytes.Update(int64(consumption.ReadBytes)) diff --git a/pkg/cmd/roachtest/tests/sqlalchemy.go b/pkg/cmd/roachtest/tests/sqlalchemy.go index 52ba71246c94..8dc6f1646fdc 100644 --- a/pkg/cmd/roachtest/tests/sqlalchemy.go +++ b/pkg/cmd/roachtest/tests/sqlalchemy.go @@ -30,7 +30,7 @@ import ( var sqlAlchemyResultRegex = regexp.MustCompile(`^(?Ptest.*::.*::[^ \[\]]*(?:\[.*])?) (?P\w+)\s+\[.+]$`) var sqlAlchemyReleaseTagRegex = regexp.MustCompile(`^rel_(?P\d+)_(?P\d+)_(?P\d+)$`) -var supportedSQLAlchemyTag = "1.4.46" +var supportedSQLAlchemyTag = "2.0.2" // This test runs the SQLAlchemy dialect test suite against a single Cockroach // node. @@ -96,7 +96,7 @@ func runSQLAlchemy(ctx context.Context, t test.Test, c cluster.Cluster) { if err := repeatRunE(ctx, t, c, node, "install pytest", fmt.Sprintf(` source venv/bin/activate && - pip3 install --upgrade --force-reinstall setuptools pytest==6.0.1 pytest-xdist psycopg2 alembic sqlalchemy==%s`, + pip3 install --upgrade --force-reinstall setuptools pytest==7.2.1 pytest-xdist psycopg2 alembic sqlalchemy==%s`, supportedSQLAlchemyTag)); err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index bdbbe338b709..2198e2cf8371 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -102,21 +102,37 @@ func (i *MVCCIterator) SeekLT(key storage.MVCCKey) { // Next is part of the storage.MVCCIterator interface. func (i *MVCCIterator) Next() { i.i.Next() - i.checkAllowed(roachpb.Span{Key: i.UnsafeKey().Key}, false) + i.checkAllowedCurrPosForward(false) } // Prev is part of the storage.MVCCIterator interface. func (i *MVCCIterator) Prev() { i.i.Prev() - i.checkAllowed(roachpb.Span{Key: i.UnsafeKey().Key}, false) + i.checkAllowedCurrPosForward(false) } // NextKey is part of the storage.MVCCIterator interface. func (i *MVCCIterator) NextKey() { i.i.NextKey() - i.checkAllowed(roachpb.Span{Key: i.UnsafeKey().Key}, false) + i.checkAllowedCurrPosForward(false) } +// checkAllowedCurrPosForward checks the span starting at the current iterator +// position, if the current iterator position is valid. +func (i *MVCCIterator) checkAllowedCurrPosForward(errIfDisallowed bool) { + i.invalid = false + i.err = nil + if ok, _ := i.i.Valid(); !ok { + // If the iterator is invalid after the operation, there's nothing to + // check. We allow uses of iterators to exceed the declared span bounds + // as long as the iterator itself is configured with proper boundaries. + return + } + i.checkAllowedValidPos(roachpb.Span{Key: i.UnsafeKey().Key}, errIfDisallowed) +} + +// checkAllowed checks the provided span if the current iterator position is +// valid. func (i *MVCCIterator) checkAllowed(span roachpb.Span, errIfDisallowed bool) { i.invalid = false i.err = nil @@ -126,6 +142,10 @@ func (i *MVCCIterator) checkAllowed(span roachpb.Span, errIfDisallowed bool) { // as long as the iterator itself is configured with proper boundaries. return } + i.checkAllowedValidPos(span, errIfDisallowed) +} + +func (i *MVCCIterator) checkAllowedValidPos(span roachpb.Span, errIfDisallowed bool) { var err error if i.spansOnly { err = i.spans.CheckAllowed(SpanReadOnly, span) diff --git a/pkg/storage/pebbleiter/BUILD.bazel b/pkg/storage/pebbleiter/BUILD.bazel index fb3bffb2f0bb..c3b54f46f659 100644 --- a/pkg/storage/pebbleiter/BUILD.bazel +++ b/pkg/storage/pebbleiter/BUILD.bazel @@ -13,7 +13,10 @@ go_library( }), importpath = "github.com/cockroachdb/cockroach/pkg/storage/pebbleiter", visibility = ["//visibility:public"], - deps = ["@com_github_cockroachdb_pebble//:pebble"], + deps = [ + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//:pebble", + ], ) REMOVE_GO_BUILD_CONSTRAINTS = "cat $< | grep -v '//go:build' | grep -v '// +build' > $@" diff --git a/pkg/storage/pebbleiter/crdb_test_on.go b/pkg/storage/pebbleiter/crdb_test_on.go index 211d3c8f4d4a..cdb738e06c20 100644 --- a/pkg/storage/pebbleiter/crdb_test_on.go +++ b/pkg/storage/pebbleiter/crdb_test_on.go @@ -13,7 +13,12 @@ package pebbleiter -import "github.com/cockroachdb/pebble" +import ( + "math/rand" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" +) // Iterator wraps the *pebble.Iterator in crdb_test builds with an assertionIter // that detects when Close is called on the iterator twice. Double closes are @@ -31,6 +36,23 @@ func MaybeWrap(iter *pebble.Iterator) Iterator { type assertionIter struct { *pebble.Iterator closed bool + // unsafeBufs hold buffers used for returning values with short lifetimes to + // the caller. To assert that the client is respecting the lifetimes, + // assertionIter mangles the buffers as soon as the associated lifetime + // expires. This is the same technique applied by the unsafeMVCCIterator in + // pkg/storage, but this time applied at the API boundary between + // pkg/storage and Pebble. + // + // unsafeBufs holds two buffers per-key type and an index indicating which + // are currently in use. This is used to randomly switch to a different + // buffer, ensuring that the buffer(s) returned to the caller for the + // previous iterator position are garbage (as opposed to just state + // corresponding to the current iterator position). + unsafeBufs struct { + idx int + key [2][]byte + val [2][]byte + } } func (i *assertionIter) Clone(cloneOpts pebble.CloneOptions) (Iterator, error) { @@ -43,8 +65,111 @@ func (i *assertionIter) Clone(cloneOpts pebble.CloneOptions) (Iterator, error) { func (i *assertionIter) Close() error { if i.closed { - panic("pebble.Iterator already closed") + panic(errors.AssertionFailedf("pebble.Iterator already closed")) } i.closed = true return i.Iterator.Close() } + +func (i *assertionIter) Key() []byte { + if !i.Valid() { + panic(errors.AssertionFailedf("Key() called on !Valid() pebble.Iterator")) + } + idx := i.unsafeBufs.idx + i.unsafeBufs.key[idx] = append(i.unsafeBufs.key[idx][:0], i.Iterator.Key()...) + return i.unsafeBufs.key[idx] +} + +func (i *assertionIter) Value() []byte { + if !i.Valid() { + panic(errors.AssertionFailedf("Value() called on !Valid() pebble.Iterator")) + } + idx := i.unsafeBufs.idx + i.unsafeBufs.val[idx] = append(i.unsafeBufs.val[idx][:0], i.Iterator.Value()...) + return i.unsafeBufs.val[idx] +} + +func (i *assertionIter) LazyValue() pebble.LazyValue { + if !i.Valid() { + panic(errors.AssertionFailedf("LazyValue() called on !Valid() pebble.Iterator")) + } + return i.Iterator.LazyValue() +} + +func (i *assertionIter) First() bool { + i.maybeMangleBufs() + return i.Iterator.First() +} + +func (i *assertionIter) SeekGE(key []byte) bool { + i.maybeMangleBufs() + return i.Iterator.SeekGE(key) +} + +func (i *assertionIter) SeekGEWithLimit(key []byte, limit []byte) pebble.IterValidityState { + i.maybeMangleBufs() + return i.Iterator.SeekGEWithLimit(key, limit) +} + +func (i *assertionIter) SeekPrefixGE(key []byte) bool { + i.maybeMangleBufs() + return i.Iterator.SeekPrefixGE(key) +} + +func (i *assertionIter) Next() bool { + i.maybeMangleBufs() + return i.Iterator.Next() +} + +func (i *assertionIter) NextWithLimit(limit []byte) pebble.IterValidityState { + i.maybeMangleBufs() + return i.Iterator.NextWithLimit(limit) +} + +func (i *assertionIter) NextPrefix() bool { + i.maybeMangleBufs() + return i.Iterator.NextPrefix() +} + +func (i *assertionIter) Last() bool { + i.maybeMangleBufs() + return i.Iterator.Last() +} + +func (i *assertionIter) SeekLT(key []byte) bool { + i.maybeMangleBufs() + return i.Iterator.SeekLT(key) +} + +func (i *assertionIter) SeekLTWithLimit(key []byte, limit []byte) pebble.IterValidityState { + i.maybeMangleBufs() + return i.Iterator.SeekLTWithLimit(key, limit) +} + +func (i *assertionIter) Prev() bool { + i.maybeMangleBufs() + return i.Iterator.Prev() +} + +func (i *assertionIter) PrevWithLimit(limit []byte) pebble.IterValidityState { + i.maybeMangleBufs() + return i.Iterator.PrevWithLimit(limit) +} + +// maybeMangleBufs trashes the contents of buffers used to return unsafe values +// to the caller. This is used to ensure that the client respects the Pebble +// iterator interface and the lifetimes of buffers it returns. +func (i *assertionIter) maybeMangleBufs() { + if rand.Intn(2) == 0 { + idx := i.unsafeBufs.idx + for _, b := range [...][]byte{i.unsafeBufs.key[idx], i.unsafeBufs.val[idx]} { + for i := range b { + b[i] = 0 + } + } + if rand.Intn(2) == 0 { + // Switch to a new buffer for the next iterator position. + i.unsafeBufs.idx = (i.unsafeBufs.idx + 1) % 2 + } + } +} diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index 54207dbfdbb5..d2ba10c94eab 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -1077,7 +1077,7 @@ func CheckSSTConflicts( // 2) the ext iterator became invalid // 3) both iterators changed keys and the sst iterator's key is further // ahead. - if extChangedKeys && (!sstChangedKeys || (!extOK && sstOK) || extIter.UnsafeKey().Key.Compare(sstIter.UnsafeKey().Key) < 0) { + if sstOK && extChangedKeys && (!sstChangedKeys || !extOK || extIter.UnsafeKey().Key.Compare(sstIter.UnsafeKey().Key) < 0) { extIter.SeekGE(MVCCKey{Key: sstIter.UnsafeKey().Key}) extOK, extErr = extIter.Valid() } diff --git a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.spec.ts b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.spec.ts index 444d580dd204..88ed1ff541f7 100644 --- a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.spec.ts +++ b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.spec.ts @@ -8,7 +8,11 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -import { SqlExecutionResponse, sqlResultsAreEmpty } from "./sqlApi"; +import { + SqlExecutionResponse, + sqlResultsAreEmpty, + isUpgradeError, +} from "./sqlApi"; describe("sqlApi", () => { test("sqlResultsAreEmpty should return true when there are no rows in the response", () => { @@ -83,4 +87,24 @@ describe("sqlApi", () => { expect(sqlResultsAreEmpty(tc.response)).toEqual(tc.expected); }); }); + + test("isUpgradeError", () => { + const tests = [ + { + msg: 'relation "crdb_internal.txn_execution_insights" does not exist', + expected: true, + }, + { + msg: 'column "hello" does not exist', + expected: true, + }, + { + msg: "not an upgrade error", + expected: false, + }, + ]; + tests.forEach(tc => { + expect(isUpgradeError(tc.msg)).toEqual(tc.expected); + }); + }); }); diff --git a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts index f838294ce646..3fafc43c6c17 100644 --- a/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts +++ b/pkg/ui/workspaces/cluster-ui/src/api/sqlApi.ts @@ -129,8 +129,8 @@ const UPGRADE_RELATED_ERRORS = [ /column "(.*)" does not exist/i, ]; -function isUpgradeError(message: string): boolean { - return UPGRADE_RELATED_ERRORS.some(err => message.search(err)); +export function isUpgradeError(message: string): boolean { + return UPGRADE_RELATED_ERRORS.some(err => message.search(err) !== -1); } /** diff --git a/pkg/ui/workspaces/cluster-ui/src/api/txnInsightsApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/txnInsightsApi.ts index 84f098489aa9..7f51fdee1bdd 100644 --- a/pkg/ui/workspaces/cluster-ui/src/api/txnInsightsApi.ts +++ b/pkg/ui/workspaces/cluster-ui/src/api/txnInsightsApi.ts @@ -622,7 +622,9 @@ export async function getTxnInsightDetailsApi( } const stmts = result.execution.txn_results[0]; - txnInsightDetails.statements = formatStmtInsights(stmts); + if (stmts.rows?.length) { + txnInsightDetails.statements = formatStmtInsights(stmts); + } } catch (e) { errors.statementsErr = e; } @@ -636,7 +638,7 @@ export async function getTxnInsightDetailsApi( if (!req.excludeContention && highContention) { const contentionInfo = await getTxnInsightsContentionDetailsApi(req); txnInsightDetails.blockingContentionDetails = - contentionInfo.blockingContentionDetails; + contentionInfo?.blockingContentionDetails; } } catch (e) { errors.contentionErr = e; diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx index be93545fdbb9..ee5de6cc9915 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetails.tsx @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -import React, { useEffect } from "react"; +import React, { useEffect, useRef } from "react"; import Helmet from "react-helmet"; import { RouteComponentProps } from "react-router-dom"; import { ArrowLeft } from "@cockroachlabs/icons"; @@ -51,6 +51,8 @@ enum TabKeysEnum { STATEMENTS = "statements", } +const MAX_REQ_ATTEMPTS = 3; + export const TransactionInsightDetails: React.FC< TransactionInsightDetailsProps > = ({ @@ -64,16 +66,22 @@ export const TransactionInsightDetails: React.FC< hasAdminRole, refreshUserSQLRoles, }) => { + const fetches = useRef(0); const executionID = getMatchParamByName(match, idAttr); - const txnDetails = insightDetails.txnDetails; - const stmts = insightDetails.statements; - const contentionInfo = insightDetails.blockingContentionDetails; useEffect(() => { refreshUserSQLRoles(); }, [refreshUserSQLRoles]); useEffect(() => { + if (fetches.current === MAX_REQ_ATTEMPTS) { + return; + } + + const txnDetails = insightDetails.txnDetails; + const stmts = insightDetails.statements; + const contentionInfo = insightDetails.blockingContentionDetails; + const stmtsComplete = stmts != null && stmts.length === txnDetails?.stmtExecutionIDs?.length; @@ -86,29 +94,27 @@ export const TransactionInsightDetails: React.FC< if (!stmtsComplete || !contentionComplete || txnDetails == null) { // Only fetch if we are missing some information. + // Note that we will attempt to refetch if we are stll missing some + // information only if the results differ from what we already have, + // with the maximum number of retries capped at MAX_REQ_ATTEMPTS. const execReq = timeScaleRangeToObj(timeScale); const req = { - mergeResultWith: { - txnDetails, - blockingContentionDetails: contentionInfo, - statements: stmts, - start: execReq.start, - end: execReq.end, - }, + mergeResultWith: insightDetails, + start: execReq.start, + end: execReq.end, txnExecutionID: executionID, excludeTxn: txnDetails != null, excludeStmts: stmtsComplete, excludeContention: contentionComplete, }; refreshTransactionInsightDetails(req); + fetches.current += 1; } }, [ timeScale, executionID, refreshTransactionInsightDetails, - stmts, - txnDetails, - contentionInfo, + insightDetails, ]); const prevPage = (): void => history.goBack(); @@ -140,6 +146,7 @@ export const TransactionInsightDetails: React.FC< > - {txnDetails?.stmtExecutionIDs?.length && ( + {(insightDetails.txnDetails?.stmtExecutionIDs?.length || + insightDetails.statements?.length) && ( )} diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsOverviewTab.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsOverviewTab.tsx index ce146f912ba7..02ad4db39d92 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsOverviewTab.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsOverviewTab.tsx @@ -59,6 +59,7 @@ type Props = { setTimeScale: (ts: TimeScale) => void; hasAdminRole: boolean; errors: TxnInsightDetailsReqErrs | null; + maxRequestsReached: boolean; }; export const TransactionInsightDetailsOverviewTab: React.FC = ({ @@ -68,6 +69,7 @@ export const TransactionInsightDetailsOverviewTab: React.FC = ({ statements, setTimeScale, hasAdminRole, + maxRequestsReached, }) => { const [insightsSortSetting, setInsightsSortSetting] = useState({ ascending: false, @@ -76,8 +78,9 @@ export const TransactionInsightDetailsOverviewTab: React.FC = ({ const isCockroachCloud = useContext(CockroachCloudContext); const queryFromStmts = statements?.map(s => s.query)?.join("\n"); - const insightQueries = - queryFromStmts ?? txnDetails?.query ?? "Insight not found."; + const insightQueries = queryFromStmts?.length + ? queryFromStmts + : txnDetails?.query ?? "Insight not found."; const insightsColumns = makeInsightsColumns( isCockroachCloud, hasAdminRole, @@ -106,19 +109,19 @@ export const TransactionInsightDetailsOverviewTab: React.FC = ({ return (
- InsightsError(errors?.txnDetailsErr?.message)} - > - {txnDetails && ( -
- - - - - +
+ InsightsError(errors?.txnDetailsErr?.message)} + > + + + + + + {txnDetails && ( <> @@ -200,12 +203,12 @@ export const TransactionInsightDetailsOverviewTab: React.FC = ({ -
- )} - + )} + +
{hasContentionInsights && ( InsightsError(errors?.contentionErr?.message)} diff --git a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsStmtsTab.tsx b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsStmtsTab.tsx index e2e9d0980636..631d986a95d0 100644 --- a/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsStmtsTab.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/insights/workloadInsightDetails/transactionInsightDetailsStmtsTab.tsx @@ -11,7 +11,7 @@ import React from "react"; import { Link } from "react-router-dom"; import { ColumnDescriptor, SortedTable } from "src/sortedtable"; -import { StmtInsightEvent, TxnInsightDetails } from "../types"; +import { StmtInsightEvent } from "../types"; import { InsightCell } from "../workloadInsights/util/insightCell"; import { DATE_WITH_SECONDS_AND_MILLISECONDS_FORMAT, @@ -80,22 +80,24 @@ const stmtColumns: ColumnDescriptor[] = [ ]; type Props = { - insightDetails: TxnInsightDetails; + isLoading: boolean; + statements: StmtInsightEvent[] | null; error: Error; }; export const TransactionInsightsDetailsStmtsTab: React.FC = ({ - insightDetails, + isLoading, error, + statements, }) => { return ( InsightsError(error?.message)} > - + ); }; diff --git a/pkg/util/metric/aggmetric/agg_metric.go b/pkg/util/metric/aggmetric/agg_metric.go index ab5ad03ce5b6..9a841864f713 100644 --- a/pkg/util/metric/aggmetric/agg_metric.go +++ b/pkg/util/metric/aggmetric/agg_metric.go @@ -48,6 +48,11 @@ func (b Builder) Counter(metadata metric.Metadata) *AggCounter { return NewCounter(metadata, b.labels...) } +// CounterFloat64 constructs a new AggCounter with the Builder's labels. +func (b Builder) CounterFloat64(metadata metric.Metadata) *AggCounterFloat64 { + return NewCounterFloat64(metadata, b.labels...) +} + // Histogram constructs a new AggHistogram with the Builder's labels. func (b Builder) Histogram(opts metric.HistogramOptions) *AggHistogram { return NewHistogram(opts, b.labels...) diff --git a/pkg/util/metric/aggmetric/agg_metric_test.go b/pkg/util/metric/aggmetric/agg_metric_test.go index 605f7276b2f6..73652961ddea 100644 --- a/pkg/util/metric/aggmetric/agg_metric_test.go +++ b/pkg/util/metric/aggmetric/agg_metric_test.go @@ -54,6 +54,11 @@ func TestAggMetric(t *testing.T) { }, "tenant_id") r.AddMetric(c) + d := aggmetric.NewCounterFloat64(metric.Metadata{ + Name: "fob_counter", + }, "tenant_id") + r.AddMetric(d) + g := aggmetric.NewGauge(metric.Metadata{ Name: "bar_gauge", }, "tenant_id") @@ -78,6 +83,8 @@ func TestAggMetric(t *testing.T) { tenant3 := roachpb.MustMakeTenantID(3) c2 := c.AddChild(tenant2.String()) c3 := c.AddChild(tenant3.String()) + d2 := d.AddChild(tenant2.String()) + d3 := d.AddChild(tenant3.String()) g2 := g.AddChild(tenant2.String()) g3 := g.AddChild(tenant3.String()) f2 := f.AddChild(tenant2.String()) @@ -88,6 +95,8 @@ func TestAggMetric(t *testing.T) { t.Run("basic", func(t *testing.T) { c2.Inc(2) c3.Inc(4) + d2.Inc(123456.5) + d3.Inc(789089.5) g2.Inc(2) g3.Inc(3) g3.Dec(1) @@ -105,6 +114,7 @@ func TestAggMetric(t *testing.T) { t.Run("destroy", func(t *testing.T) { g3.Unlink() c2.Unlink() + d3.Unlink() f3.Unlink() h3.Unlink() testFile := "destroy.txt" @@ -119,6 +129,9 @@ func TestAggMetric(t *testing.T) { require.Panics(t, func() { c.AddChild(tenant3.String()) }) + require.Panics(t, func() { + d.AddChild(tenant2.String()) + }) require.Panics(t, func() { g.AddChild(tenant2.String()) }) @@ -130,6 +143,7 @@ func TestAggMetric(t *testing.T) { t.Run("add after destroy", func(t *testing.T) { g3 = g.AddChild(tenant3.String()) c2 = c.AddChild(tenant2.String()) + d3 = d.AddChild(tenant3.String()) f3 = f.AddChild(tenant3.String()) h3 = h.AddChild(tenant3.String()) testFile := "add_after_destroy.txt" @@ -141,15 +155,23 @@ func TestAggMetric(t *testing.T) { t.Run("panic on label length mismatch", func(t *testing.T) { require.Panics(t, func() { c.AddChild() }) + require.Panics(t, func() { d.AddChild() }) require.Panics(t, func() { g.AddChild("", "") }) }) } +type Eacher interface { + Each( + labels []*prometheusgo.LabelPair, f func(metric *prometheusgo.Metric), + ) +} + func TestAggMetricBuilder(t *testing.T) { defer leaktest.AfterTest(t)() b := aggmetric.MakeBuilder("tenant_id") c := b.Counter(metric.Metadata{Name: "foo_counter"}) + d := b.CounterFloat64(metric.Metadata{Name: "fob_counter"}) g := b.Gauge(metric.Metadata{Name: "bar_gauge"}) f := b.GaugeFloat64(metric.Metadata{Name: "baz_gauge"}) h := b.Histogram(metric.HistogramOptions{ @@ -163,12 +185,20 @@ func TestAggMetricBuilder(t *testing.T) { for i := 5; i < 10; i++ { tenantLabel := roachpb.MustMakeTenantID(uint64(i)).String() c.AddChild(tenantLabel) + d.AddChild(tenantLabel) g.AddChild(tenantLabel) f.AddChild(tenantLabel) h.AddChild(tenantLabel) } - c.Each(nil, func(pm *prometheusgo.Metric) { - require.Equal(t, 1, len(pm.GetLabel())) - }) + for _, m := range [5]Eacher{ + c, d, g, f, h, + } { + numChildren := 0 + m.Each(nil, func(pm *prometheusgo.Metric) { + require.Equal(t, 1, len(pm.GetLabel())) + numChildren += 1 + }) + require.Equal(t, 5, numChildren) + } } diff --git a/pkg/util/metric/aggmetric/counter.go b/pkg/util/metric/aggmetric/counter.go index e36d4cb7a70a..a6b703ef1eb5 100644 --- a/pkg/util/metric/aggmetric/counter.go +++ b/pkg/util/metric/aggmetric/counter.go @@ -126,3 +126,118 @@ func (g *Counter) Inc(i int64) { g.parent.g.Inc(i) atomic.AddInt64(&g.value, i) } + +// AggCounterFloat64 maintains a value as the sum of its children. The counter will +// report to crdb-internal time series only the aggregate sum of all of its +// children, while its children are additionally exported to prometheus via the +// PrometheusIterable interface. +type AggCounterFloat64 struct { + g metric.CounterFloat64 + childSet +} + +var _ metric.Iterable = (*AggCounterFloat64)(nil) +var _ metric.PrometheusIterable = (*AggCounterFloat64)(nil) +var _ metric.PrometheusExportable = (*AggCounterFloat64)(nil) + +// NewCounterFloat64 constructs a new AggCounterFloat64. +func NewCounterFloat64(metadata metric.Metadata, childLabels ...string) *AggCounterFloat64 { + c := &AggCounterFloat64{g: *metric.NewCounterFloat64(metadata)} + c.init(childLabels) + return c +} + +// GetName is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetName() string { return c.g.GetName() } + +// GetHelp is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetHelp() string { return c.g.GetHelp() } + +// GetMeasurement is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetMeasurement() string { return c.g.GetMeasurement() } + +// GetUnit is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetUnit() metric.Unit { return c.g.GetUnit() } + +// GetMetadata is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetMetadata() metric.Metadata { return c.g.GetMetadata() } + +// Inspect is part of the metric.Iterable interface. +func (c *AggCounterFloat64) Inspect(f func(interface{})) { f(c) } + +// GetType is part of the metric.PrometheusExportable interface. +func (c *AggCounterFloat64) GetType() *io_prometheus_client.MetricType { + return c.g.GetType() +} + +// GetLabels is part of the metric.PrometheusExportable interface. +func (c *AggCounterFloat64) GetLabels() []*io_prometheus_client.LabelPair { + return c.g.GetLabels() +} + +// ToPrometheusMetric is part of the metric.PrometheusExportable interface. +func (c *AggCounterFloat64) ToPrometheusMetric() *io_prometheus_client.Metric { + return c.g.ToPrometheusMetric() +} + +// Count returns the aggregate count of all of its current and past children. +func (c *AggCounterFloat64) Count() float64 { + return c.g.Count() +} + +// AddChild adds a Counter to this AggCounter. This method panics if a Counter +// already exists for this set of labelVals. +func (c *AggCounterFloat64) AddChild(labelVals ...string) *CounterFloat64 { + child := &CounterFloat64{ + parent: c, + labelValuesSlice: labelValuesSlice(labelVals), + } + c.add(child) + return child +} + +// CounterFloat64 is a child of a AggCounter. When it is incremented, so too is the +// parent. When metrics are collected by prometheus, each of the children will +// appear with a distinct label, however, when cockroach internally collects +// metrics, only the parent is collected. +type CounterFloat64 struct { + parent *AggCounterFloat64 + labelValuesSlice + value metric.CounterFloat64 +} + +// ToPrometheusMetric constructs a prometheus metric for this Counter. +func (g *CounterFloat64) ToPrometheusMetric() *io_prometheus_client.Metric { + return &io_prometheus_client.Metric{ + Counter: &io_prometheus_client.Counter{ + Value: proto.Float64(g.Value()), + }, + } +} + +// Unlink unlinks this child from the parent, i.e. the parent will no longer +// track this child (i.e. won't generate labels for it, etc). However, the child +// will continue to be functional and reference the parent, meaning updates to +// it will be reflected in the aggregate stored in the parent. +// +// See tenantrate.TestUseAfterRelease. +func (g *CounterFloat64) Unlink() { + g.parent.remove(g) +} + +// Value returns the AggCounter's current value. +func (g *CounterFloat64) Value() float64 { + return g.value.Count() +} + +// Inc increments the AggCounter's value. +func (g *CounterFloat64) Inc(i float64) { + g.parent.g.Inc(i) + g.value.Inc(i) +} + +// UpdateIfHigher sets the counter's value only if it's higher +// than the currently set one. It's assumed the caller holds +func (g *CounterFloat64) UpdateIfHigher(i float64) { + g.value.UpdateIfHigher(i) +} diff --git a/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt b/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt index 45b67cb0a546..823da0aa06b8 100644 --- a/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt +++ b/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt @@ -6,6 +6,9 @@ bar_gauge{tenant_id="3"} 0 baz_gauge 4 baz_gauge{tenant_id="2"} 1.5 baz_gauge{tenant_id="3"} 0 +fob_counter 912546 +fob_counter{tenant_id="2"} 123456.5 +fob_counter{tenant_id="3"} 0 foo_counter 6 foo_counter{tenant_id="2"} 0 foo_counter{tenant_id="3"} 4 diff --git a/pkg/util/metric/aggmetric/testdata/basic.txt b/pkg/util/metric/aggmetric/testdata/basic.txt index c8c4533fbbb3..fe5a58971444 100644 --- a/pkg/util/metric/aggmetric/testdata/basic.txt +++ b/pkg/util/metric/aggmetric/testdata/basic.txt @@ -6,6 +6,9 @@ bar_gauge{tenant_id="3"} 2 baz_gauge 4 baz_gauge{tenant_id="2"} 1.5 baz_gauge{tenant_id="3"} 2.5 +fob_counter 912546 +fob_counter{tenant_id="2"} 123456.5 +fob_counter{tenant_id="3"} 789089.5 foo_counter 6 foo_counter{tenant_id="2"} 2 foo_counter{tenant_id="3"} 4 diff --git a/pkg/util/metric/aggmetric/testdata/destroy.txt b/pkg/util/metric/aggmetric/testdata/destroy.txt index c7ad402b85a4..d8b22e618dcf 100644 --- a/pkg/util/metric/aggmetric/testdata/destroy.txt +++ b/pkg/util/metric/aggmetric/testdata/destroy.txt @@ -4,6 +4,8 @@ bar_gauge 4 bar_gauge{tenant_id="2"} 2 baz_gauge 4 baz_gauge{tenant_id="2"} 1.5 +fob_counter 912546 +fob_counter{tenant_id="2"} 123456.5 foo_counter 6 foo_counter{tenant_id="3"} 4 histo_gram_bucket{le="+Inf"} 2 diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index afd19cf83638..3b3b806404fd 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -578,6 +578,63 @@ func (c *Counter) GetMetadata() Metadata { return baseMetadata } +type CounterFloat64 struct { + Metadata + count syncutil.AtomicFloat64 +} + +// GetMetadata returns the metric's metadata including the Prometheus +// MetricType. +func (c *CounterFloat64) GetMetadata() Metadata { + baseMetadata := c.Metadata + baseMetadata.MetricType = prometheusgo.MetricType_COUNTER + return baseMetadata +} + +func (c *CounterFloat64) Clear() { + syncutil.StoreFloat64(&c.count, 0) +} + +func (c *CounterFloat64) Count() float64 { + return syncutil.LoadFloat64(&c.count) +} + +func (c *CounterFloat64) Inc(i float64) { + syncutil.AddFloat64(&c.count, i) +} + +func (c *CounterFloat64) UpdateIfHigher(i float64) { + syncutil.StoreFloat64IfHigher(&c.count, i) +} + +func (c *CounterFloat64) Snapshot() *CounterFloat64 { + newCounter := NewCounterFloat64(c.Metadata) + syncutil.StoreFloat64(&newCounter.count, c.Count()) + return newCounter +} + +// GetType returns the prometheus type enum for this metric. +func (c *CounterFloat64) GetType() *prometheusgo.MetricType { + return prometheusgo.MetricType_COUNTER.Enum() +} + +// ToPrometheusMetric returns a filled-in prometheus metric of the right type. +func (c *CounterFloat64) ToPrometheusMetric() *prometheusgo.Metric { + return &prometheusgo.Metric{ + Counter: &prometheusgo.Counter{Value: proto.Float64(c.Count())}, + } +} + +// MarshalJSON marshals to JSON. +func (c *CounterFloat64) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Count()) +} + +// NewCounterFloat64 creates a counter. +func NewCounterFloat64(metadata Metadata) *CounterFloat64 { + return &CounterFloat64{Metadata: metadata} +} + // A Gauge atomically stores a single integer value. type Gauge struct { Metadata diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index d8a451ec06ad..bee57d768b8a 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -95,6 +95,34 @@ func TestCounter(t *testing.T) { testMarshal(t, c, "90") } +func TestCounterFloat64(t *testing.T) { + g := NewCounterFloat64(emptyMetadata) + g.UpdateIfHigher(10) + if v := g.Count(); v != 10 { + t.Fatalf("unexpected value: %f", v) + } + testMarshal(t, g, "10") + + var wg sync.WaitGroup + for i := int64(0); i < 10; i++ { + wg.Add(1) + go func(i int64) { g.Inc(float64(i)); wg.Done() }(i) + } + wg.Wait() + if v := g.Count(); math.Abs(v-55.0) > 0.001 { + t.Fatalf("unexpected value: %g", v) + } + + for i := int64(55); i < 65; i++ { + wg.Add(1) + go func(i int64) { g.UpdateIfHigher(float64(i)); wg.Done() }(i) + } + wg.Wait() + if v := g.Count(); math.Abs(v-64.0) > 0.001 { + t.Fatalf("unexpected value: %g", v) + } +} + func setNow(d time.Duration) { now = func() time.Time { return time.Time{}.Add(d) diff --git a/pkg/util/syncutil/atomic.go b/pkg/util/syncutil/atomic.go index e0357331fc49..419d6b74f62d 100644 --- a/pkg/util/syncutil/atomic.go +++ b/pkg/util/syncutil/atomic.go @@ -31,6 +31,32 @@ func LoadFloat64(addr *AtomicFloat64) (val float64) { return math.Float64frombits(atomic.LoadUint64((*uint64)(addr))) } +func AddFloat64(addr *AtomicFloat64, add float64) (val float64) { + for { + oldFloat := LoadFloat64(addr) + oldInt := math.Float64bits(oldFloat) + newFloat := oldFloat + add + newInt := math.Float64bits(newFloat) + if atomic.CompareAndSwapUint64((*uint64)(addr), oldInt, newInt) { + return + } + } +} + +func StoreFloat64IfHigher(addr *AtomicFloat64, new float64) (val float64) { + for { + oldFloat := LoadFloat64(addr) + if oldFloat > new { + return + } + oldInt := math.Float64bits(oldFloat) + newInt := math.Float64bits(new) + if atomic.CompareAndSwapUint64((*uint64)(addr), oldInt, newInt) { + return + } + } +} + // AtomicBool mimics an atomic boolean. type AtomicBool uint32 diff --git a/pkg/util/syncutil/atomic_test.go b/pkg/util/syncutil/atomic_test.go index 0e68dbf0452c..b95a783d277e 100644 --- a/pkg/util/syncutil/atomic_test.go +++ b/pkg/util/syncutil/atomic_test.go @@ -11,6 +11,7 @@ package syncutil import ( + "math" "testing" "github.com/stretchr/testify/require" @@ -33,8 +34,8 @@ func TestAtomicFloat64(t *testing.T) { } x.before = magic64 x.after = magic64 - for delta := uint64(1); delta+delta > delta; delta += delta { - e := float64(delta) + for delta := float64(1); delta+delta > delta; delta += delta { + e := delta StoreFloat64(&x.i, e) a := LoadFloat64(&x.i) if a != e { @@ -46,6 +47,60 @@ func TestAtomicFloat64(t *testing.T) { } } +// TestAtomicStoreFloat64IfHigher is also adapted from https://golang.org/src/sync/atomic/atomic_test.go +func TestAtomicStoreFloat64IfHigher(t *testing.T) { + var x struct { + before AtomicFloat64 + i AtomicFloat64 + after AtomicFloat64 + } + x.before = magic64 + x.after = magic64 + + // Roughly half the time we will have to store a larger value. + StoreFloat64(&x.i, math.MaxFloat64/math.Pow(2, 500)) + for delta := float64(1); delta+delta > delta; delta += delta { + e := delta + cur := LoadFloat64(&x.i) + shouldStore := e > cur + StoreFloat64IfHigher(&x.i, e) + afterStore := LoadFloat64(&x.i) + if shouldStore && e != afterStore { + t.Fatalf("should store: expected=%f got=%f", e, afterStore) + } + if !shouldStore && cur != afterStore { + t.Fatalf("should not store: expected=%f got=%f", cur, afterStore) + } + StoreFloat64(&x.i, math.MaxFloat64/math.Pow(2, 500)) + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +// TestAtomicAddFloat64 is also adapted from https://golang.org/src/sync/atomic/atomic_test.go +func TestAtomicAddFloat64(t *testing.T) { + var x struct { + before AtomicFloat64 + i AtomicFloat64 + after AtomicFloat64 + } + x.before = magic64 + x.after = magic64 + j := LoadFloat64(&x.i) + for delta := float64(1); delta+delta > delta; delta += delta { + AddFloat64(&x.i, delta) + j += delta + got := LoadFloat64(&x.i) + if j != LoadFloat64(&x.i) { + t.Fatalf("expected=%f got=%f", j, got) + } + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + func TestAtomicBool(t *testing.T) { var x AtomicBool x.Set(true)