diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 5292ac482cff..e9ff77d8d8b5 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -129,6 +129,7 @@ sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enable
sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode
sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh
sql.stats.automatic_collection.min_stale_rows integer 500 target minimum number of stale rows per table that will trigger a statistics refresh
+sql.stats.flush.interval duration 1h0m0s the interval at which SQL execution statistics are flushed to disk
sql.stats.histogram_collection.enabled boolean true histogram collection mode
sql.stats.multi_column_collection.enabled boolean true multi-column statistics collection mode
sql.stats.post_events.enabled boolean false if set, an event is logged for every CREATE STATISTICS job
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index e8e8c38317bd..ababff6e9b85 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -133,6 +133,7 @@
sql.stats.automatic_collection.enabled | boolean | true | automatic statistics collection mode |
sql.stats.automatic_collection.fraction_stale_rows | float | 0.2 | target fraction of stale rows per table that will trigger a statistics refresh |
sql.stats.automatic_collection.min_stale_rows | integer | 500 | target minimum number of stale rows per table that will trigger a statistics refresh |
+sql.stats.flush.interval | duration | 1h0m0s | the interval at which SQL execution statistics are flushed to disk |
sql.stats.histogram_collection.enabled | boolean | true | histogram collection mode |
sql.stats.multi_column_collection.enabled | boolean | true | multi-column statistics collection mode |
sql.stats.post_events.enabled | boolean | false | if set, an event is logged for every CREATE STATISTICS job |
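
The new sql.stats.flush.interval entry above is a duration-typed cluster setting. As a hedged sketch of how such a setting is typically declared in this codebase (the exact RegisterDurationSetting signature varies between releases), the default of time.Hour is what renders as 1h0m0s in the generated docs:

    package sqlstats

    import (
        "time"

        "github.com/cockroachdb/cockroach/pkg/settings"
    )

    // SQLStatsFlushInterval is a sketch of the new setting's declaration;
    // the helper signature is assumed from this era of the codebase.
    var SQLStatsFlushInterval = settings.RegisterDurationSetting(
        "sql.stats.flush.interval",
        "the interval at which SQL execution statistics are flushed to disk",
        time.Hour, // rendered as 1h0m0s in the generated docs above
    )
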
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index d83a6e7ef1b2..85728c03c0b1 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -295,6 +295,7 @@ ALL_TESTS = [
"//pkg/sql/sqlliveness/slinstance:slinstance_test",
"//pkg/sql/sqlliveness/slstorage:slstorage_test",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil:sqlstatsutil_test",
+ "//pkg/sql/sqlstats/persistedsqlstats:persistedsqlstats_test",
"//pkg/sql/stats:stats_test",
"//pkg/sql/stmtdiagnostics:stmtdiagnostics_test",
"//pkg/sql/tests:tests_test",
diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go
index b903e8f0ebb1..467eb900c11a 100644
--- a/pkg/base/testing_knobs.go
+++ b/pkg/base/testing_knobs.go
@@ -39,4 +39,5 @@ type TestingKnobs struct {
BackupRestore ModuleTestingKnobs
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
+ SQLStatsKnobs ModuleTestingKnobs
}
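
The new SQLStatsKnobs slot follows the usual testing-knobs pattern: each module defines a knobs struct that satisfies base.ModuleTestingKnobs via a no-op marker method, and the server wires it through (see the server_sql.go hunk below). A minimal sketch, with an illustrative field that is an assumption rather than the real persistedsqlstats.TestingKnobs definition:

    package persistedsqlstats

    import "time"

    // TestingKnobs is a sketch of the per-module knobs struct; StubTimeNow is
    // an illustrative field, not necessarily the real one.
    type TestingKnobs struct {
        // StubTimeNow, if set, replaces the wall clock during tests.
        StubTimeNow func() time.Time
    }

    // ModuleTestingKnobs makes TestingKnobs a base.ModuleTestingKnobs.
    func (*TestingKnobs) ModuleTestingKnobs() {}
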
diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go
index 965e2f531566..a56c99b5c428 100644
--- a/pkg/ccl/backupccl/backup_planning.go
+++ b/pkg/ccl/backupccl/backup_planning.go
@@ -328,7 +328,9 @@ func spansForAllTableIndexes(
checkForKVInBounds := func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error) {
var foundKV bool
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, endTime)
+ if err := txn.SetFixedTimestamp(ctx, endTime); err != nil {
+ return err
+ }
res, err := txn.Scan(ctx, start, end, 1 /* maxRows */)
if err != nil {
return err
diff --git a/pkg/ccl/backupccl/backupresolver/targets.go b/pkg/ccl/backupccl/backupresolver/targets.go
index 03b64e66bfa3..e9ece2248c56 100644
--- a/pkg/ccl/backupccl/backupresolver/targets.go
+++ b/pkg/ccl/backupccl/backupresolver/targets.go
@@ -571,8 +571,11 @@ func LoadAllDescs(
var allDescs []catalog.Descriptor
if err := db.Txn(
ctx,
- func(ctx context.Context, txn *kv.Txn) (err error) {
- txn.SetFixedTimestamp(ctx, asOf)
+ func(ctx context.Context, txn *kv.Txn) error {
+ err := txn.SetFixedTimestamp(ctx, asOf)
+ if err != nil {
+ return err
+ }
allDescs, err = catalogkv.GetAllDescriptors(
ctx, txn, codec, true, /* shouldRunPostDeserializationChanges */
)
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index 3e6b617309fb..ee5f6bfe1ed3 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -129,7 +129,9 @@ func fetchSpansForTargets(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
spans = nil
- txn.SetFixedTimestamp(ctx, ts)
+ if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
+ return err
+ }
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go
index cd42b11ef699..3c098202fefd 100644
--- a/pkg/ccl/changefeedccl/kvfeed/scanner.go
+++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -103,7 +103,9 @@ func (p *scanRequestScanner) exportSpan(
if log.V(2) {
log.Infof(ctx, `sending ScanRequest %s at %s`, span, ts)
}
- txn.SetFixedTimestamp(ctx, ts)
+ if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
+ return err
+ }
stopwatchStart := timeutil.Now()
var scanDuration, bufferDuration time.Duration
const targetBytesPerScan = 16 << 20 // 16 MiB
diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go
index 63d6d715a60f..6e8eb87b0c70 100644
--- a/pkg/ccl/changefeedccl/rowfetcher_cache.go
+++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go
@@ -101,8 +101,10 @@ func (c *rowFetcherCache) TableDescForKey(
// descs.Collection directly here.
// TODO (SQL Schema): #53751.
if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, ts)
- var err error
+ err := txn.SetFixedTimestamp(ctx, ts)
+ if err != nil {
+ return err
+ }
tableDesc, err = c.collection.GetImmutableTableByID(ctx, txn, tableID, tree.ObjectLookupFlags{})
return err
}); err != nil {
diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
index 43303029a2f4..041f94475016 100644
--- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
+++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -274,7 +274,9 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error {
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
initialDescs = initialDescs[:0]
- txn.SetFixedTimestamp(ctx, initialTableDescTs)
+ if err := txn.SetFixedTimestamp(ctx, initialTableDescTs); err != nil {
+ return err
+ }
// Note that all targets are currently guaranteed to be tables.
for tableID := range tf.targets {
flags := tree.ObjectLookupFlagsWithRequired()
diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
index 5a6fbc6a5023..cf11f95a3503 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -400,11 +400,11 @@ func TestOracle(t *testing.T) {
c := kv.NewDB(log.AmbientContext{Tracer: tracing.NewTracer()}, kv.MockTxnSenderFactory{}, clock, stopper)
staleTxn := kv.NewTxn(ctx, c, 0)
- staleTxn.SetFixedTimestamp(ctx, stale)
+ require.NoError(t, staleTxn.SetFixedTimestamp(ctx, stale))
currentTxn := kv.NewTxn(ctx, c, 0)
- currentTxn.SetFixedTimestamp(ctx, current)
+ require.NoError(t, currentTxn.SetFixedTimestamp(ctx, current))
futureTxn := kv.NewTxn(ctx, c, 0)
- futureTxn.SetFixedTimestamp(ctx, future)
+ require.NoError(t, futureTxn.SetFixedTimestamp(ctx, future))
nodes := mockNodeStore{
{NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1")},
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index a2d1269eacba..fc2cb46a7a5b 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -189,7 +189,7 @@ func (ds *DistSender) partialRangeFeed(
case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
roachpb.RangeFeedRetryError_REASON_RANGE_MERGED,
roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER:
- // Evict the decriptor from the cache.
+ // Evict the descriptor from the cache.
rangeInfo.token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
default:
@@ -204,7 +204,7 @@ func (ds *DistSender) partialRangeFeed(
}
// singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed
-// RPC call. Results will be send on the provided channel. Returns the timestamp
+// RPC call. Results will be sent on the provided channel. Returns the timestamp
// of the maximum rangefeed checkpoint seen, which can be used to re-establish
// the rangefeed with a larger starting timestamp, reflecting the fact that all
// values up to the last checkpoint have already been observed. Returns the
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
index 1525b79b8157..be30aa59d065 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -1008,9 +1008,19 @@ func (tc *TxnCoordSender) CommitTimestampFixed() bool {
}
// SetFixedTimestamp is part of the client.TxnSender interface.
-func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) {
+func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
tc.mu.Lock()
defer tc.mu.Unlock()
+ // The transaction must not have already been used in this epoch.
+ if !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty() {
+ return errors.WithContextTags(errors.AssertionFailedf(
+ "cannot set fixed timestamp, txn %s already performed reads", tc.mu.txn), ctx)
+ }
+ if tc.mu.txn.Sequence != 0 {
+ return errors.WithContextTags(errors.AssertionFailedf(
+ "cannot set fixed timestamp, txn %s already performed writes", tc.mu.txn), ctx)
+ }
+
tc.mu.txn.ReadTimestamp = ts
tc.mu.txn.WriteTimestamp = ts
tc.mu.txn.GlobalUncertaintyLimit = ts
@@ -1019,6 +1029,7 @@ func (tc *TxnCoordSender) SetFixedTimestamp(ctx context.Context, ts hlc.Timestam
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
tc.mu.txn.MinTimestamp.Backward(ts)
+ return nil
}
// RequiredFrontier is part of the client.TxnSender interface.
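
SetFixedTimestamp now rejects misuse with an assertion-failure error instead of silently proceeding, which is why every call site in this diff gains an error check. A runnable sketch of the same guard pattern using the public cockroachdb/errors library (the txnState type here is illustrative, not CockroachDB code):

    package main

    import (
        "context"
        "fmt"

        "github.com/cockroachdb/errors"
    )

    // txnState stands in for the coordinator's mutable transaction state.
    type txnState struct{ sequence int32 }

    // setFixedTimestamp refuses to fix the timestamp once the transaction has
    // written, returning an assertion failure tagged with the caller's context.
    func (t *txnState) setFixedTimestamp(ctx context.Context) error {
        if t.sequence != 0 {
            return errors.WithContextTags(errors.AssertionFailedf(
                "cannot set fixed timestamp, txn already performed writes"), ctx)
        }
        return nil
    }

    func main() {
        t := &txnState{sequence: 1}
        fmt.Println(t.setFixedTimestamp(context.Background()))
    }
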
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
index c19659d09041..3bc03b680640 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -2672,3 +2672,91 @@ func TestTxnManualRefresh(t *testing.T) {
})
}
}
+
+// TestTxnCoordSenderSetFixedTimestamp tests that SetFixedTimestamp cannot be
+// called after a transaction has already been used in the current epoch to read
+// or write.
+func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+
+ for _, test := range []struct {
+ name string
+ before func(*testing.T, *kv.Txn)
+ expErr string
+ }{
+ {
+ name: "nothing before",
+ before: func(t *testing.T, txn *kv.Txn) {},
+ },
+ {
+ name: "read before",
+ before: func(t *testing.T, txn *kv.Txn) {
+ _, err := txn.Get(ctx, "k")
+ require.NoError(t, err)
+ },
+ expErr: "cannot set fixed timestamp, .* already performed reads",
+ },
+ {
+ name: "write before",
+ before: func(t *testing.T, txn *kv.Txn) {
+ require.NoError(t, txn.Put(ctx, "k", "v"))
+ },
+ expErr: "cannot set fixed timestamp, .* already performed writes",
+ },
+ {
+ name: "read and write before",
+ before: func(t *testing.T, txn *kv.Txn) {
+ _, err := txn.Get(ctx, "k")
+ require.NoError(t, err)
+ require.NoError(t, txn.Put(ctx, "k", "v"))
+ },
+ expErr: "cannot set fixed timestamp, .* already performed reads",
+ },
+ {
+ name: "read before, in prior epoch",
+ before: func(t *testing.T, txn *kv.Txn) {
+ _, err := txn.Get(ctx, "k")
+ require.NoError(t, err)
+ txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
+ },
+ },
+ {
+ name: "write before, in prior epoch",
+ before: func(t *testing.T, txn *kv.Txn) {
+ require.NoError(t, txn.Put(ctx, "k", "v"))
+ txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
+ },
+ },
+ {
+ name: "read and write before, in prior epoch",
+ before: func(t *testing.T, txn *kv.Txn) {
+ _, err := txn.Get(ctx, "k")
+ require.NoError(t, err)
+ require.NoError(t, txn.Put(ctx, "k", "v"))
+ txn.ManualRestart(ctx, txn.ReadTimestamp().Next())
+ },
+ },
+ } {
+ t.Run(test.name, func(t *testing.T) {
+ s := createTestDB(t)
+ defer s.Stop()
+
+ txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
+ test.before(t, txn)
+
+ ts := s.Clock.Now()
+ err := txn.SetFixedTimestamp(ctx, ts)
+ if test.expErr != "" {
+ require.Error(t, err)
+ require.Regexp(t, test.expErr, err)
+ require.False(t, txn.CommitTimestampFixed())
+ } else {
+ require.NoError(t, err)
+ require.True(t, txn.CommitTimestampFixed())
+ require.Equal(t, ts, txn.CommitTimestamp())
+ }
+ })
+ }
+}
diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go
index 5e1cbfaad504..7bf47154392c 100644
--- a/pkg/kv/kvclient/rangefeed/db_adapter.go
+++ b/pkg/kv/kvclient/rangefeed/db_adapter.go
@@ -74,7 +74,9 @@ func (dbc *dbAdapter) Scan(
ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue),
) error {
return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, asOf)
+ if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
+ return err
+ }
sp := span
var b kv.Batch
for {
diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go
index 29d102407c1b..7dad74e8e880 100644
--- a/pkg/kv/kvserver/client_replica_test.go
+++ b/pkg/kv/kvserver/client_replica_test.go
@@ -3174,7 +3174,7 @@ func TestStrictGCEnforcement(t *testing.T) {
}
mkStaleTxn = func() *kv.Txn {
txn := db.NewTxn(ctx, "foo")
- txn.SetFixedTimestamp(ctx, tenSecondsAgo)
+ require.NoError(t, txn.SetFixedTimestamp(ctx, tenSecondsAgo))
return txn
}
getRejectedMsg = func() string {
diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go
index 0d56a46f0e63..bc1f835116ed 100644
--- a/pkg/kv/kvserver/rangefeed/processor.go
+++ b/pkg/kv/kvserver/rangefeed/processor.go
@@ -50,8 +50,9 @@ func newErrBufferCapacityExceeded() *roachpb.Error {
// Config encompasses the configuration required to create a Processor.
type Config struct {
log.AmbientContext
- Clock *hlc.Clock
- Span roachpb.RSpan
+ Clock *hlc.Clock
+ RangeID roachpb.RangeID
+ Span roachpb.RSpan
TxnPusher TxnPusher
// PushTxnsInterval specifies the interval at which a Processor will push
@@ -193,7 +194,7 @@ type IteratorConstructor func() storage.SimpleMVCCIterator
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
- p.run(ctx, rtsIterFunc, stopper)
+ p.run(ctx, p.RangeID, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
@@ -203,7 +204,10 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor
// run is called from Start and runs the rangefeed.
func (p *Processor) run(
- ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper,
+ ctx context.Context,
+ _forStacks roachpb.RangeID,
+ rtsIterFunc IteratorConstructor,
+ stopper *stop.Stopper,
) {
defer close(p.stoppedC)
ctx, cancelOutputLoops := context.WithCancel(ctx)
@@ -256,7 +260,7 @@ func (p *Processor) run(
// Run an output loop for the registry.
runOutputLoop := func(ctx context.Context) {
- r.runOutputLoop(ctx)
+ r.runOutputLoop(ctx, p.RangeID)
select {
case p.unregC <- &r:
case <-p.stoppedC:
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index d6041ad62faf..cff3d8ce6af5 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -264,7 +264,7 @@ func (r *registration) outputLoop(ctx context.Context) error {
}
}
-func (r *registration) runOutputLoop(ctx context.Context) {
+func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) {
r.mu.Lock()
ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx)
r.mu.Unlock()
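
The otherwise-unused _forStacks parameter exists only so the range ID is embedded in goroutine stack traces, the same trick documented at sendWithRangeID later in this diff. A self-contained sketch of the idea (not CockroachDB code):

    package main

    import (
        "fmt"
        "runtime"
    )

    // runOutputLoop takes the range ID purely so it appears as an argument in
    // captured stacks, e.g. "main.runOutputLoop(0x2a)".
    func runOutputLoop(_forStacks int64) {
        buf := make([]byte, 1<<12)
        n := runtime.Stack(buf, false)
        fmt.Printf("%s", buf[:n])
    }

    func main() {
        runOutputLoop(42)
    }
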
diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go
index c52983006bcf..ced4abaabdd9 100644
--- a/pkg/kv/kvserver/rangefeed/registry_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_test.go
@@ -142,7 +142,7 @@ func TestRegistrationBasic(t *testing.T) {
noCatchupReg.publish(ev1)
noCatchupReg.publish(ev2)
require.Equal(t, len(noCatchupReg.buf), 2)
- go noCatchupReg.runOutputLoop(context.Background())
+ go noCatchupReg.runOutputLoop(context.Background(), 0)
require.NoError(t, noCatchupReg.waitForCaughtUp())
require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events())
noCatchupReg.disconnect(nil)
@@ -158,7 +158,7 @@ func TestRegistrationBasic(t *testing.T) {
catchupReg.publish(ev1)
catchupReg.publish(ev2)
require.Equal(t, len(catchupReg.buf), 2)
- go catchupReg.runOutputLoop(context.Background())
+ go catchupReg.runOutputLoop(context.Background(), 0)
require.NoError(t, catchupReg.waitForCaughtUp())
events := catchupReg.stream.Events()
require.Equal(t, 6, len(events))
@@ -171,7 +171,7 @@ func TestRegistrationBasic(t *testing.T) {
disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
disconnectReg.publish(ev1)
disconnectReg.publish(ev2)
- go disconnectReg.runOutputLoop(context.Background())
+ go disconnectReg.runOutputLoop(context.Background(), 0)
require.NoError(t, disconnectReg.waitForCaughtUp())
discErr := roachpb.NewError(fmt.Errorf("disconnection error"))
disconnectReg.disconnect(discErr)
@@ -183,7 +183,7 @@ func TestRegistrationBasic(t *testing.T) {
for i := 0; i < cap(overflowReg.buf)+3; i++ {
overflowReg.publish(ev1)
}
- go overflowReg.runOutputLoop(context.Background())
+ go overflowReg.runOutputLoop(context.Background(), 0)
err = <-overflowReg.errC
require.Equal(t, newErrBufferCapacityExceeded(), err)
require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events()))
@@ -192,7 +192,7 @@ func TestRegistrationBasic(t *testing.T) {
streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
streamErr := fmt.Errorf("stream error")
streamErrReg.stream.SetSendErr(streamErr)
- go streamErrReg.runOutputLoop(context.Background())
+ go streamErrReg.runOutputLoop(context.Background(), 0)
streamErrReg.publish(ev1)
err = <-streamErrReg.errC
require.Equal(t, streamErr.Error(), err.GoError().Error())
@@ -200,7 +200,7 @@ func TestRegistrationBasic(t *testing.T) {
// Stream Context Canceled.
streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
streamCancelReg.stream.Cancel()
- go streamCancelReg.runOutputLoop(context.Background())
+ go streamCancelReg.runOutputLoop(context.Background(), 0)
require.NoError(t, streamCancelReg.waitForCaughtUp())
err = <-streamCancelReg.errC
require.Equal(t, streamCancelReg.stream.Context().Err().Error(), err.GoError().Error())
@@ -337,10 +337,10 @@ func TestRegistryBasic(t *testing.T) {
rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */)
rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */)
rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */)
- go rAB.runOutputLoop(context.Background())
- go rBC.runOutputLoop(context.Background())
- go rCD.runOutputLoop(context.Background())
- go rAC.runOutputLoop(context.Background())
+ go rAB.runOutputLoop(context.Background(), 0)
+ go rBC.runOutputLoop(context.Background(), 0)
+ go rCD.runOutputLoop(context.Background(), 0)
+ go rAC.runOutputLoop(context.Background(), 0)
defer rAB.disconnect(nil)
defer rBC.disconnect(nil)
defer rCD.disconnect(nil)
@@ -446,11 +446,11 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) {
reg := makeRegistry()
rNoDiff := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */)
- go rNoDiff.runOutputLoop(context.Background())
+ go rNoDiff.runOutputLoop(context.Background(), 0)
reg.Register(&rNoDiff.registration)
rWithDiff := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */)
- go rWithDiff.runOutputLoop(context.Background())
+ go rWithDiff.runOutputLoop(context.Background(), 0)
reg.Register(&rWithDiff.registration)
key := roachpb.Key("a")
@@ -498,7 +498,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) {
reg := makeRegistry()
r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false)
- go r.runOutputLoop(context.Background())
+ go r.runOutputLoop(context.Background(), 0)
reg.Register(&r.registration)
// Publish a value with a timestamp beneath the registration's start
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index a7a9c0d7a553..ae1becf2a18b 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -139,6 +139,14 @@ func (i iteratorWithCloser) Close() {
// of rangefeeds using catchup iterators at the same time.
func (r *Replica) RangeFeed(
args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
+) *roachpb.Error {
+ return r.rangeFeedWithRangeID(r.RangeID, args, stream)
+}
+
+func (r *Replica) rangeFeedWithRangeID(
+ _forStacks roachpb.RangeID,
+ args *roachpb.RangeFeedRequest,
+ stream roachpb.Internal_RangeFeedServer,
) *roachpb.Error {
if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
@@ -352,6 +360,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
cfg := rangefeed.Config{
AmbientContext: r.AmbientContext,
Clock: r.Clock(),
+ RangeID: r.RangeID,
Span: desc.RSpan(),
TxnPusher: &tp,
PushTxnsInterval: r.store.TestingKnobs().RangeFeedPushTxnsInterval,
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 3676e178a0cb..aed2b116f2eb 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -220,7 +220,9 @@ func TestReplicaRangefeed(t *testing.T) {
// Insert a second key transactionally.
ts3 := initTime.Add(0, 3)
if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, ts3)
+ if err := txn.SetFixedTimestamp(ctx, ts3); err != nil {
+ return err
+ }
return txn.Put(ctx, roachpb.Key("m"), []byte("val3"))
}); err != nil {
t.Fatal(err)
@@ -240,7 +242,9 @@ func TestReplicaRangefeed(t *testing.T) {
// Update the originally incremented key transactionally.
ts5 := initTime.Add(0, 5)
if err := store1.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, ts5)
+ if err := txn.SetFixedTimestamp(ctx, ts5); err != nil {
+ return err
+ }
_, err := txn.Inc(ctx, incArgs.Key, 7)
return err
}); err != nil {
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index d6e55d141b3d..6313e8cabb73 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -55,7 +55,7 @@ func (r *Replica) Send(
//
// github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...)
func (r *Replica) sendWithRangeID(
- ctx context.Context, rangeID roachpb.RangeID, ba *roachpb.BatchRequest,
+ ctx context.Context, _forStacks roachpb.RangeID, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
var br *roachpb.BatchResponse
if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go
index abb99de4ab0b..eacffe47115b 100644
--- a/pkg/kv/mock_transactional_sender.go
+++ b/pkg/kv/mock_transactional_sender.go
@@ -109,11 +109,11 @@ func (m *MockTransactionalSender) CommitTimestamp() hlc.Timestamp {
// CommitTimestampFixed is part of the TxnSender interface.
func (m *MockTransactionalSender) CommitTimestampFixed() bool {
- panic("unimplemented")
+ return m.txn.CommitTimestampFixed
}
// SetFixedTimestamp is part of the TxnSender interface.
-func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) {
+func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Timestamp) error {
m.txn.WriteTimestamp = ts
m.txn.ReadTimestamp = ts
m.txn.GlobalUncertaintyLimit = ts
@@ -122,6 +122,7 @@ func (m *MockTransactionalSender) SetFixedTimestamp(_ context.Context, ts hlc.Ti
// Set the MinTimestamp to the minimum of the existing MinTimestamp and the fixed
// timestamp. This ensures that the MinTimestamp is always <= the other timestamps.
m.txn.MinTimestamp.Backward(ts)
+ return nil
}
// RequiredFrontier is part of the TxnSender interface.
diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go
index 5097cff213e1..9bf6cdf424a7 100644
--- a/pkg/kv/sender.go
+++ b/pkg/kv/sender.go
@@ -170,11 +170,12 @@ type TxnSender interface {
// such that the transaction can't be pushed to a different
// timestamp.
//
- // This is used to support historical queries (AS OF SYSTEM TIME
- // queries and backups). This method must be called on every
- // transaction retry (but note that retries should be rare for
- // read-only queries with no clock uncertainty).
- SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp)
+ // This is used to support historical queries (AS OF SYSTEM TIME queries
+ // and backups). This method must be called on every transaction retry
+ // (but note that retries should be rare for read-only queries with no
+ // clock uncertainty). The method must not be called after the
+ // transaction has been used in the current epoch to read or write.
+ SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error
// ManualRestart bumps the transactions epoch, and can upgrade the
// timestamp and priority.
diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index 688d0ee41d1c..2975e5b2f97c 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -328,6 +328,14 @@ func (txn *Txn) CommitTimestamp() hlc.Timestamp {
return txn.mu.sender.CommitTimestamp()
}
+// CommitTimestampFixed returns true if the commit timestamp has
+// been fixed to the start timestamp and cannot be pushed forward.
+func (txn *Txn) CommitTimestampFixed() bool {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+ return txn.mu.sender.CommitTimestampFixed()
+}
+
// ProvisionalCommitTimestamp returns the transaction's provisional
// commit timestamp. This can evolve throughout a txn's lifecycle. See
// the comment on the WriteTimestamp field of TxnMeta for details.
@@ -1179,16 +1187,19 @@ func (txn *Txn) recordPreviousTxnIDLocked(prevTxnID uuid.UUID) {
// This is used to support historical queries (AS OF SYSTEM TIME queries and
// backups). This method must be called on every transaction retry (but note
// that retries should be rare for read-only queries with no clock uncertainty).
-func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) {
+func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
if txn.typ != RootTxn {
- panic(errors.WithContextTags(
- errors.AssertionFailedf("SetFixedTimestamp() called on leaf txn"), ctx))
+ return errors.WithContextTags(errors.AssertionFailedf(
+ "SetFixedTimestamp() called on leaf txn"), ctx)
}
if ts.IsEmpty() {
- log.Fatalf(ctx, "empty timestamp is invalid for SetFixedTimestamp()")
+ return errors.WithContextTags(errors.AssertionFailedf(
+ "empty timestamp is invalid for SetFixedTimestamp()"), ctx)
}
- txn.mu.sender.SetFixedTimestamp(ctx, ts)
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+ return txn.mu.sender.SetFixedTimestamp(ctx, ts)
}
// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index dc18dc949792..a2d1e68fb3b9 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -145,6 +145,7 @@ go_library(
"//pkg/sql/sqlinstance/instanceprovider",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slprovider",
+ "//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/stmtdiagnostics",
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index 7a934c0d6dbb..69b8c5f24b4c 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -77,6 +77,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instanceprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
@@ -548,6 +549,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
if cfg.TestingKnobs.JobsTestingKnobs != nil {
distSQLCfg.TestingKnobs.JobsTestingKnobs = cfg.TestingKnobs.JobsTestingKnobs
}
+
distSQLServer := distsql.NewServer(ctx, distSQLCfg, cfg.flowScheduler)
execinfrapb.RegisterDistSQLServer(cfg.grpcServer, distSQLServer)
@@ -708,6 +710,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
if indexUsageStatsKnobs := cfg.TestingKnobs.IndexUsageStatsKnobs; indexUsageStatsKnobs != nil {
execCfg.IndexUsageStatsTestingKnobs = indexUsageStatsKnobs.(*idxusage.TestingKnobs)
}
+ if sqlStatsKnobs := cfg.TestingKnobs.SQLStatsKnobs; sqlStatsKnobs != nil {
+ execCfg.SQLStatsTestingKnobs = sqlStatsKnobs.(*persistedsqlstats.TestingKnobs)
+ }
statsRefresher := stats.MakeRefresher(
cfg.Settings,
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index da7cdb6573f3..16568354a35a 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -354,6 +354,7 @@ go_library(
"//pkg/sql/sqlfsm",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlstats",
+ "//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
"//pkg/sql/sqlstats/sslocal",
"//pkg/sql/sqltelemetry",
diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index 7c8f9d1129bc..76633df7d470 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -186,7 +186,9 @@ func (sc *SchemaChanger) fixedTimestampTxn(
retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error,
) error {
return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
- txn.SetFixedTimestamp(ctx, readAsOf)
+ if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil {
+ return err
+ }
return retryable(ctx, txn, descriptors)
})
}
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 5a608e71db41..ad1a66ac1d70 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -733,7 +733,9 @@ func (m *Manager) resolveName(
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
- txn.SetFixedTimestamp(ctx, timestamp)
+ if err := txn.SetFixedTimestamp(ctx, timestamp); err != nil {
+ return err
+ }
var found bool
var err error
found, id, err = catalogkv.LookupObjectID(ctx, txn, m.storage.codec, parentID, parentSchemaID, name)
diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go
index d9b3c345c91e..24a56d8730c3 100644
--- a/pkg/sql/catalog/lease/lease_internal_test.go
+++ b/pkg/sql/catalog/lease/lease_internal_test.go
@@ -303,7 +303,9 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
update := func(catalog.MutableDescriptor) error { return nil }
logEvent := func(txn *kv.Txn) error {
txn2 := kvDB.NewTxn(ctx, "future-read")
- txn2.SetFixedTimestamp(ctx, futureTime.Prev())
+ if err := txn2.SetFixedTimestamp(ctx, futureTime.Prev()); err != nil {
+ return err
+ }
if _, err := txn2.Get(ctx, "key"); err != nil {
return errors.Wrap(err, "read from other txn in future")
}
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index 3b3a53e8c1b7..fb33df96758a 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -1685,7 +1685,7 @@ CREATE TABLE t.test0 (k CHAR PRIMARY KEY, v CHAR);
log.Infof(ctx, "checking version %d", table.GetVersion())
txn := kv.NewTxn(ctx, t.kvDB, roachpb.NodeID(0))
// Make the txn look back at the known modification timestamp.
- txn.SetFixedTimestamp(ctx, table.GetModificationTime())
+ require.NoError(t, txn.SetFixedTimestamp(ctx, table.GetModificationTime()))
// Look up the descriptor.
descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, descID)
diff --git a/pkg/sql/catalog/lease/storage.go b/pkg/sql/catalog/lease/storage.go
index 4123bba4b6e5..00b0eb5b0a9f 100644
--- a/pkg/sql/catalog/lease/storage.go
+++ b/pkg/sql/catalog/lease/storage.go
@@ -211,9 +211,12 @@ func (s storage) getForExpiration(
ctx context.Context, expiration hlc.Timestamp, id descpb.ID,
) (catalog.Descriptor, error) {
var desc catalog.Descriptor
- err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
+ err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
prevTimestamp := expiration.Prev()
- txn.SetFixedTimestamp(ctx, prevTimestamp)
+ err := txn.SetFixedTimestamp(ctx, prevTimestamp)
+ if err != nil {
+ return err
+ }
desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, s.codec, id)
if err != nil {
return err
diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go
index 00f2fa1fceb8..1c7856ba2786 100644
--- a/pkg/sql/catalog/systemschema/system.go
+++ b/pkg/sql/catalog/systemschema/system.go
@@ -377,16 +377,13 @@ CREATE TABLE system.join_tokens (
CREATE TABLE system.statement_statistics (
aggregated_ts TIMESTAMPTZ NOT NULL,
fingerprint_id BYTES NOT NULL,
- plan_hash INT NOT NULL,
+ plan_hash INT8 NOT NULL,
app_name STRING NOT NULL,
- node_id INT NOT NULL,
+ node_id INT8 NOT NULL,
- count INT NOT NULL,
agg_interval INTERVAL NOT NULL,
-
metadata JSONB NOT NULL,
statistics JSONB NOT NULL,
-
plan JSONB NOT NULL,
PRIMARY KEY (aggregated_ts, fingerprint_id, plan_hash, app_name, node_id)
@@ -399,7 +396,6 @@ CREATE TABLE system.statement_statistics (
plan_hash,
app_name,
node_id,
- count,
agg_interval,
metadata,
statistics,
@@ -413,11 +409,9 @@ CREATE TABLE system.transaction_statistics (
aggregated_ts TIMESTAMPTZ NOT NULL,
fingerprint_id BYTES NOT NULL,
app_name STRING NOT NULL,
- node_id INT NOT NULL,
+ node_id INT8 NOT NULL,
- count INT NOT NULL,
agg_interval INTERVAL NOT NULL,
-
metadata JSONB NOT NULL,
statistics JSONB NOT NULL,
@@ -430,7 +424,6 @@ CREATE TABLE system.transaction_statistics (
fingerprint_id,
app_name,
node_id,
- count,
agg_interval,
metadata,
statistics
@@ -1928,21 +1921,20 @@ var (
{Name: "plan_hash", ID: 3, Type: types.Int, Nullable: false},
{Name: "app_name", ID: 4, Type: types.String, Nullable: false},
{Name: "node_id", ID: 5, Type: types.Int, Nullable: false},
- {Name: "count", ID: 6, Type: types.Int, Nullable: false},
- {Name: "agg_interval", ID: 7, Type: types.Interval, Nullable: false},
- {Name: "metadata", ID: 8, Type: types.Jsonb, Nullable: false},
- {Name: "statistics", ID: 9, Type: types.Jsonb, Nullable: false},
- {Name: "plan", ID: 10, Type: types.Jsonb, Nullable: false},
+ {Name: "agg_interval", ID: 6, Type: types.Interval, Nullable: false},
+ {Name: "metadata", ID: 7, Type: types.Jsonb, Nullable: false},
+ {Name: "statistics", ID: 8, Type: types.Jsonb, Nullable: false},
+ {Name: "plan", ID: 9, Type: types.Jsonb, Nullable: false},
{
Name: "crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8",
- ID: 11,
+ ID: 10,
Type: types.Int4,
Nullable: false,
ComputeExpr: &sqlStmtHashComputeExpr,
Hidden: true,
},
},
- NextColumnID: 12,
+ NextColumnID: 11,
Families: []descpb.ColumnFamilyDescriptor{
{
Name: "primary",
@@ -1950,9 +1942,9 @@ var (
ColumnNames: []string{
"crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8",
"aggregated_ts", "fingerprint_id", "plan_hash", "app_name", "node_id",
- "count", "agg_interval", "metadata", "statistics", "plan",
+ "agg_interval", "metadata", "statistics", "plan",
},
- ColumnIDs: []descpb.ColumnID{11, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+ ColumnIDs: []descpb.ColumnID{10, 1, 2, 3, 4, 5, 6, 7, 8, 9},
DefaultColumnID: 0,
},
},
@@ -1977,7 +1969,7 @@ var (
descpb.IndexDescriptor_ASC,
descpb.IndexDescriptor_ASC,
},
- KeyColumnIDs: []descpb.ColumnID{11, 1, 2, 3, 4, 5},
+ KeyColumnIDs: []descpb.ColumnID{10, 1, 2, 3, 4, 5},
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
Sharded: descpb.ShardedDescriptor{
IsSharded: true,
@@ -1997,7 +1989,7 @@ var (
Expr: "crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8, 4:::INT8, 5:::INT8, 6:::INT8, 7:::INT8)",
Name: "check_crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8",
Validity: descpb.ConstraintValidity_Validated,
- ColumnIDs: []descpb.ColumnID{11},
+ ColumnIDs: []descpb.ColumnID{10},
IsNonNullConstraint: false,
Hidden: true,
},
@@ -2022,7 +2014,7 @@ var (
descpb.IndexDescriptor_ASC,
},
KeyColumnIDs: []descpb.ColumnID{2, 1, 3, 4, 5},
- KeySuffixColumnIDs: []descpb.ColumnID{11},
+ KeySuffixColumnIDs: []descpb.ColumnID{10},
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
},
},
@@ -2046,20 +2038,19 @@ var (
{Name: "fingerprint_id", ID: 2, Type: types.Bytes, Nullable: false},
{Name: "app_name", ID: 3, Type: types.String, Nullable: false},
{Name: "node_id", ID: 4, Type: types.Int, Nullable: false},
- {Name: "count", ID: 5, Type: types.Int, Nullable: false},
- {Name: "agg_interval", ID: 6, Type: types.Interval, Nullable: false},
- {Name: "metadata", ID: 7, Type: types.Jsonb, Nullable: false},
- {Name: "statistics", ID: 8, Type: types.Jsonb, Nullable: false},
+ {Name: "agg_interval", ID: 5, Type: types.Interval, Nullable: false},
+ {Name: "metadata", ID: 6, Type: types.Jsonb, Nullable: false},
+ {Name: "statistics", ID: 7, Type: types.Jsonb, Nullable: false},
{
Name: "crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8",
- ID: 9,
+ ID: 8,
Type: types.Int4,
Nullable: false,
ComputeExpr: &sqlTxnHashComputeExpr,
Hidden: true,
},
},
- NextColumnID: 10,
+ NextColumnID: 9,
Families: []descpb.ColumnFamilyDescriptor{
{
Name: "primary",
@@ -2067,10 +2058,9 @@ var (
ColumnNames: []string{
"crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8",
"aggregated_ts", "fingerprint_id", "app_name", "node_id",
- "count", "agg_interval", "metadata", "statistics",
+ "agg_interval", "metadata", "statistics",
},
- ColumnIDs: []descpb.ColumnID{9, 1, 2, 3, 4, 5, 6, 7, 8},
-
+ ColumnIDs: []descpb.ColumnID{8, 1, 2, 3, 4, 5, 6, 7},
DefaultColumnID: 0,
},
},
@@ -2093,7 +2083,7 @@ var (
descpb.IndexDescriptor_ASC,
descpb.IndexDescriptor_ASC,
},
- KeyColumnIDs: []descpb.ColumnID{9, 1, 2, 3, 4},
+ KeyColumnIDs: []descpb.ColumnID{8, 1, 2, 3, 4},
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
Sharded: descpb.ShardedDescriptor{
IsSharded: true,
@@ -2112,7 +2102,7 @@ var (
Expr: "crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 IN (0:::INT8, 1:::INT8, 2:::INT8, 3:::INT8, 4:::INT8, 5:::INT8, 6:::INT8, 7:::INT8)",
Name: "check_crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8",
Validity: descpb.ConstraintValidity_Validated,
- ColumnIDs: []descpb.ColumnID{9},
+ ColumnIDs: []descpb.ColumnID{8},
IsNonNullConstraint: false,
Hidden: true,
},
@@ -2135,7 +2125,7 @@ var (
descpb.IndexDescriptor_ASC,
},
KeyColumnIDs: []descpb.ColumnID{2, 1, 3, 4},
- KeySuffixColumnIDs: []descpb.ColumnID{9},
+ KeySuffixColumnIDs: []descpb.ColumnID{8},
Version: descpb.StrictIndexColumnIDGuaranteesVersion,
},
},
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 5c2583767e05..7518c8036d4e 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -321,7 +322,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
nil, /* reportedProvider */
)
reportedSQLStatsController := reportedSQLStats.GetController(cfg.SQLStatusServer)
- sqlStats := sslocal.New(
+ memSQLStats := sslocal.New(
cfg.Settings,
sqlstats.MaxMemSQLStatsStmtFingerprints,
sqlstats.MaxMemSQLStatsTxnFingerprints,
@@ -331,14 +332,11 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
sqlstats.SQLStatReset,
reportedSQLStats,
)
- sqlStatsController := sqlStats.GetController(cfg.SQLStatusServer)
s := &Server{
cfg: cfg,
Metrics: metrics,
InternalMetrics: makeMetrics(cfg, true /* internal */),
pool: pool,
- sqlStats: sqlStats,
- sqlStatsController: sqlStatsController,
reportedStats: reportedSQLStats,
reportedStatsController: reportedSQLStatsController,
reCache: tree.NewRegexpCache(512),
@@ -349,6 +347,20 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
}),
}
+ sqlStatsInternalExecutor := MakeInternalExecutor(context.Background(), s, MemoryMetrics{}, cfg.Settings)
+ persistedSQLStats := persistedsqlstats.New(&persistedsqlstats.Config{
+ Settings: s.cfg.Settings,
+ InternalExecutor: &sqlStatsInternalExecutor,
+ KvDB: cfg.DB,
+ SQLIDContainer: cfg.NodeID,
+ Knobs: cfg.SQLStatsTestingKnobs,
+ FlushCounter: metrics.StatsMetrics.SQLStatsFlushStarted,
+ FailureCounter: metrics.StatsMetrics.SQLStatsFlushFailure,
+ FlushDuration: metrics.StatsMetrics.SQLStatsFlushDuration,
+ }, memSQLStats)
+
+ s.sqlStats = persistedSQLStats
+ s.sqlStatsController = persistedSQLStats.GetController(cfg.SQLStatusServer)
return s
}
@@ -395,7 +407,12 @@ func makeMetrics(cfg *ExecutorConfig, internal bool) Metrics {
),
ReportedSQLStatsMemoryCurBytesCount: metric.NewGauge(
getMetricMeta(MetaReportedSQLStatsMemCurBytes, internal)),
- DiscardedStatsCount: metric.NewCounter(getMetricMeta(MetaDiscardedSQLStats, internal)),
+ DiscardedStatsCount: metric.NewCounter(getMetricMeta(MetaDiscardedSQLStats, internal)),
+ SQLStatsFlushStarted: metric.NewCounter(getMetricMeta(MetaSQLStatsFlushStarted, internal)),
+ SQLStatsFlushFailure: metric.NewCounter(getMetricMeta(MetaSQLStatsFlushFailure, internal)),
+ SQLStatsFlushDuration: metric.NewLatency(
+ getMetricMeta(MetaSQLStatsFlushDuration, internal), 6*metricsSampleInterval,
+ ),
},
}
}
@@ -418,6 +435,11 @@ func (s *Server) GetSQLStatsController() *sslocal.Controller {
return s.sqlStatsController
}
+// GetSQLStatsProvider returns the provider for the sqlstats subsystem.
+func (s *Server) GetSQLStatsProvider() sqlstats.Provider {
+ return s.sqlStats
+}
+
// GetReportedSQLStatsController returns the sqlstats.Controller for the current
// sql.Server's reported SQL Stats.
func (s *Server) GetReportedSQLStatsController() *sslocal.Controller {
@@ -442,7 +464,7 @@ func (s *Server) GetUnscrubbedStmtStats(
ctx context.Context,
) ([]roachpb.CollectedStatementStatistics, error) {
var stmtStats []roachpb.CollectedStatementStatistics
- stmtStatsVisitor := func(stat *roachpb.CollectedStatementStatistics) error {
+ stmtStatsVisitor := func(_ context.Context, stat *roachpb.CollectedStatementStatistics) error {
stmtStats = append(stmtStats, *stat)
return nil
}
@@ -462,7 +484,7 @@ func (s *Server) GetUnscrubbedTxnStats(
ctx context.Context,
) ([]roachpb.CollectedTransactionStatistics, error) {
var txnStats []roachpb.CollectedTransactionStatistics
- txnStatsVisitor := func(_ roachpb.TransactionFingerprintID, stat *roachpb.CollectedTransactionStatistics) error {
+ txnStatsVisitor := func(_ context.Context, _ roachpb.TransactionFingerprintID, stat *roachpb.CollectedTransactionStatistics) error {
txnStats = append(txnStats, *stat)
return nil
}
@@ -490,7 +512,7 @@ func (s *Server) getScrubbedStmtStats(
salt := ClusterSecret.Get(&s.cfg.Settings.SV)
var scrubbedStats []roachpb.CollectedStatementStatistics
- stmtStatsVisitor := func(stat *roachpb.CollectedStatementStatistics) error {
+ stmtStatsVisitor := func(_ context.Context, stat *roachpb.CollectedStatementStatistics) error {
// Scrub the statement itself.
scrubbedQueryStr, ok := scrubStmtStatKey(s.cfg.VirtualSchemas, stat.Key.Query)
@@ -2204,7 +2226,9 @@ func (ex *connExecutor) setTransactionModes(
return errors.AssertionFailedf("expected an evaluated AS OF timestamp")
}
if !asOfTs.IsEmpty() {
- ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs)
+ if err := ex.state.setHistoricalTimestamp(ex.Ctx(), asOfTs); err != nil {
+ return err
+ }
ex.state.sqlTimestamp = asOfTs.GoTime()
if rwMode == tree.UnspecifiedReadWriteMode {
rwMode = tree.ReadOnly
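
In the NewServer change above, the persisted SQL stats provider wraps the in-memory sslocal provider: reads still come from memory, while the wrapper owns the periodic flush to the system tables. A minimal, self-contained sketch of that layering (illustrative types, not the real sqlstats API):

    package main

    import "fmt"

    // Provider is a stand-in for the sqlstats provider interface.
    type Provider interface {
        Record(fingerprint string)
        Count(fingerprint string) int
    }

    type memStats struct{ counts map[string]int }

    func (m *memStats) Record(fp string)    { m.counts[fp]++ }
    func (m *memStats) Count(fp string) int { return m.counts[fp] }

    // persistedStats decorates the in-memory provider; Flush would write the
    // accumulated stats to durable storage.
    type persistedStats struct{ Provider }

    func (p *persistedStats) Flush() { fmt.Println("flushing to system tables") }

    func main() {
        stats := &persistedStats{Provider: &memStats{counts: map[string]int{}}}
        stats.Record("SELECT _")
        fmt.Println(stats.Count("SELECT _")) // 1
        stats.Flush()
    }
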
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 2a3a4eef6b3b..22676d158e6a 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -588,7 +588,9 @@ func (ex *connExecutor) execStmtInOpenState(
if asOf != nil {
p.extendedEvalCtx.AsOfSystemTime = asOf
p.extendedEvalCtx.SetTxnTimestamp(asOf.Timestamp.GoTime())
- ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp)
+ if err := ex.state.setHistoricalTimestamp(ctx, asOf.Timestamp); err != nil {
+ return makeErrEvent(err)
+ }
}
} else {
// If we're in an explicit txn, we allow AOST but only if it matches with
diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go
index b45b9b434d6b..bf20cedfd8fb 100644
--- a/pkg/sql/conn_executor_prepare.go
+++ b/pkg/sql/conn_executor_prepare.go
@@ -275,7 +275,9 @@ func (ex *connExecutor) populatePrepared(
"bounded staleness queries do not yet work with prepared statements",
)
}
- txn.SetFixedTimestamp(ctx, asOf.Timestamp)
+ if err := txn.SetFixedTimestamp(ctx, asOf.Timestamp); err != nil {
+ return 0, err
+ }
}
// PREPARE has a limited subset of statements it can be run with. Postgres
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 25cef80f6414..89866caeae9f 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -917,7 +917,7 @@ CREATE TABLE crdb_internal.node_statement_statistics (
nodeID, _ := p.execCfg.NodeID.OptionalNodeID() // zero if not available
- statementVisitor := func(stats *roachpb.CollectedStatementStatistics) error {
+ statementVisitor := func(_ context.Context, stats *roachpb.CollectedStatementStatistics) error {
anonymized := tree.DNull
anonStr, ok := scrubStmtStatKey(p.getVirtualTabler(), stats.Key.Query)
if ok {
@@ -1052,7 +1052,7 @@ CREATE TABLE crdb_internal.node_transaction_statistics (
nodeID, _ := p.execCfg.NodeID.OptionalNodeID() // zero if not available
- transactionVisitor := func(txnFingerprintID roachpb.TransactionFingerprintID, stats *roachpb.CollectedTransactionStatistics) error {
+ transactionVisitor := func(_ context.Context, txnFingerprintID roachpb.TransactionFingerprintID, stats *roachpb.CollectedTransactionStatistics) error {
stmtFingerprintIDsDatum := tree.NewDArray(types.String)
for _, stmtFingerprintID := range stats.StatementFingerprintIDs {
if err := stmtFingerprintIDsDatum.Append(tree.NewDString(strconv.FormatUint(uint64(stmtFingerprintID), 10))); err != nil {
diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go
index 3da365b71df2..560e0cf34a8b 100644
--- a/pkg/sql/create_stats.go
+++ b/pkg/sql/create_stats.go
@@ -543,7 +543,9 @@ func (r *createStatsResumer) Resume(ctx context.Context, execCtx interface{}) er
if details.AsOf != nil {
p.ExtendedEvalContext().AsOfSystemTime = &tree.AsOfSystemTime{Timestamp: *details.AsOf}
p.ExtendedEvalContext().SetTxnTimestamp(details.AsOf.GoTime())
- txn.SetFixedTimestamp(ctx, *details.AsOf)
+ if err := txn.SetFixedTimestamp(ctx, *details.AsOf); err != nil {
+ return err
+ }
}
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn, true /* distribute */)
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index b37cec975686..c0da555bdedc 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -77,6 +77,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -855,6 +856,24 @@ var (
Measurement: "Discarded SQL Stats",
Unit: metric.Unit_COUNT,
}
+ MetaSQLStatsFlushStarted = metric.Metadata{
+ Name: "sql.stats.flush.count",
+ Help: "Number of times SQL Stats are flushed to persistent storage",
+ Measurement: "SQL Stats Flush",
+ Unit: metric.Unit_COUNT,
+ }
+ MetaSQLStatsFlushFailure = metric.Metadata{
+ Name: "sql.stats.flush.error",
+ Help: "Number of errors encountered when flushing SQL Stats",
+ Measurement: "SQL Stats Flush",
+ Unit: metric.Unit_COUNT,
+ }
+ MetaSQLStatsFlushDuration = metric.Metadata{
+ Name: "sql.stats.flush.duration",
+ Help: "Time took to in nanoseconds to complete SQL Stats flush",
+ Measurement: "SQL Stats Flush",
+ Unit: metric.Unit_NANOSECONDS,
+ }
)
func getMetricMeta(meta metric.Metadata, internal bool) metric.Metadata {
@@ -932,6 +951,7 @@ type ExecutorConfig struct {
TenantTestingKnobs *TenantTestingKnobs
BackupRestoreTestingKnobs *BackupRestoreTestingKnobs
IndexUsageStatsTestingKnobs *idxusage.TestingKnobs
+ SQLStatsTestingKnobs *persistedsqlstats.TestingKnobs
// HistogramWindowInterval is (server.Config).HistogramWindowInterval.
HistogramWindowInterval time.Duration
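
The three metrics registered above are consumed by the flush path: the started counter is bumped per attempt, the failure counter on error, and the histogram records elapsed nanoseconds. A hedged sketch of that usage (flushOnce and doFlush are illustrative names, not the actual persistedsqlstats implementation; Inc and RecordValue are the metric.Counter and metric.Histogram methods):

    // Sketch of the flush instrumentation wired through the Config fields
    // added in conn_executor.go above.
    func (s *PersistedSQLStats) flushOnce(ctx context.Context) {
        s.cfg.FlushCounter.Inc(1) // sql.stats.flush.count
        start := timeutil.Now()
        if err := s.doFlush(ctx); err != nil {
            s.cfg.FailureCounter.Inc(1) // sql.stats.flush.error
        }
        // sql.stats.flush.duration
        s.cfg.FlushDuration.RecordValue(timeutil.Since(start).Nanoseconds())
    }
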
diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go
index a07a31511ce6..c14612b38df4 100644
--- a/pkg/sql/executor_statement_metrics.go
+++ b/pkg/sql/executor_statement_metrics.go
@@ -65,6 +65,10 @@ type StatsMetrics struct {
ReportedSQLStatsMemoryCurBytesCount *metric.Gauge
DiscardedStatsCount *metric.Counter
+
+ SQLStatsFlushStarted *metric.Counter
+ SQLStatsFlushFailure *metric.Counter
+ SQLStatsFlushDuration *metric.Histogram
}
// StatsMetrics is part of the metric.Struct interface.
diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go
index 4658bc05e2c1..c11b537dc566 100644
--- a/pkg/sql/index_backfiller.go
+++ b/pkg/sql/index_backfiller.go
@@ -98,7 +98,9 @@ func (ib *IndexBackfillPlanner) scanTargetSpansToPushTimestampCache(
) error {
const pageSize = 10000
return ib.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, backfillTimestamp)
+ if err := txn.SetFixedTimestamp(ctx, backfillTimestamp); err != nil {
+ return err
+ }
for _, span := range targetSpans {
// TODO(dt): a Count() request would be nice here if the target isn't
// empty, since we don't need to drag all the results back just to
diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema
index 4becda098bb7..db8d2c36cc89 100644
--- a/pkg/sql/logictest/testdata/logic_test/information_schema
+++ b/pkg/sql/logictest/testdata/logic_test/information_schema
@@ -1536,7 +1536,6 @@ system public 630200280_35_3_not_null
system public 630200280_35_5_not_null system public statement_diagnostics_requests CHECK NO NO
system public primary system public statement_diagnostics_requests PRIMARY KEY NO NO
system public 630200280_42_10_not_null system public statement_statistics CHECK NO NO
-system public 630200280_42_11_not_null system public statement_statistics CHECK NO NO
system public 630200280_42_1_not_null system public statement_statistics CHECK NO NO
system public 630200280_42_2_not_null system public statement_statistics CHECK NO NO
system public 630200280_42_3_not_null system public statement_statistics CHECK NO NO
@@ -1571,7 +1570,6 @@ system public 630200280_43_5_not_null
system public 630200280_43_6_not_null system public transaction_statistics CHECK NO NO
system public 630200280_43_7_not_null system public transaction_statistics CHECK NO NO
system public 630200280_43_8_not_null system public transaction_statistics CHECK NO NO
-system public 630200280_43_9_not_null system public transaction_statistics CHECK NO NO
system public check_crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 system public transaction_statistics CHECK NO NO
system public primary system public transaction_statistics PRIMARY KEY NO NO
system public 630200280_14_1_not_null system public ui CHECK NO NO
@@ -1714,24 +1712,22 @@ system public 630200280_40_5_not_null
system public 630200280_41_1_not_null id IS NOT NULL
system public 630200280_41_2_not_null secret IS NOT NULL
system public 630200280_41_3_not_null expiration IS NOT NULL
-system public 630200280_42_10_not_null plan IS NOT NULL
system public 630200280_42_1_not_null aggregated_ts IS NOT NULL
system public 630200280_42_2_not_null fingerprint_id IS NOT NULL
system public 630200280_42_3_not_null plan_hash IS NOT NULL
system public 630200280_42_4_not_null app_name IS NOT NULL
system public 630200280_42_5_not_null node_id IS NOT NULL
-system public 630200280_42_6_not_null count IS NOT NULL
-system public 630200280_42_7_not_null agg_interval IS NOT NULL
-system public 630200280_42_8_not_null metadata IS NOT NULL
-system public 630200280_42_9_not_null statistics IS NOT NULL
+system public 630200280_42_6_not_null agg_interval IS NOT NULL
+system public 630200280_42_7_not_null metadata IS NOT NULL
+system public 630200280_42_8_not_null statistics IS NOT NULL
+system public 630200280_42_9_not_null plan IS NOT NULL
system public 630200280_43_1_not_null aggregated_ts IS NOT NULL
system public 630200280_43_2_not_null fingerprint_id IS NOT NULL
system public 630200280_43_3_not_null app_name IS NOT NULL
system public 630200280_43_4_not_null node_id IS NOT NULL
-system public 630200280_43_5_not_null count IS NOT NULL
-system public 630200280_43_6_not_null agg_interval IS NOT NULL
-system public 630200280_43_7_not_null metadata IS NOT NULL
-system public 630200280_43_8_not_null statistics IS NOT NULL
+system public 630200280_43_5_not_null agg_interval IS NOT NULL
+system public 630200280_43_6_not_null metadata IS NOT NULL
+system public 630200280_43_7_not_null statistics IS NOT NULL
system public 630200280_44_1_not_null database_id IS NOT NULL
system public 630200280_44_2_not_null role_name IS NOT NULL
system public 630200280_44_3_not_null settings IS NOT NULL
@@ -2055,17 +2051,16 @@ system public statement_diagnostics_requests id
system public statement_diagnostics_requests requested_at 5
system public statement_diagnostics_requests statement_diagnostics_id 4
system public statement_diagnostics_requests statement_fingerprint 3
-system public statement_statistics agg_interval 7
+system public statement_statistics agg_interval 6
system public statement_statistics aggregated_ts 1
system public statement_statistics app_name 4
-system public statement_statistics count 6
-system public statement_statistics crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8 11
+system public statement_statistics crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8 10
system public statement_statistics fingerprint_id 2
-system public statement_statistics metadata 8
+system public statement_statistics metadata 7
system public statement_statistics node_id 5
-system public statement_statistics plan 10
+system public statement_statistics plan 9
system public statement_statistics plan_hash 3
-system public statement_statistics statistics 9
+system public statement_statistics statistics 8
system public table_statistics columnIDs 4
system public table_statistics createdAt 5
system public table_statistics distinctCount 7
@@ -2095,15 +2090,14 @@ system public tenant_usage total_write_reques
system public tenants active 2
system public tenants id 1
system public tenants info 3
-system public transaction_statistics agg_interval 6
+system public transaction_statistics agg_interval 5
system public transaction_statistics aggregated_ts 1
system public transaction_statistics app_name 3
-system public transaction_statistics count 5
-system public transaction_statistics crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 9
+system public transaction_statistics crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 8
system public transaction_statistics fingerprint_id 2
-system public transaction_statistics metadata 7
+system public transaction_statistics metadata 6
system public transaction_statistics node_id 4
-system public transaction_statistics statistics 8
+system public transaction_statistics statistics 7
system public ui key 1
system public ui lastUpdated 3
system public ui value 2
diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y
index 6328e7f773ad..07b3dbbad6d8 100644
--- a/pkg/sql/parser/sql.y
+++ b/pkg/sql/parser/sql.y
@@ -6176,7 +6176,7 @@ alter_schema_stmt:
// FAMILY <familyname>, CREATE [IF NOT EXISTS] FAMILY [<familyname>]
// REFERENCES <tablename> [( <colnames...> )] [ON DELETE {NO ACTION | RESTRICT}] [ON UPDATE {NO ACTION | RESTRICT}]
// COLLATE <collationname>
-// AS ( <expr> ) STORED
+// AS ( <expr> ) { STORED | VIRTUAL }
//
// Interleave clause:
//    INTERLEAVE IN PARENT <tablename> ( <colnames...> ) [CASCADE | RESTRICT]
diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index 82c3b17f460a..bd4b89b86c33 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -629,7 +629,9 @@ func (rf *Fetcher) StartInconsistentScan(
)
}
txn := kv.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */)
- txn.SetFixedTimestamp(ctx, txnTimestamp)
+ if err := txn.SetFixedTimestamp(ctx, txnTimestamp); err != nil {
+ return err
+ }
if log.V(1) {
log.Infof(ctx, "starting inconsistent scan at timestamp %v", txnTimestamp)
}
@@ -644,7 +646,9 @@ func (rf *Fetcher) StartInconsistentScan(
txnTimestamp = txnTimestamp.Add(now.Sub(txnStartTime).Nanoseconds(), 0 /* logical */)
txnStartTime = now
txn = kv.NewTxnWithSteppingEnabled(ctx, db, 0 /* gatewayNodeID */)
- txn.SetFixedTimestamp(ctx, txnTimestamp)
+ if err := txn.SetFixedTimestamp(ctx, txnTimestamp); err != nil {
+ return nil, err
+ }
if log.V(1) {
log.Infof(ctx, "bumped inconsistent scan timestamp to %v", txnTimestamp)
diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go
index 4f1a76c45e0a..1a33e4961e11 100644
--- a/pkg/sql/row/row_converter.go
+++ b/pkg/sql/row/row_converter.go
@@ -268,7 +268,9 @@ func (c *DatumRowConverter) getSequenceAnnotation(
err := evalCtx.DB.Txn(evalCtx.Context, func(ctx context.Context, txn *kv.Txn) error {
seqNameToMetadata = make(map[string]*SequenceMetadata)
seqIDToMetadata = make(map[descpb.ID]*SequenceMetadata)
- txn.SetFixedTimestamp(ctx, hlc.Timestamp{WallTime: evalCtx.TxnTimestamp.UnixNano()})
+ if err := txn.SetFixedTimestamp(ctx, hlc.Timestamp{WallTime: evalCtx.TxnTimestamp.UnixNano()}); err != nil {
+ return err
+ }
for seqID := range sequenceIDs {
seqDesc, err := catalogkv.MustGetTableDescByID(ctx, txn, evalCtx.Codec, seqID)
if err != nil {
diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go
index b22ef934671f..902ea184d948 100644
--- a/pkg/sql/rowexec/indexbackfiller.go
+++ b/pkg/sql/rowexec/indexbackfiller.go
@@ -432,7 +432,9 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
start := timeutil.Now()
var entries []rowenc.IndexEntry
if err := ib.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, readAsOf)
+ if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil {
+ return err
+ }
// TODO(knz): do KV tracing in DistSQL processors.
var err error
diff --git a/pkg/sql/schema_change_plan_node.go b/pkg/sql/schema_change_plan_node.go
index 1a4f029fa265..a8dad8a622a1 100644
--- a/pkg/sql/schema_change_plan_node.go
+++ b/pkg/sql/schema_change_plan_node.go
@@ -84,7 +84,9 @@ func (p *planner) WaitForDescriptorSchemaChanges(
if err := p.ExecCfg().CollectionFactory.Txn(
ctx, p.ExecCfg().InternalExecutor, p.ExecCfg().DB,
func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
- txn.SetFixedTimestamp(ctx, now)
+ if err := txn.SetFixedTimestamp(ctx, now); err != nil {
+ return err
+ }
table, err := descriptors.GetImmutableTableByID(ctx, txn, descID,
tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index bbb34fef730a..95c1fccdc41d 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -247,7 +247,9 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
}
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- txn.SetFixedTimestamp(ctx, ts)
+ if err := txn.SetFixedTimestamp(ctx, ts); err != nil {
+ return err
+ }
// Create an internal planner as the planner used to serve the user query
// would have committed by this point.
diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
new file mode 100644
index 000000000000..c8a28a0a2374
--- /dev/null
+++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
@@ -0,0 +1,59 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "persistedsqlstats",
+ srcs = [
+ "cluster_settings.go",
+ "flush.go",
+ "provider.go",
+ "test_utils.go",
+ ],
+ importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/base",
+ "//pkg/kv",
+ "//pkg/roachpb:with-mocks",
+ "//pkg/security",
+ "//pkg/settings",
+ "//pkg/settings/cluster",
+ "//pkg/sql/sem/tree",
+ "//pkg/sql/sessiondata",
+ "//pkg/sql/sqlstats",
+ "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
+ "//pkg/sql/sqlstats/sslocal",
+ "//pkg/sql/sqlutil",
+ "//pkg/util/log",
+ "//pkg/util/metric",
+ "//pkg/util/stop",
+ "//pkg/util/timeutil",
+ "@com_github_cockroachdb_errors//:errors",
+ ],
+)
+
+go_test(
+ name = "persistedsqlstats_test",
+ srcs = [
+ "flush_test.go",
+ "main_test.go",
+ ],
+ deps = [
+ ":persistedsqlstats",
+ "//pkg/base",
+ "//pkg/roachpb:with-mocks",
+ "//pkg/security",
+ "//pkg/security/securitytest",
+ "//pkg/server",
+ "//pkg/sql",
+ "//pkg/sql/sqlstats",
+ "//pkg/testutils/serverutils",
+ "//pkg/testutils/sqlutils",
+ "//pkg/testutils/testcluster",
+ "//pkg/util/leaktest",
+ "//pkg/util/log",
+ "//pkg/util/stop",
+ "//pkg/util/syncutil",
+ "//pkg/util/timeutil",
+ "@com_github_stretchr_testify//require",
+ ],
+)
diff --git a/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
new file mode 100644
index 000000000000..2e65b24291a8
--- /dev/null
+++ b/pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go
@@ -0,0 +1,26 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package persistedsqlstats
+
+import (
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/settings"
+)
+
+// SQLStatsFlushInterval is the cluster setting that controls how often the
+// SQL stats are flushed to the system tables. Its value is capped at 24 hours.
+var SQLStatsFlushInterval = settings.RegisterDurationSetting(
+ "sql.stats.flush.interval",
+ "the interval at which SQL execution statistics are flushed to disk",
+ time.Hour,
+ settings.NonNegativeDurationWithMaximum(time.Hour*24),
+).WithPublic()
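+
+// The setting is public, so the flush cadence can be adjusted at runtime via
+// SQL, e.g.:
+//
+//   SET CLUSTER SETTING sql.stats.flush.interval = '30m';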
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush.go b/pkg/sql/sqlstats/persistedsqlstats/flush.go
new file mode 100644
index 000000000000..7d599b2404ef
--- /dev/null
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush.go
@@ -0,0 +1,528 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package persistedsqlstats
+
+import (
+ "context"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/security"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+)
+
+// Flush flushes the in-memory SQL stats into the system tables. Any errors
+// encountered during the flush are logged as warnings.
+func (s *PersistedSQLStats) Flush(ctx context.Context) {
+ log.Infof(ctx, "flushing %d stmt/txn fingerprints (%d bytes) after %s",
+ s.SQLStats.GetTotalFingerprintCount(), s.SQLStats.GetTotalFingerprintBytes(), timeutil.Since(s.lastFlushStarted))
+
+	// The flush routines below log any errors they encounter directly, so no
+	// error is returned to the caller.
+ _ = s.IterateStatementStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
+ s.doFlush(ctx, func() error {
+ return s.doFlushSingleStmtStats(ctx, statistics)
+ }, "failed to flush statement statistics" /* errMsg */)
+
+ return nil
+ })
+ _ = s.IterateTransactionStats(ctx, &sqlstats.IteratorOptions{}, func(ctx context.Context, key roachpb.TransactionFingerprintID, statistics *roachpb.CollectedTransactionStatistics) error {
+ s.doFlush(ctx, func() error {
+ return s.doFlushSingleTxnStats(ctx, key, statistics)
+ }, "failed to flush transaction statistics" /* errMsg */)
+
+ return nil
+ })
+
+ if err := s.SQLStats.Reset(ctx); err != nil {
+		log.Warningf(ctx, "failed to reset in-memory SQL Stats: %s", err)
+ }
+
+ s.lastFlushStarted = s.getTimeNow()
+}
+
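+// doFlush runs workFn and records the flush metrics: every attempt bumps the
+// flush counter and records its duration, while failures additionally bump
+// the failure counter and are logged as warnings instead of being returned.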
+func (s *PersistedSQLStats) doFlush(ctx context.Context, workFn func() error, errMsg string) {
+ var err error
+ flushBegin := s.getTimeNow()
+
+ defer func() {
+ if err != nil {
+ s.cfg.FailureCounter.Inc(1)
+ log.Warningf(ctx, "%s: %s", errMsg, err)
+ }
+ flushDuration := s.getTimeNow().Sub(flushBegin)
+ s.cfg.FlushDuration.RecordValue(flushDuration.Nanoseconds())
+ s.cfg.FlushCounter.Inc(1)
+ }()
+
+ err = workFn()
+}
+
+func (s *PersistedSQLStats) doFlushSingleTxnStats(
+ ctx context.Context,
+ key roachpb.TransactionFingerprintID,
+ stats *roachpb.CollectedTransactionStatistics,
+) error {
+ return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+ // Explicitly copy the stats variable so the txn closure is retryable.
+ scopedStats := *stats
+
+ aggregatedTs := s.computeAggregatedTs()
+ serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(key))
+
+ insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) {
+ rowsAffected, err := s.insertTransactionStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats)
+
+ if err != nil {
+ return false /* alreadyExists */, err
+ }
+
+ if rowsAffected == 0 {
+ return true /* alreadyExists */, nil /* err */
+ }
+
+ return false /* alreadyExists */, nil /* err */
+ }
+
+ readFn := func(ctx context.Context, txn *kv.Txn) error {
+ persistedData := roachpb.TransactionStatistics{}
+ err := s.fetchPersistedTransactionStats(ctx, txn, aggregatedTs, serializedFingerprintID, scopedStats.App, &persistedData)
+ if err != nil {
+ return err
+ }
+
+ scopedStats.Stats.Add(&persistedData)
+ return nil
+ }
+
+ updateFn := func(ctx context.Context, txn *kv.Txn) error {
+ return s.updateTransactionStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats)
+ }
+
+ err := s.doInsertElseDoUpdate(ctx, txn, insertFn, readFn, updateFn)
+ if err != nil {
+ return errors.Wrapf(err, "flushing transaction %d's statistics", key)
+ }
+ return nil
+ })
+}
+
+func (s *PersistedSQLStats) doFlushSingleStmtStats(
+ ctx context.Context, stats *roachpb.CollectedStatementStatistics,
+) error {
+ return s.cfg.KvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+ // Explicitly copy the stats so that this closure is retryable.
+ scopedStats := *stats
+
+ aggregatedTs := s.computeAggregatedTs()
+ serializedFingerprintID := sqlstatsutil.EncodeUint64ToBytes(uint64(scopedStats.ID))
+
+ insertFn := func(ctx context.Context, txn *kv.Txn) (alreadyExists bool, err error) {
+ rowsAffected, err := s.insertStatementStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats)
+
+ if err != nil {
+ return false /* alreadyExists */, err
+ }
+
+ if rowsAffected == 0 {
+ return true /* alreadyExists */, nil /* err */
+ }
+
+ return false /* alreadyExists */, nil /* err */
+ }
+
+ readFn := func(ctx context.Context, txn *kv.Txn) error {
+ persistedData := roachpb.StatementStatistics{}
+ err := s.fetchPersistedStatementStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats.Key, &persistedData)
+ if err != nil {
+ return err
+ }
+
+ scopedStats.Stats.Add(&persistedData)
+ return nil
+ }
+
+ updateFn := func(ctx context.Context, txn *kv.Txn) error {
+ return s.updateStatementStats(ctx, txn, aggregatedTs, serializedFingerprintID, &scopedStats)
+ }
+
+ err := s.doInsertElseDoUpdate(ctx, txn, insertFn, readFn, updateFn)
+ if err != nil {
+			return errors.Wrapf(err, "flushing statement %d's statistics", scopedStats.ID)
+ }
+ return nil
+ })
+}
+
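+// doInsertElseDoUpdate implements insert-else-update semantics on top of the
+// INSERT ... ON CONFLICT DO NOTHING statements above: if the insert affects
+// zero rows, the row already exists, so we read the persisted statistics
+// (locked via SELECT ... FOR UPDATE), merge the in-memory statistics into
+// them, and write the combined result back.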
+func (s *PersistedSQLStats) doInsertElseDoUpdate(
+ ctx context.Context,
+ txn *kv.Txn,
+ insertFn func(context.Context, *kv.Txn) (alreadyExists bool, err error),
+ readFn func(context.Context, *kv.Txn) error,
+ updateFn func(context.Context, *kv.Txn) error,
+) error {
+ alreadyExists, err := insertFn(ctx, txn)
+ if err != nil {
+ return err
+ }
+
+ if alreadyExists {
+ err = readFn(ctx, txn)
+ if err != nil {
+ return err
+ }
+
+ err = updateFn(ctx, txn)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
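+// computeAggregatedTs computes the aggregation window that this flush's stats
+// are attributed to by truncating the current time to the flush interval.
+// For example, with the default 1h interval, a flush at 09:45 lands in the
+// 09:00 window:
+//
+//   now := time.Date(2021, 7, 1, 9, 45, 0, 0, time.UTC)
+//   aggTs := now.Truncate(time.Hour) // 2021-07-01 09:00:00 +0000 UTC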
+func (s *PersistedSQLStats) computeAggregatedTs() time.Time {
+ interval := SQLStatsFlushInterval.Get(&s.cfg.Settings.SV)
+ now := s.getTimeNow()
+
+ aggTs := now.Truncate(interval)
+
+ return aggTs
+}
+
+func (s *PersistedSQLStats) getTimeNow() time.Time {
+ if s.cfg.Knobs != nil && s.cfg.Knobs.StubTimeNow != nil {
+ return s.cfg.Knobs.StubTimeNow()
+ }
+
+ return timeutil.Now()
+}
+
+func (s *PersistedSQLStats) insertTransactionStats(
+ ctx context.Context,
+ txn *kv.Txn,
+ aggregatedTs time.Time,
+ serializedFingerprintID []byte,
+ stats *roachpb.CollectedTransactionStatistics,
+) (rowsAffected int, err error) {
+ insertStmt := `
+INSERT INTO system.transaction_statistics
+VALUES ($1, $2, $3, $4, $5, $6, $7)
+ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8, aggregated_ts, fingerprint_id, app_name, node_id)
+DO NOTHING
+`
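+	// The conflict target mirrors the table's hash-sharded primary key; DO
+	// NOTHING lets us detect an existing row through a zero rowsAffected
+	// count instead of aborting the transaction.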
+
+ aggInterval := SQLStatsFlushInterval.Get(&s.cfg.Settings.SV)
+
+ // Prepare data for insertion.
+ metadataJSON, err := sqlstatsutil.BuildTxnMetadataJSON(stats)
+ if err != nil {
+ return 0 /* rowsAffected */, err
+ }
+ metadata := tree.NewDJSON(metadataJSON)
+
+ statisticsJSON, err := sqlstatsutil.BuildTxnStatisticsJSON(stats)
+ if err != nil {
+ return 0 /* rowsAffected */, err
+ }
+ statistics := tree.NewDJSON(statisticsJSON)
+
+ rowsAffected, err = s.cfg.InternalExecutor.ExecEx(
+ ctx,
+ "insert-txn-stats",
+ txn, /* txn */
+ sessiondata.InternalExecutorOverride{
+ User: security.NodeUserName(),
+ },
+ insertStmt,
+ aggregatedTs, // aggregated_ts
+ serializedFingerprintID, // fingerprint_id
+ stats.App, // app_name
+ s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
+ aggInterval, // agg_interval
+ metadata, // metadata
+ statistics, // statistics
+ )
+
+ return rowsAffected, err
+}
+
+func (s *PersistedSQLStats) updateTransactionStats(
+ ctx context.Context,
+ txn *kv.Txn,
+ aggregatedTs time.Time,
+ serializedFingerprintID []byte,
+ stats *roachpb.CollectedTransactionStatistics,
+) error {
+ updateStmt := `
+UPDATE system.transaction_statistics
+SET statistics = $1
+WHERE fingerprint_id = $2
+ AND aggregated_ts = $3
+ AND app_name = $4
+ AND node_id = $5
+`
+
+ statisticsJSON, err := sqlstatsutil.BuildTxnStatisticsJSON(stats)
+ if err != nil {
+ return err
+ }
+ statistics := tree.NewDJSON(statisticsJSON)
+
+ rowsAffected, err := s.cfg.InternalExecutor.ExecEx(
+ ctx,
+		"update-txn-stats",
+ txn, /* txn */
+ sessiondata.InternalExecutorOverride{
+ User: security.NodeUserName(),
+ },
+ updateStmt,
+ statistics, // statistics
+ serializedFingerprintID, // fingerprint_id
+ aggregatedTs, // aggregated_ts
+ stats.App, // app_name
+ s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
+ )
+
+ if err != nil {
+ return err
+ }
+
+ if rowsAffected == 0 {
+ return errors.AssertionFailedf("failed to update transaction statistics for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d",
+ serializedFingerprintID, stats.App, aggregatedTs,
+ s.cfg.SQLIDContainer.SQLInstanceID())
+ }
+
+ return nil
+}
+
+func (s *PersistedSQLStats) updateStatementStats(
+ ctx context.Context,
+ txn *kv.Txn,
+ aggregatedTs time.Time,
+ serializedFingerprintID []byte,
+ stats *roachpb.CollectedStatementStatistics,
+) error {
+ updateStmt := `
+UPDATE system.statement_statistics
+SET statistics = $1
+WHERE fingerprint_id = $2
+ AND aggregated_ts = $3
+ AND app_name = $4
+ AND plan_hash = $5
+ AND node_id = $6
+`
+
+ statisticsJSON, err := sqlstatsutil.BuildStmtStatisticsJSON(&stats.Stats)
+ if err != nil {
+ return err
+ }
+ statistics := tree.NewDJSON(statisticsJSON)
+
+ rowsAffected, err := s.cfg.InternalExecutor.ExecEx(
+ ctx,
+ "update-stmt-stats",
+ txn, /* txn */
+ sessiondata.InternalExecutorOverride{
+ User: security.NodeUserName(),
+ },
+ updateStmt,
+ statistics, // statistics
+ serializedFingerprintID, // fingerprint_id
+ aggregatedTs, // aggregated_ts
+ stats.Key.App, // app_name
+		dummyPlanHash,                        // plan_hash
+ s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
+ )
+
+ if err != nil {
+ return err
+ }
+
+ if rowsAffected == 0 {
+		return errors.AssertionFailedf("failed to update statement statistics for fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d",
+ serializedFingerprintID, stats.Key.App, aggregatedTs, dummyPlanHash,
+ s.cfg.SQLIDContainer.SQLInstanceID())
+ }
+
+ return nil
+}
+
+func (s *PersistedSQLStats) insertStatementStats(
+ ctx context.Context,
+ txn *kv.Txn,
+ aggregatedTs time.Time,
+ serializedFingerprintID []byte,
+ stats *roachpb.CollectedStatementStatistics,
+) (rowsAffected int, err error) {
+ insertStmt := `
+INSERT INTO system.statement_statistics
+VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+ON CONFLICT (crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_shard_8, aggregated_ts, fingerprint_id, app_name, plan_hash, node_id)
+DO NOTHING
+`
+ aggInterval := SQLStatsFlushInterval.Get(&s.cfg.Settings.SV)
+
+ // Prepare data for insertion.
+ metadataJSON, err := sqlstatsutil.BuildStmtMetadataJSON(stats)
+ if err != nil {
+ return 0 /* rowsAffected */, err
+ }
+ metadata := tree.NewDJSON(metadataJSON)
+
+ statisticsJSON, err := sqlstatsutil.BuildStmtStatisticsJSON(&stats.Stats)
+ if err != nil {
+ return 0 /* rowsAffected */, err
+ }
+ statistics := tree.NewDJSON(statisticsJSON)
+
+ plan := tree.NewDJSON(sqlstatsutil.ExplainTreePlanNodeToJSON(&stats.Stats.SensitiveInfo.MostRecentPlanDescription))
+
+ rowsAffected, err = s.cfg.InternalExecutor.ExecEx(
+ ctx,
+ "insert-stmt-stats",
+ txn, /* txn */
+ sessiondata.InternalExecutorOverride{
+ User: security.NodeUserName(),
+ },
+ insertStmt,
+ aggregatedTs, // aggregated_ts
+ serializedFingerprintID, // fingerprint_id
+ dummyPlanHash, // plan_hash
+ stats.Key.App, // app_name
+ s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
+		aggInterval,                          // agg_interval
+ metadata, // metadata
+ statistics, // statistics
+ plan, // plan
+ )
+
+ return rowsAffected, err
+}
+
+func (s *PersistedSQLStats) fetchPersistedTransactionStats(
+ ctx context.Context,
+ txn *kv.Txn,
+ aggregatedTs time.Time,
+ serializedFingerprintID []byte,
+ appName string,
+ result *roachpb.TransactionStatistics,
+) error {
+	// We use a `SELECT ... FOR UPDATE` statement here because we are going to
+	// perform an `UPDATE` on the stats for the given fingerprint later.
+ readStmt := `
+SELECT
+ statistics
+FROM
+ system.transaction_statistics
+WHERE fingerprint_id = $1
+ AND app_name = $2
+ AND aggregated_ts = $3
+ AND node_id = $4
+FOR UPDATE
+`
+
+ row, err := s.cfg.InternalExecutor.QueryRowEx(
+ ctx,
+ "fetch-txn-stats",
+ txn, /* txn */
+ sessiondata.InternalExecutorOverride{
+ User: security.NodeUserName(),
+ },
+ readStmt, // stmt
+ serializedFingerprintID, // fingerprint_id
+ appName, // app_name
+ aggregatedTs, // aggregated_ts
+ s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
+ )
+
+ if err != nil {
+ return err
+ }
+
+ if row == nil {
+ return errors.AssertionFailedf("transaction statistics not found for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d",
+ serializedFingerprintID, appName, aggregatedTs,
+ s.cfg.SQLIDContainer.SQLInstanceID())
+ }
+
+ if len(row) != 1 {
+		return errors.AssertionFailedf("unexpectedly found %d returned columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d",
+ len(row), serializedFingerprintID, appName, aggregatedTs,
+ s.cfg.SQLIDContainer.SQLInstanceID())
+ }
+
+ statistics := tree.MustBeDJSON(row[0])
+ return sqlstatsutil.DecodeTxnStatsStatisticsJSON(statistics.JSON, result)
+}
+
+func (s *PersistedSQLStats) fetchPersistedStatementStats(
+ ctx context.Context,
+ txn *kv.Txn,
+ aggregatedTs time.Time,
+ serializedFingerprintID []byte,
+ key *roachpb.StatementStatisticsKey,
+ result *roachpb.StatementStatistics,
+) error {
+ readStmt := `
+SELECT
+ statistics
+FROM
+ system.statement_statistics
+WHERE fingerprint_id = $1
+ AND app_name = $2
+ AND aggregated_ts = $3
+ AND plan_hash = $4
+ AND node_id = $5
+FOR UPDATE
+`
+ row, err := s.cfg.InternalExecutor.QueryRowEx(
+ ctx,
+ "fetch-stmt-stats",
+ txn, /* txn */
+ sessiondata.InternalExecutorOverride{
+ User: security.NodeUserName(),
+ },
+ readStmt, // stmt
+ serializedFingerprintID, // fingerprint_id
+ key.App, // app_name
+ aggregatedTs, // aggregated_ts
+ dummyPlanHash, // plan_hash
+ s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
+ )
+
+ if err != nil {
+ return err
+ }
+
+ if row == nil {
+ return errors.AssertionFailedf(
+			"statement statistics not found for fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d",
+ serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
+ }
+
+ if len(row) != 1 {
+		return errors.AssertionFailedf("unexpectedly found %d returned columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d",
+ len(row), serializedFingerprintID, key.App, aggregatedTs, dummyPlanHash,
+ s.cfg.SQLIDContainer.SQLInstanceID())
+ }
+
+ statistics := tree.MustBeDJSON(row[0])
+
+ return sqlstatsutil.DecodeStmtStatsStatisticsJSON(statistics.JSON, result)
+}
diff --git a/pkg/sql/sqlstats/persistedsqlstats/flush_test.go b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
new file mode 100644
index 000000000000..1e1a1de33737
--- /dev/null
+++ b/pkg/sql/sqlstats/persistedsqlstats/flush_test.go
@@ -0,0 +1,363 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package persistedsqlstats_test
+
+import (
+ "context"
+ gosql "database/sql"
+ "fmt"
+ "net/url"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/security"
+ "github.com/cockroachdb/cockroach/pkg/sql"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/stretchr/testify/require"
+)
+
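+// testCase pairs a literal query with its expected statement fingerprint and
+// the number of times the test executes it.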
+type testCase struct {
+ query string
+ fingerprint string
+ count int64
+}
+
+func TestSQLStatsFlush(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ testCases := []testCase{
+ {
+ query: "SELECT 1",
+ fingerprint: "SELECT _",
+ count: 3,
+ },
+ {
+ query: "SELECT 1, 2, 3",
+ fingerprint: "SELECT _, _, _",
+ count: 10,
+ },
+ {
+ query: "SELECT 1, 1 WHERE 1 < 10",
+ fingerprint: "SELECT _, _ WHERE _ < _",
+ count: 7,
+ },
+ }
+
+ fakeTime := stubTime{
+ aggInterval: time.Hour,
+ }
+ fakeTime.setTime(timeutil.Now())
+
+ testCluster := serverutils.StartNewTestCluster(t, 3 /* numNodes */, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{
+ Knobs: base.TestingKnobs{
+ SQLStatsKnobs: &persistedsqlstats.TestingKnobs{
+ StubTimeNow: fakeTime.StubTimeNow,
+ },
+ },
+ },
+ })
+
+ ctx := context.Background()
+ defer testCluster.Stopper().Stop(ctx)
+
+ stopper := stop.NewStopper()
+ defer stopper.Stop(ctx)
+
+ firstServer := testCluster.Server(0 /* idx */)
+ secondServer := testCluster.Server(1 /* idx */)
+
+ firstPgURL, firstServerConnCleanup := sqlutils.PGUrl(
+ t, firstServer.ServingSQLAddr(), "CreateConnections" /* prefix */, url.User(security.RootUser))
+ defer firstServerConnCleanup()
+
+ secondPgURL, secondServerConnCleanup := sqlutils.PGUrl(
+ t, secondServer.ServingSQLAddr(), "CreateConnections" /* prefix */, url.User(security.RootUser))
+ defer secondServerConnCleanup()
+
+ pgFirstSQLConn, err := gosql.Open("postgres", firstPgURL.String())
+ require.NoError(t, err)
+ firstSQLConn := sqlutils.MakeSQLRunner(pgFirstSQLConn)
+
+ pgSecondSQLConn, err := gosql.Open("postgres", secondPgURL.String())
+ require.NoError(t, err)
+ secondSQLConn := sqlutils.MakeSQLRunner(pgSecondSQLConn)
+
+ firstServerSQLStats := firstServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
+ secondServerSQLStats := secondServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats)
+
+ defer func() {
+ err := pgFirstSQLConn.Close()
+ require.NoError(t, err)
+ err = pgSecondSQLConn.Close()
+ require.NoError(t, err)
+ }()
+ firstSQLConn.Exec(t, "SET application_name = 'flush_unit_test'")
+ secondSQLConn.Exec(t, "SET application_name = 'flush_unit_test'")
+
+ // Regular inserts.
+ {
+ for _, tc := range testCases {
+ for i := int64(0); i < tc.count; i++ {
+ firstSQLConn.Exec(t, tc.query)
+ }
+ }
+
+ verifyInMemoryStatsCorrectness(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats)
+
+ firstServerSQLStats.Flush(ctx)
+ secondServerSQLStats.Flush(ctx)
+
+ verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats)
+
+		// For each test case, we verify that it is inserted exactly once and
+		// that it is executed exactly tc.count times.
+		for _, tc := range testCases {
+			verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 1 /* expectedStmtEntryCnt */, 1 /* expectedTxnEntryCnt */)
+ verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count)
+ }
+ }
+
+	// We insert the same data during the same aggregation window to ensure that
+	// no new entries are created and the existing statistics are updated.
+ {
+ for i := range testCases {
+ // Increment the execution count.
+ testCases[i].count++
+ for execCnt := int64(0); execCnt < testCases[i].count; execCnt++ {
+ firstSQLConn.Exec(t, testCases[i].query)
+ }
+ }
+ verifyInMemoryStatsCorrectness(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats)
+
+ firstServerSQLStats.Flush(ctx)
+ secondServerSQLStats.Flush(ctx)
+
+ verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats)
+
+		for _, tc := range testCases {
+			verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 1 /* expectedStmtEntryCnt */, 1 /* expectedTxnEntryCnt */)
+			// Both rounds of executions fall in the same aggregation interval, so
+			// the expected count is the first round's tc.count-1 executions (before
+			// the increment above) plus the second round's tc.count executions.
+			verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count+tc.count-1 /* expectedCount */)
+		}
+ }
+ }
+
+ // We change the time to be in a different aggregation window.
+ {
+ fakeTime.setTime(fakeTime.StubTimeNow().Add(time.Hour * 3))
+
+ for _, tc := range testCases {
+ for i := int64(0); i < tc.count; i++ {
+ firstSQLConn.Exec(t, tc.query)
+ }
+ }
+ verifyInMemoryStatsCorrectness(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats)
+
+ firstServerSQLStats.Flush(ctx)
+ secondServerSQLStats.Flush(ctx)
+
+ verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats)
+
+ for _, tc := range testCases {
+ // We expect exactly 2 entries since we are in a different aggregation window.
+			verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 2 /* expectedStmtEntryCnt */, 2 /* expectedTxnEntryCnt */)
+ verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count)
+ }
+ }
+
+	// We run queries on a different server and trigger the flush.
+ {
+ for _, tc := range testCases {
+ for i := int64(0); i < tc.count; i++ {
+ secondSQLConn.Exec(t, tc.query)
+ }
+ }
+ verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsCorrectness(t, testCases, secondServerSQLStats)
+
+ firstServerSQLStats.Flush(ctx)
+ secondServerSQLStats.Flush(ctx)
+
+ verifyInMemoryStatsEmpty(t, testCases, firstServerSQLStats)
+ verifyInMemoryStatsEmpty(t, testCases, secondServerSQLStats)
+
+		// Ensure that we encoded the correct node_id for the new entries and did
+		// not accidentally tamper with the entries written by the other server.
+		for _, tc := range testCases {
+			verifyNumOfInsertedEntries(t, firstSQLConn, tc.fingerprint, secondServer.NodeID(), 1 /* expectedStmtEntryCnt */, 1 /* expectedTxnEntryCnt */)
+			verifyInsertedFingerprintExecCount(t, firstSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), secondServer.NodeID(), tc.count)
+			verifyNumOfInsertedEntries(t, secondSQLConn, tc.fingerprint, firstServer.NodeID(), 2 /* expectedStmtEntryCnt */, 2 /* expectedTxnEntryCnt */)
+			verifyInsertedFingerprintExecCount(t, secondSQLConn, tc.fingerprint, fakeTime.getAggTimeTs(), firstServer.NodeID(), tc.count)
+		}
+ }
+ }
+}
+
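+// stubTime is a thread-safe stub clock that lets the test pin the flush to a
+// specific aggregation window and move between windows.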
+type stubTime struct {
+ syncutil.RWMutex
+ t time.Time
+ aggInterval time.Duration
+}
+
+func (s *stubTime) setTime(t time.Time) {
+ s.RWMutex.Lock()
+ defer s.RWMutex.Unlock()
+ s.t = t
+}
+
+func (s *stubTime) getAggTimeTs() time.Time {
+	s.RWMutex.RLock()
+	defer s.RWMutex.RUnlock()
+ return s.t.Truncate(s.aggInterval)
+}
+
+// StubTimeNow implements the testing knob interface for persistedsqlstats.Provider.
+func (s *stubTime) StubTimeNow() time.Time {
+ s.RWMutex.RLock()
+ defer s.RWMutex.RUnlock()
+ return s.t
+}
+
+func verifyInsertedFingerprintExecCount(
+ t *testing.T,
+ sqlConn *sqlutils.SQLRunner,
+ fingerprint string,
+ ts time.Time,
+ nodeID roachpb.NodeID,
+ expectedCount int64,
+) {
+ row := sqlConn.Query(t,
+ `
+SELECT
+ (S.statistics -> 'statistics' ->> 'cnt')::INT AS stmtCount,
+ (T.statistics -> 'statistics' ->> 'cnt')::INT AS txnCount
+FROM
+ system.transaction_statistics T,
+ system.statement_statistics S
+WHERE S.metadata ->> 'query' = $1
+ AND T.aggregated_ts = $2
+ AND T.node_id = $3
+ AND T.app_name = 'flush_unit_test'
+ AND decode(T.metadata -> 'stmtFingerprintIDs' ->> 0, 'hex') = S.fingerprint_id
+ AND S.node_id = T.node_id
+ AND S.aggregated_ts = T.aggregated_ts
+ AND S.app_name = T.app_name
+`, fingerprint, ts, nodeID)
+
+ require.True(t, row.Next(), "no stats found for fingerprint: %s", fingerprint)
+
+ var actualTxnExecCnt int64
+ var actualStmtExecCnt int64
+ err := row.Scan(&actualStmtExecCnt, &actualTxnExecCnt)
+ require.NoError(t, err)
+ require.Equal(t, expectedCount, actualStmtExecCnt, "fingerprint: %s", fingerprint)
+ require.Equal(t, expectedCount, actualTxnExecCnt, "fingerprint: %s", fingerprint)
+	require.False(t, row.Next(), "more than one row found for fingerprint: %s", fingerprint)
+ require.NoError(t, row.Close())
+}
+
+func verifyNumOfInsertedEntries(
+ t *testing.T,
+ sqlConn *sqlutils.SQLRunner,
+ fingerprint string,
+ nodeID roachpb.NodeID,
+ expectedStmtEntryCnt, expectedTxnEntryCnt int64,
+) {
+	stmtRow := sqlConn.DB.QueryRowContext(context.Background(),
+ `
+SELECT
+ encode(fingerprint_id, 'hex'),
+ count(*)
+FROM
+ system.statement_statistics
+WHERE
+ metadata ->> 'query' = $1 AND
+ node_id = $2 AND
+ app_name = 'flush_unit_test'
+GROUP BY
+ (fingerprint_id, node_id)
+`, fingerprint, nodeID)
+
+ var stmtFingerprintID string
+ var numOfInsertedStmtEntry int64
+
+	err := stmtRow.Scan(&stmtFingerprintID, &numOfInsertedStmtEntry)
+	require.NoError(t, err)
+ require.Equal(t, expectedStmtEntryCnt, numOfInsertedStmtEntry, "fingerprint: %s", fingerprint)
+
+	txnRow := sqlConn.DB.QueryRowContext(context.Background(), fmt.Sprintf(
+ `
+SELECT
+ count(*)
+FROM
+ system.transaction_statistics
+WHERE
+ (metadata -> 'stmtFingerprintIDs' ->> 0) = '%s' AND
+ node_id = $1 AND
+ app_name = 'flush_unit_test'
+GROUP BY
+ (fingerprint_id, node_id)
+`, stmtFingerprintID), nodeID)
+
+ var numOfInsertedTxnEntry int64
+	err = txnRow.Scan(&numOfInsertedTxnEntry)
+ require.NoError(t, err)
+ require.Equal(t, expectedTxnEntryCnt, numOfInsertedTxnEntry, "fingerprint: %s", fingerprint)
+}
+
+func verifyInMemoryStatsCorrectness(
+ t *testing.T, tcs []testCase, statsProvider *persistedsqlstats.PersistedSQLStats,
+) {
+ for _, tc := range tcs {
+ err := statsProvider.IterateStatementStats(context.Background(), &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
+ if tc.fingerprint == statistics.Key.Query {
+ require.Equal(t, tc.count, statistics.Stats.Count, "fingerprint: %s", tc.fingerprint)
+ }
+ return nil
+ })
+
+ require.NoError(t, err)
+ }
+}
+
+func verifyInMemoryStatsEmpty(
+ t *testing.T, tcs []testCase, statsProvider *persistedsqlstats.PersistedSQLStats,
+) {
+ for _, tc := range tcs {
+ err := statsProvider.IterateStatementStats(context.Background(), &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
+ if tc.fingerprint == statistics.Key.Query {
+				require.Equal(t, int64(0), statistics.Stats.Count, "fingerprint: %s", tc.fingerprint)
+ }
+ return nil
+ })
+
+ require.NoError(t, err)
+ }
+}
diff --git a/pkg/sql/sqlstats/persistedsqlstats/main_test.go b/pkg/sql/sqlstats/persistedsqlstats/main_test.go
new file mode 100644
index 000000000000..054cb0471b64
--- /dev/null
+++ b/pkg/sql/sqlstats/persistedsqlstats/main_test.go
@@ -0,0 +1,29 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package persistedsqlstats_test
+
+import (
+ "os"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/security"
+ "github.com/cockroachdb/cockroach/pkg/security/securitytest"
+ "github.com/cockroachdb/cockroach/pkg/server"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+)
+
+func TestMain(m *testing.M) {
+ security.SetAssetLoader(securitytest.EmbeddedAssets)
+ serverutils.InitTestServerFactory(server.TestServerFactory)
+ serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+ os.Exit(m.Run())
+}
diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go
new file mode 100644
index 000000000000..e64f7d8ddc37
--- /dev/null
+++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go
@@ -0,0 +1,74 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+// Package persistedsqlstats is a subsystem that is responsible for flushing
+// node-local in-memory stats into persisted system tables.
+package persistedsqlstats
+
+import (
+ "context"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
+ "github.com/cockroachdb/cockroach/pkg/util/metric"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+)
+
+// TODO(azhng): currently we do not have the ability to compute a hash for a
+// query plan. This is being worked on by the SQL Queries team. Once we are
+// able to get a consistent hash value from a query plan, we should update
+// this.
+const dummyPlanHash = int64(0)
+
+// Config is a configuration struct for the persisted SQL stats subsystem.
+type Config struct {
+ Settings *cluster.Settings
+ InternalExecutor sqlutil.InternalExecutor
+ KvDB *kv.DB
+ SQLIDContainer *base.SQLIDContainer
+ Knobs *TestingKnobs
+ FlushCounter *metric.Counter
+ FlushDuration *metric.Histogram
+ FailureCounter *metric.Counter
+}
+
+// PersistedSQLStats is a sqlstats.Provider that wraps a node-local in-memory
+// sslocal.SQLStats. It behaves similarly to sslocal.SQLStats, but it also
+// periodically writes the in-memory SQL stats into system tables for
+// persistence, and it performs a flush if it detects memory pressure.
+type PersistedSQLStats struct {
+ *sslocal.SQLStats
+
+ cfg *Config
+
+ lastFlushStarted time.Time
+}
+
+var _ sqlstats.Provider = &PersistedSQLStats{}
+
+// New returns a new instance of the PersistedSQLStats.
+func New(cfg *Config, memSQLStats *sslocal.SQLStats) *PersistedSQLStats {
+ return &PersistedSQLStats{
+ SQLStats: memSQLStats,
+ cfg: cfg,
+ }
+}
+
+// Start implements sqlstats.Provider interface.
+func (s *PersistedSQLStats) Start(ctx context.Context, stopper *stop.Stopper) {
+ s.SQLStats.Start(ctx, stopper)
+}
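+
+// A minimal wiring sketch (illustrative only; the actual plumbing lives in
+// the SQL server startup code):
+//
+//   stats := persistedsqlstats.New(&persistedsqlstats.Config{...}, memSQLStats)
+//   stats.Start(ctx, stopper)
+//   ...
+//   stats.Flush(ctx) // typically driven on a timer based on sql.stats.flush.interval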
diff --git a/pkg/sql/sqlstats/persistedsqlstats/test_utils.go b/pkg/sql/sqlstats/persistedsqlstats/test_utils.go
new file mode 100644
index 000000000000..ed49d099059b
--- /dev/null
+++ b/pkg/sql/sqlstats/persistedsqlstats/test_utils.go
@@ -0,0 +1,27 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package persistedsqlstats
+
+import "time"
+
+// TestingKnobs provides hooks and knobs for unit tests.
+type TestingKnobs struct {
+ // OnStatsFlushFinished is a callback that is triggered when a single
+ // statistics object is flushed.
+ OnStatsFlushFinished func(error)
+
+ // StubTimeNow allows tests to override the timeutil.Now() function used
+	// by the flush operation to calculate the aggregated_ts timestamp.
+ StubTimeNow func() time.Time
+}
+
+// ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
+func (*TestingKnobs) ModuleTestingKnobs() {}
diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go
index f73822ac2361..57b1a174d68d 100644
--- a/pkg/sql/sqlstats/sslocal/sql_stats.go
+++ b/pkg/sql/sqlstats/sslocal/sql_stats.go
@@ -13,6 +13,7 @@ package sslocal
import (
"context"
"math"
+ "sync/atomic"
"time"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -100,6 +101,21 @@ func newSQLStats(
return s
}
+// GetTotalFingerprintCount returns the total number of unique statement and
+// transaction fingerprints stored in the current SQLStats.
+func (s *SQLStats) GetTotalFingerprintCount() int64 {
+ return atomic.LoadInt64(&s.atomic.uniqueStmtFingerprintCount) + atomic.LoadInt64(&s.atomic.uniqueTxnFingerprintCount)
+}
+
+// GetTotalFingerprintBytes returns the total amount of bytes currently
+// allocated for storing statistics for both statement and transaction
+// fingerprints.
+func (s *SQLStats) GetTotalFingerprintBytes() int64 {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.mu.mon.AllocBytes()
+}
+
func (s *SQLStats) getStatsForApplication(appName string) *ssmemstorage.Container {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go
index 15ea964e8588..10d1affd533e 100644
--- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go
+++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go
@@ -125,14 +125,14 @@ func (s *SQLStats) GetLastReset() time.Time {
// IterateStatementStats implements sqlstats.Provider interface.
func (s *SQLStats) IterateStatementStats(
- _ context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.StatementVisitor,
+ ctx context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.StatementVisitor,
) error {
appNames := s.getAppNames(options.SortedAppNames)
for _, appName := range appNames {
statsContainer := s.getStatsForApplication(appName)
- err := statsContainer.IterateStatementStats(appName, options.SortedKey, visitor)
+ err := statsContainer.IterateStatementStats(ctx, appName, options.SortedKey, visitor)
if err != nil {
return fmt.Errorf("sql stats iteration abort: %s", err)
}
@@ -142,14 +142,14 @@ func (s *SQLStats) IterateStatementStats(
// IterateTransactionStats implements sqlstats.Provider interface.
func (s *SQLStats) IterateTransactionStats(
- _ context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.TransactionVisitor,
+ ctx context.Context, options *sqlstats.IteratorOptions, visitor sqlstats.TransactionVisitor,
) error {
appNames := s.getAppNames(options.SortedAppNames)
for _, appName := range appNames {
statsContainer := s.getStatsForApplication(appName)
- err := statsContainer.IterateTransactionStats(appName, options.SortedKey, visitor)
+ err := statsContainer.IterateTransactionStats(ctx, appName, options.SortedKey, visitor)
if err != nil {
return fmt.Errorf("sql stats iteration abort: %s", err)
}
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
index dcfeb62f48e4..3cda730e5d8b 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go
@@ -128,7 +128,7 @@ func New(
// IterateStatementStats iterates through the statement statistics stored in
// this Container.
func (s *Container) IterateStatementStats(
- appName string, orderedKey bool, visitor sqlstats.StatementVisitor,
+ ctx context.Context, appName string, orderedKey bool, visitor sqlstats.StatementVisitor,
) error {
var stmtKeys stmtList
s.mu.Lock()
@@ -176,7 +176,7 @@ func (s *Container) IterateStatementStats(
Stats: data,
}
- err := visitor(&collectedStats)
+ err := visitor(ctx, &collectedStats)
if err != nil {
return fmt.Errorf("sql stats iteration abort: %s", err)
}
@@ -187,7 +187,7 @@ func (s *Container) IterateStatementStats(
// IterateTransactionStats iterates through the transaction statistics stored
// in this Container.
func (s *Container) IterateTransactionStats(
- appName string, orderedKey bool, visitor sqlstats.TransactionVisitor,
+ ctx context.Context, appName string, orderedKey bool, visitor sqlstats.TransactionVisitor,
) error {
// Retrieve the transaction keys and optionally sort them.
var txnKeys txnList
@@ -221,7 +221,7 @@ func (s *Container) IterateTransactionStats(
}
txnStats.mu.Unlock()
- err := visitor(txnKey, &collectedStats)
+ err := visitor(ctx, txnKey, &collectedStats)
if err != nil {
return fmt.Errorf("sql stats iteration abort: %s", err)
}
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
index f32419ed9ced..f5fab1b8ebcc 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
@@ -21,6 +21,21 @@ import (
"github.com/cockroachdb/errors"
)
+var (
+	// ErrMemoryPressure is returned from the Container when we have reached
+	// the allowed memory limit.
+ ErrMemoryPressure = errors.New("insufficient sql stats memory")
+
+ // ErrFingerprintLimitReached is returned from the Container when we have
+ // more fingerprints than the limit specified in the cluster setting.
+ ErrFingerprintLimitReached = errors.New("sql stats fingerprint limit reached")
+
+	// ErrExecStatsFingerprintFlushed is returned from the Container when the
+	// stats object for the fingerprint has been flushed to the system table
+	// before the roachpb.ExecStats can be recorded.
+	ErrExecStatsFingerprintFlushed = errors.New("stmtStats flushed before execution stats can be recorded")
+)
+
var _ sqlstats.Writer = &Container{}
// RecordStatement implements sqlstats.Writer interface.
@@ -59,7 +74,7 @@ func (s *Container) RecordStatement(
// This means we have reached the limit of unique fingerprints. We don't
// record anything and abort the operation.
if throttled {
- return stmtFingerprintID, errors.New("unique fingerprint limit has been reached")
+ return stmtFingerprintID, ErrFingerprintLimitReached
}
// This statement was below the latency threshold or sql stats aren't being
@@ -118,7 +133,7 @@ func (s *Container) RecordStatement(
// memory budget, delete the entry that we just created and report the error.
if err := s.mu.acc.Grow(ctx, estimatedMemoryAllocBytes); err != nil {
delete(s.mu.stmts, statementKey)
- return stats.ID, err
+ return stats.ID, ErrMemoryPressure
}
}
@@ -132,7 +147,7 @@ func (s *Container) RecordStatementExecStats(
stmtStats, _, _, _, _ :=
s.getStatsForStmt(key.Query, key.ImplicitTxn, key.Database, key.Failed, false /* createIfNotExists */)
if stmtStats == nil {
- return errors.New("stmtStats flushed before execution stats can be recorded")
+ return ErrExecStatsFingerprintFlushed
}
stmtStats.recordExecStats(stats)
return nil
@@ -169,7 +184,7 @@ func (s *Container) RecordTransaction(
stats, created, throttled := s.getStatsForTxnWithKey(key, value.StatementFingerprintIDs, true /* createIfNonexistent */)
if throttled {
- return errors.New("unique fingerprint limit has been reached")
+ return ErrFingerprintLimitReached
}
// Collect the per-transaction statistics.
@@ -188,7 +203,7 @@ func (s *Container) RecordTransaction(
if err := s.mu.acc.Grow(ctx, estimatedMemAllocBytes); err != nil {
delete(s.mu.txns, key)
s.mu.Unlock()
- return err
+ return ErrMemoryPressure
}
s.mu.Unlock()
}
diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go
index 3619bceeb136..4146e56d5633 100644
--- a/pkg/sql/sqlstats/ssprovider.go
+++ b/pkg/sql/sqlstats/ssprovider.go
@@ -100,12 +100,12 @@ type IteratorOptions struct {
// StatementVisitor is the callback that is invoked when the caller iterates
// through all statement statistics using IterateStatementStats(). If an error
// is encountered when calling the visitor, the iteration is aborted.
-type StatementVisitor func(*roachpb.CollectedStatementStatistics) error
+type StatementVisitor func(context.Context, *roachpb.CollectedStatementStatistics) error
// TransactionVisitor is the callback that is invoked when the caller iterates
// through all transaction statistics using IterateTransactionStats(). If an
// error is encountered when calling the visitor, the iteration is aborted.
-type TransactionVisitor func(roachpb.TransactionFingerprintID, *roachpb.CollectedTransactionStatistics) error
+type TransactionVisitor func(context.Context, roachpb.TransactionFingerprintID, *roachpb.CollectedTransactionStatistics) error
// AggregatedTransactionVisitor is the callback invoked when iterating through
// transaction statistics collected at the application level using
diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go
index 878f1d6572db..ff986599c537 100644
--- a/pkg/sql/txn_state.go
+++ b/pkg/sql/txn_state.go
@@ -202,7 +202,9 @@ func (ts *txnState) resetForNewSQLTxn(
ts.mu.txnStart = timeutil.Now()
ts.mu.Unlock()
if historicalTimestamp != nil {
- ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp)
+ if err := ts.setHistoricalTimestamp(ts.Ctx, *historicalTimestamp); err != nil {
+ panic(err)
+ }
}
if err := ts.setReadOnlyMode(readOnly); err != nil {
panic(err)
@@ -262,11 +264,16 @@ func (ts *txnState) finishExternalTxn() {
ts.mu.Unlock()
}
-func (ts *txnState) setHistoricalTimestamp(ctx context.Context, historicalTimestamp hlc.Timestamp) {
+func (ts *txnState) setHistoricalTimestamp(
+ ctx context.Context, historicalTimestamp hlc.Timestamp,
+) error {
ts.mu.Lock()
- ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp)
- ts.mu.Unlock()
+ defer ts.mu.Unlock()
+ if err := ts.mu.txn.SetFixedTimestamp(ctx, historicalTimestamp); err != nil {
+ return err
+ }
ts.isHistorical = true
+ return nil
}
// getReadTimestamp returns the transaction's current read timestamp.
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 7873940285fc..8fe18efe6949 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -1785,6 +1785,30 @@ var charts = []sectionDescription{
Title: "Number of internal fingerprint statistics being discarded",
Metrics: []string{"sql.stats.discarded.current.internal"},
},
+ {
+ Title: "Number of times SQL Stats are flushed to persistent storage",
+ Metrics: []string{"sql.stats.flush.count"},
+ },
+ {
+ Title: "Number of errors encountered when flushing SQL Stats",
+ Metrics: []string{"sql.stats.flush.error"},
+ },
+ {
+			Title:   "Time taken to complete SQL Stats flush",
+ Metrics: []string{"sql.stats.flush.duration"},
+ },
+ {
+ Title: "Number of times internal SQL Stats are flushed to persistent storage",
+ Metrics: []string{"sql.stats.flush.count.internal"},
+ },
+ {
+ Title: "Number of errors encountered when flushing internal SQL Stats",
+ Metrics: []string{"sql.stats.flush.error.internal"},
+ },
+ {
+			Title:   "Time taken to complete internal SQL Stats flush",
+ Metrics: []string{"sql.stats.flush.duration.internal"},
+ },
},
},
{