diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 91e20abc851c..62e19c53780c 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -76,6 +76,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
+sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index aca61f0835ac..2dd61ec7793d 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -81,6 +81,7 @@
server.web_session.purge.period | duration | 1h0m0s | the time until old sessions are deleted |
server.web_session.purge.ttl | duration | 1h0m0s | if nonzero, entries in system.web_sessions older than this duration are periodically purged |
server.web_session_timeout | duration | 168h0m0s | the duration that a newly created web session will be valid |
+sql.contention.txn_id_cache.max_size | byte size | 64 MiB | the maximum byte size TxnID cache will use |
sql.cross_db_fks.enabled | boolean | false | if true, creating foreign key references across databases is allowed |
sql.cross_db_sequence_owners.enabled | boolean | false | if true, creating sequences owned by tables from other databases is allowed |
sql.cross_db_sequence_references.enabled | boolean | false | if true, sequences referenced by tables from other databases are allowed |
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index fb184e62743f..19a099348167 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -231,6 +231,7 @@ ALL_TESTS = [
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
+ "//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/covering:covering_test",
"//pkg/sql/distsql:distsql_test",
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index ffb340b64ca6..155442eeb97b 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -307,6 +307,7 @@ go_library(
"//pkg/sql/colexec",
"//pkg/sql/colflow",
"//pkg/sql/contention",
+ "//pkg/sql/contention/txnidcache",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/distsql",
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index dcd3b526e636..ec22e8aab7aa 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+ "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -64,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"golang.org/x/net/trace"
@@ -280,6 +282,10 @@ type Server struct {
// node as gateway node.
indexUsageStats *idxusage.LocalIndexUsageStats
+ // txnIDCache stores the mapping from transaction ID to transaction
+ //fingerprint IDs for all recently executed.
+ txnIDCache txnidcache.Provider
+
// Metrics is used to account normal queries.
Metrics Metrics
@@ -320,6 +326,10 @@ type Metrics struct {
type ServerMetrics struct {
// StatsMetrics contains metrics for SQL statistics collection.
StatsMetrics StatsMetrics
+
+ // ContentionSubsystemMetrics contains metrics related to contention
+ // subsystem.
+ ContentionSubsystemMetrics txnidcache.Metrics
}
// NewServer creates a new Server. Start() needs to be called before the Server
@@ -362,6 +372,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
ChannelSize: idxusage.DefaultChannelSize,
Setting: cfg.Settings,
}),
+ txnIDCache: txnidcache.NewTxnIDCache(cfg.Settings, &serverMetrics.ContentionSubsystemMetrics),
}
telemetryLoggingMetrics := &TelemetryLoggingMetrics{}
@@ -452,6 +463,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval,
),
},
+ ContentionSubsystemMetrics: txnidcache.NewMetrics(),
}
}
@@ -463,6 +475,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// accumulated in the reporter when the telemetry server fails.
// Usually it is telemetry's reporter's job to clear the reporting SQL Stats.
s.reportedStats.Start(ctx, stopper)
+
+ s.txnIDCache.Start(ctx, stopper)
}
// GetSQLStatsController returns the persistedsqlstats.Controller for current
@@ -488,6 +502,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller {
return s.reportedStatsController
}
+// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server.
+func (s *Server) GetTxnIDCache() txnidcache.Provider {
+ return s.txnIDCache
+}
+
// GetScrubbedStmtStats returns the statement statistics by app, with the
// queries scrubbed of their identifiers. Any statements which cannot be
// scrubbed will be omitted from the returned map.
@@ -813,6 +832,7 @@ func (s *Server) newConnExecutor(
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
+ txnIDCacheWriter: s.txnIDCache.GetWriter(),
}
ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
@@ -983,6 +1003,7 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})
func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")
+ ex.txnIDCacheWriter.Close()
txnEv := noEvent
if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn {
@@ -1018,7 +1039,8 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}
- if err := ex.resetExtraTxnState(ctx, txnEv); err != nil {
+ // Since we are closing connExecutor, no need to fetch txnID.
+ if err := ex.resetExtraTxnState(ctx, txnEv, uuid.UUID{}); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}
@@ -1400,6 +1422,9 @@ type connExecutor struct {
// indexUsageStats is used to track index usage stats.
indexUsageStats *idxusage.LocalIndexUsageStats
+
+ // TODO(azhng): wip
+ txnIDCacheWriter txnidcache.Writer
}
// ctxHolder contains a connection's context and, while session tracing is
@@ -1516,7 +1541,9 @@ func (ns *prepStmtNamespace) resetTo(
// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// commits, rolls back or restarts.
-func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error {
+func (ex *connExecutor) resetExtraTxnState(
+ ctx context.Context, ev txnEvent, txnID uuid.UUID,
+) error {
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
ex.extraTxnState.schemaChangerState = SchemaChangerState{
@@ -1542,7 +1569,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name)
}
ex.extraTxnState.savepoints.clear()
- ex.onTxnFinish(ctx, ev)
+ ex.onTxnFinish(ctx, ev, txnID)
case txnRestart:
ex.onTxnRestart()
ex.state.mu.Lock()
@@ -2173,8 +2200,8 @@ func (ex *connExecutor) execCopyIn(
}
} else {
txnOpt = copyTxnOpt{
- resetExtraTxnState: func(ctx context.Context) error {
- return ex.resetExtraTxnState(ctx, noEvent)
+ resetExtraTxnState: func(ctx context.Context, txnID uuid.UUID) error {
+ return ex.resetExtraTxnState(ctx, noEvent, txnID)
},
}
}
@@ -2598,6 +2625,14 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
txnIsOpen = true
}
+ // TODO(azhng): wip: should we be smarter about this to reduce locking?
+ ex.state.mu.RLock()
+ var txnID uuid.UUID
+ if ex.state.mu.txn != nil {
+ txnID = ex.state.mu.txn.ID()
+ }
+ ex.state.mu.RUnlock()
+
ex.mu.Lock()
err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload)
ex.mu.Unlock()
@@ -2718,7 +2753,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
fallthrough
case txnRestart, txnRollback:
- if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent); err != nil {
+ if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, txnID); err != nil {
return advanceInfo{}, err
}
default:
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 668c57d02760..9e12ad186590 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+ "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
@@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
@@ -1909,7 +1911,7 @@ func (ex *connExecutor) recordTransactionStart() {
}
}
-func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
+func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnID uuid.UUID) {
if ex.extraTxnState.shouldExecuteOnTxnFinish {
ex.extraTxnState.shouldExecuteOnTxnFinish = false
txnStart := ex.extraTxnState.txnFinishClosure.txnStartTime
@@ -1923,7 +1925,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
- err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart)
+ if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil {
+ ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded(
+ ex.sessionData(),
+ txnID,
+ transactionFingerprintID,
+ )
+ }
+ err := ex.recordTransaction(ctx, txnID, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
@@ -1954,6 +1963,7 @@ func (ex *connExecutor) onTxnRestart() {
func (ex *connExecutor) recordTransaction(
ctx context.Context,
+ transactionID uuid.UUID,
transactionFingerprintID roachpb.TransactionFingerprintID,
ev txnEvent,
implicit bool,
@@ -1961,7 +1971,7 @@ func (ex *connExecutor) recordTransaction(
) error {
recordingStart := timeutil.Now()
defer func() {
- recordingOverhead := timeutil.Since(recordingStart)
+ recordingOverhead := timeutil.Now().Sub(recordingStart)
ex.server.
ServerMetrics.
StatsMetrics.
@@ -1973,6 +1983,11 @@ func (ex *connExecutor) recordTransaction(
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())
+ ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
+ TxnID: transactionID,
+ TxnFingerprintID: transactionFingerprintID,
+ })
+
txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency()
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
commitLat := ex.phaseTimes.GetCommitLatency()
diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel
new file mode 100644
index 000000000000..daa9bb7107cb
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/BUILD.bazel
@@ -0,0 +1,56 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "txnidcache",
+ srcs = [
+ "cluster_settings.go",
+ "metrics.go",
+ "provider.go",
+ "sharded_store.go",
+ "txn_id_cache.go",
+ "txn_id_cache_shard.go",
+ "txn_id_cache_write_buffer.go",
+ "writer_pool.go",
+ ],
+ importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/roachpb:with-mocks",
+ "//pkg/settings",
+ "//pkg/settings/cluster",
+ "//pkg/util/cache",
+ "//pkg/util/encoding",
+ "//pkg/util/metric",
+ "//pkg/util/stop",
+ "//pkg/util/syncutil",
+ "//pkg/util/uuid",
+ ],
+)
+
+go_test(
+ name = "txnidcache_test",
+ srcs = [
+ "main_test.go",
+ "txn_id_cache_test.go",
+ ],
+ deps = [
+ ":txnidcache",
+ "//pkg/kv",
+ "//pkg/roachpb:with-mocks",
+ "//pkg/security",
+ "//pkg/security/securitytest",
+ "//pkg/server",
+ "//pkg/sql",
+ "//pkg/sql/sessiondata",
+ "//pkg/sql/tests",
+ "//pkg/testutils",
+ "//pkg/testutils/serverutils",
+ "//pkg/testutils/sqlutils",
+ "//pkg/testutils/testcluster",
+ "//pkg/util/leaktest",
+ "//pkg/util/log",
+ "//pkg/util/uuid",
+ "@com_github_cockroachdb_errors//:errors",
+ "@com_github_stretchr_testify//require",
+ ],
+)
diff --git a/pkg/sql/contention/txnidcache/cluster_settings.go b/pkg/sql/contention/txnidcache/cluster_settings.go
new file mode 100644
index 000000000000..63d3d04fb0a6
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/cluster_settings.go
@@ -0,0 +1,20 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import "github.com/cockroachdb/cockroach/pkg/settings"
+
+// MaxSize limits the maximum byte size can be used by the TxnIDCache.
+var MaxSize = settings.RegisterByteSizeSetting(
+ `sql.contention.txn_id_cache.max_size`,
+ "the maximum byte size TxnID cache will use",
+ 64*1024*1024, // 64 MB
+).WithPublic()
diff --git a/pkg/sql/contention/txnidcache/main_test.go b/pkg/sql/contention/txnidcache/main_test.go
new file mode 100644
index 000000000000..c5b305067a8f
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/main_test.go
@@ -0,0 +1,29 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache_test
+
+import (
+ "os"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/security"
+ "github.com/cockroachdb/cockroach/pkg/security/securitytest"
+ "github.com/cockroachdb/cockroach/pkg/server"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+)
+
+func TestMain(m *testing.M) {
+ security.SetAssetLoader(securitytest.EmbeddedAssets)
+ serverutils.InitTestServerFactory(server.TestServerFactory)
+ serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+ os.Exit(m.Run())
+}
diff --git a/pkg/sql/contention/txnidcache/metrics.go b/pkg/sql/contention/txnidcache/metrics.go
new file mode 100644
index 000000000000..e59105d41a9f
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/metrics.go
@@ -0,0 +1,50 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import "github.com/cockroachdb/cockroach/pkg/util/metric"
+
+// Metrics is a structs that include all metrics related to contention
+// subsystem.
+type Metrics struct {
+ TxnIDCacheSize *metric.Gauge
+ DiscardedResolvedTxnIDCount *metric.Counter
+ EvictedTxnIDCacheCount *metric.Counter
+}
+
+var _ metric.Struct = Metrics{}
+
+// MetricStruct implements the metric.Struct interface.
+func (Metrics) MetricStruct() {}
+
+// NewMetrics returns a new instance of Metrics.
+func NewMetrics() Metrics {
+ return Metrics{
+ TxnIDCacheSize: metric.NewGauge(metric.Metadata{
+ Name: "sql.contention.txn_id_cache.size",
+ Help: "Current memory usage for TxnID Cache",
+ Measurement: "Memory",
+ Unit: metric.Unit_BYTES,
+ }),
+ DiscardedResolvedTxnIDCount: metric.NewCounter(metric.Metadata{
+ Name: "sql.contention.txn_id_cache.discarded_count",
+ Help: "Number of discarded resolved transaction IDs",
+ Measurement: "Discarded Resolved Transaction IDs",
+ Unit: metric.Unit_COUNT,
+ }),
+ EvictedTxnIDCacheCount: metric.NewCounter(metric.Metadata{
+ Name: "sql.contention.txn_id_cache.evicted_count",
+ Help: "Number of evicted resolved transaction IDs",
+ Measurement: "Evicted Resolved Transaction IDs",
+ Unit: metric.Unit_COUNT,
+ }),
+ }
+}
diff --git a/pkg/sql/contention/txnidcache/provider.go b/pkg/sql/contention/txnidcache/provider.go
new file mode 100644
index 000000000000..0d1f44249785
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/provider.go
@@ -0,0 +1,83 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "context"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+)
+
+// Provider is the main interface for txnidcache.
+type Provider interface {
+ // Start starts the txnidcache.Provider.
+ Start(ctx context.Context, stopper *stop.Stopper)
+
+ reader
+ messageSink
+ writerPool
+}
+
+// Writer is the interface that can be used to write to txnidcache.
+type Writer interface {
+ writer
+
+ // Close closes the Writer and flushes any pending data. The Writer should
+ // not be used after its closed.
+ Close()
+}
+
+type disconnector interface {
+ // disconnect allows a Writer to be disconnected from its attached target.
+ disconnect(Writer)
+}
+
+type writerPool interface {
+ // GetWriter returns a Writer to the caller. After returned, the Writer
+ // is attached to the writerPool, and can be disconnected using the
+ // disconnector.
+ GetWriter() Writer
+
+ disconnector
+}
+
+// messageSink is implemented by the top-level cache so that the Writers
+// can push message blocks into.
+type messageSink interface {
+ // push allows the writer to push a message block into the sink.
+ push(messageBlock)
+
+ disconnector
+}
+
+type writer interface {
+ // Record writes a pair of transactionID and transaction fingerprint ID
+ // into a temporary buffer. This buffer will eventually be flushed into
+ // the transaction ID cache asynchronously.
+ Record(resolvedTxnID ResolvedTxnID)
+
+ // Flush starts the flushing process of writer's temporary buffer.
+ Flush()
+}
+
+type reader interface {
+ // Lookup returns the corresponding transaction fingerprint ID for a given txnID,
+ // if the given txnID has no entry in the Cache, the returned "found" boolean
+ // will be false.
+ Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool)
+}
+
+type storage interface {
+ reader
+ writer
+}
diff --git a/pkg/sql/contention/txnidcache/sharded_store.go b/pkg/sql/contention/txnidcache/sharded_store.go
new file mode 100644
index 000000000000..8a80c661cc6d
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/sharded_store.go
@@ -0,0 +1,50 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+)
+
+type shardedCache [shardCount]storage
+
+var _ storage = shardedCache{}
+
+func newShardedCache(fn func() storage) shardedCache {
+ s := shardedCache{}
+ for i := 0; i < shardCount; i++ {
+ s[i] = fn()
+ }
+ return s
+}
+
+// Record implements the writer interface.
+func (s shardedCache) Record(resolvedTxnID ResolvedTxnID) {
+ shard := s[hashTxnID(resolvedTxnID.TxnID)]
+ shard.Record(resolvedTxnID)
+}
+
+// Flush implements the writer interface.
+func (s shardedCache) Flush() {
+ for _, shard := range s {
+ shard.Flush()
+ }
+}
+
+// Lookup implements the reader interface.
+func (s shardedCache) Lookup(
+ txnID uuid.UUID,
+) (result roachpb.TransactionFingerprintID, found bool) {
+ shardIdx := hashTxnID(txnID)
+
+ return s[shardIdx].Lookup(txnID)
+}
diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go
new file mode 100644
index 000000000000..9589918d7f69
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/txn_id_cache.go
@@ -0,0 +1,178 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "context"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/encoding"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+)
+
+const (
+ shardCount = 256
+ channelSize = 128
+)
+
+// Cache stores the mapping from the Transaction IDs (UUID) of recently
+// executed transactions to their corresponding Transaction Fingerprint ID (uint64).
+// The size of Cache is controlled via sql.contention.txn_id_cache.max_size
+// cluster setting, and it follows FIFO eviction policy once the cache size
+// reaches the limit defined by the cluster setting.
+//
+// Cache's overall architecture is as follows:
+// +------------------------------------+
+// | connExecutor ---------* |
+// | +-------------+ | writes to |
+// | | Writer |<-* |
+// | +-----+-------+ |
+// +------------|-----------------------+
+// |
+// when full, Writer flushes into a channel.
+// |
+// v
+// channel
+// ^
+// |
+// Cache runs a goroutine that polls the channel --*
+// | and sends the message to its |
+// | corresponding shard |
+// | *-----------------------------*
+// +----------------------------|---+
+// | Cache | |
+// | | +-------------v--------------------------------------*
+// | *----> | shard1 [writeBuffer] ----------- when full ---* |
+// | | | | |
+// | | | [cache.UnorderedCache] <---- flush ----* |
+// | | +----------------------------------------------------*
+// | *----> shard2 |
+// | | |
+// | *----> shard2 |
+// | | |
+// | ....... |
+// | | |
+// | *----> shard256 |
+// +--------------------------------+
+type Cache struct {
+ st *cluster.Settings
+
+ msgChan chan messageBlock
+
+ store storage
+ pool writerPool
+
+ metrics *Metrics
+}
+
+var (
+ entrySize = int64(uuid.UUID{}.Size()) +
+ roachpb.TransactionFingerprintID(0).Size() + 8 // the size of a hash.
+
+ _ Provider = &Cache{}
+)
+
+// NewTxnIDCache creates a new instance of Cache.
+func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache {
+ t := &Cache{
+ st: st,
+ metrics: metrics,
+ msgChan: make(chan messageBlock, channelSize),
+ }
+
+ t.pool = newWriterPoolImpl(func() Writer {
+ return t.createNewWriteBuffer()
+ })
+
+ t.store = newShardedCache(func() storage {
+ return newTxnIDCacheShard(t)
+ })
+
+ return t
+}
+
+// Start implements the Provider interface.
+func (t *Cache) Start(ctx context.Context, stopper *stop.Stopper) {
+ _ = stopper.RunAsyncTask(ctx, "txn-id-cache-ingest", func(ctx context.Context) {
+ for {
+ select {
+ case msgBlock := <-t.msgChan:
+ for i := range msgBlock.data {
+ t.store.Record(msgBlock.data[i])
+ }
+ if msgBlock.forceImmediateFlushIntoCache {
+ t.store.Flush()
+ }
+ case <-stopper.ShouldQuiesce():
+ return
+ }
+ }
+ })
+}
+
+// Lookup implements the reader interface.
+func (t *Cache) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) {
+ return t.store.Lookup(txnID)
+}
+
+// GetWriter implements the writerPool interface.
+func (t *Cache) GetWriter() Writer {
+ return t.pool.GetWriter()
+}
+
+// push implements the messageSink interface.
+func (t *Cache) push(msg messageBlock) {
+ select {
+ case t.msgChan <- msg:
+ // noop.
+ default:
+ t.metrics.DiscardedResolvedTxnIDCount.Inc(messageBlockSize)
+ }
+}
+
+// disconnect implements the messageSink interface.
+func (t *Cache) disconnect(w Writer) {
+ t.pool.disconnect(w)
+}
+
+func (t *Cache) createNewWriteBuffer() *ConcurrentWriteBuffer {
+ writeBuffer := &ConcurrentWriteBuffer{
+ sink: t,
+ }
+ writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker()
+ return writeBuffer
+}
+
+// FlushActiveWritersForTest is only used in test to flush all writers so the
+// content of the Cache can be inspected.
+func (t *Cache) FlushActiveWritersForTest() {
+ poolImpl := t.pool.(*writerPoolImpl)
+ poolImpl.mu.Lock()
+ defer poolImpl.mu.Unlock()
+ for txnIDCacheWriteBuffer := range poolImpl.mu.activeWriters {
+ txnIDCacheWriteBuffer.(*ConcurrentWriteBuffer).flushForTest()
+ }
+}
+
+func hashTxnID(txnID uuid.UUID) int {
+ b := txnID.GetBytes()
+ b, val1, err := encoding.DecodeUint64Descending(b)
+ if err != nil {
+ panic(err)
+ }
+ _, val2, err := encoding.DecodeUint64Descending(b)
+ if err != nil {
+ panic(err)
+ }
+ return int((val1 ^ val2) % shardCount)
+}
diff --git a/pkg/sql/contention/txnidcache/txn_id_cache_shard.go b/pkg/sql/contention/txnidcache/txn_id_cache_shard.go
new file mode 100644
index 000000000000..deca72d3ca59
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/txn_id_cache_shard.go
@@ -0,0 +1,102 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "sync/atomic"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/util/cache"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+)
+
+const cacheShardWriteBufferSize = 24
+
+// cacheShard is a shard of the txnidcache.Cache. The idea behind sharding
+// the cache is to reduce the mutex contention.
+type cacheShard struct {
+ mu struct {
+ syncutil.RWMutex
+ store *cache.UnorderedCache
+ }
+
+ atomic struct {
+ allocatedMem int64
+ }
+ writeBuffer []ResolvedTxnID
+ txnIDCache *Cache
+}
+
+var _ storage = &cacheShard{}
+
+type ResolvedTxnID struct {
+ TxnID uuid.UUID
+ TxnFingerprintID roachpb.TransactionFingerprintID
+}
+
+func newTxnIDCacheShard(t *Cache) *cacheShard {
+ shard := &cacheShard{
+ writeBuffer: make([]ResolvedTxnID, 0, cacheShardWriteBufferSize),
+ txnIDCache: t,
+ }
+ shard.mu.store = cache.NewUnorderedCache(cache.Config{
+ Policy: cache.CacheFIFO,
+ ShouldEvict: func(_ int, _, _ interface{}) bool {
+ limit := MaxSize.Get(&t.st.SV)
+ limitPerShard := limit / shardCount
+ return atomic.LoadInt64(&shard.atomic.allocatedMem) > limitPerShard
+ },
+ OnEvictedEntry: func(_ *cache.Entry) {
+ atomic.AddInt64(&shard.atomic.allocatedMem, -entrySize)
+ t.metrics.TxnIDCacheSize.Dec(entrySize)
+ t.metrics.EvictedTxnIDCacheCount.Inc(1)
+ },
+ })
+ return shard
+}
+
+func (s *cacheShard) flushLocked() {
+ memUsed := entrySize * int64(len(s.writeBuffer))
+ atomic.AddInt64(&s.atomic.allocatedMem, memUsed)
+ s.txnIDCache.metrics.TxnIDCacheSize.Inc(memUsed)
+
+ for _, v := range s.writeBuffer {
+ s.mu.store.Add(v.TxnID, v.TxnFingerprintID)
+ }
+ s.writeBuffer = s.writeBuffer[:0]
+}
+
+// Record implements the writer interface.
+func (s *cacheShard) Record(msg ResolvedTxnID) {
+ s.writeBuffer = append(s.writeBuffer, msg)
+ if len(s.writeBuffer) == cap(s.writeBuffer) {
+ s.Flush()
+ }
+}
+
+// Flush implements the writer interface.
+func (s *cacheShard) Flush() {
+ s.mu.Lock()
+ s.flushLocked()
+ s.mu.Unlock()
+}
+
+// Lookup implements the reader interface.
+func (s *cacheShard) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) {
+ s.mu.RLock()
+ value, found := s.mu.store.Get(txnID)
+ s.mu.RUnlock()
+ if !found {
+ return result, found
+ }
+ return value.(roachpb.TransactionFingerprintID), found
+}
diff --git a/pkg/sql/contention/txnidcache/txn_id_cache_test.go b/pkg/sql/contention/txnidcache/txn_id_cache_test.go
new file mode 100644
index 000000000000..1b3861bea105
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/txn_id_cache_test.go
@@ -0,0 +1,215 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache_test
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/security"
+ "github.com/cockroachdb/cockroach/pkg/sql"
+ "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/sql/tests"
+ "github.com/cockroachdb/cockroach/pkg/testutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+ "github.com/cockroachdb/errors"
+ "github.com/stretchr/testify/require"
+)
+
+// TestTransactionIDCache tests the correctness of the txnidcache.Cache.
+func TestTransactionIDCache(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ params, _ := tests.CreateTestServerParams()
+
+ appName := "txnIDCacheTest"
+ expectedTxnIDToUUIDMapping := make(map[uuid.UUID]roachpb.TransactionFingerprintID)
+
+ params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
+ BeforeTxnStatsRecorded: func(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+ ) {
+ if strings.Contains(sessionData.ApplicationName, appName) {
+ expectedTxnIDToUUIDMapping[txnID] = txnFingerprintID
+ }
+ },
+ }
+
+ testServer, sqlConn, kvDB := serverutils.StartServer(t, params)
+ defer func() {
+ require.NoError(t, sqlConn.Close())
+ testServer.Stopper().Stop(ctx)
+ }()
+
+ testConn := sqlutils.MakeSQLRunner(sqlConn)
+
+ // Set the cache size limit to a very generous amount to prevent premature
+ // eviction.
+ testConn.Exec(t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '1GB'")
+
+ testConn.Exec(t, "CREATE DATABASE txnIDTest")
+ testConn.Exec(t, "USE txnIDTest")
+ testConn.Exec(t, "CREATE TABLE t AS SELECT generate_series(1, 10)")
+ testConn.Exec(t, "SET application_name = $1", appName)
+
+ testCases := []struct {
+ stmts []string
+ explicit bool
+ }{
+ // Implicit transactions that will have same statement fingerprint.
+ {
+ stmts: []string{"SELECT 1"},
+ explicit: false,
+ },
+ {
+ stmts: []string{"SELECT 2"},
+ explicit: false,
+ },
+
+ // Implicit transaction that have different statement fingerprints.
+ {
+ stmts: []string{"SELECT 1, 1"},
+ explicit: false,
+ },
+ {
+ stmts: []string{"SELECT 1, 1, 2"},
+ explicit: false,
+ },
+
+ // Explicit Transactions.
+ {
+ stmts: []string{"SELECT 1"},
+ explicit: true,
+ },
+ {
+ stmts: []string{"SELECT 5"},
+ explicit: true,
+ },
+ {
+ stmts: []string{"SELECT 5", "SELECT 6, 7"},
+ explicit: true,
+ },
+ }
+
+ // Send test statements into both regular SQL connection and internal
+ // executor to test both code paths.
+ for _, tc := range testCases {
+ if tc.explicit {
+ testConn.Exec(t, "BEGIN")
+ }
+ {
+ for _, stmt := range tc.stmts {
+ testConn.Exec(t, stmt)
+ }
+ }
+ if tc.explicit {
+ testConn.Exec(t, "COMMIT")
+ }
+ }
+
+ ie := testServer.InternalExecutor().(*sql.InternalExecutor)
+
+ for _, tc := range testCases {
+ // Send statements one by one since internal executor doesn't support
+ // sending a batch of statements.
+ for _, stmt := range tc.stmts {
+ var txn *kv.Txn
+ if tc.explicit {
+ txn = kvDB.NewTxn(ctx, "")
+ }
+ _, err := ie.QueryRowEx(
+ ctx,
+ appName,
+ txn,
+ sessiondata.InternalExecutorOverride{
+ User: security.RootUserName(),
+ },
+ stmt,
+ )
+ require.NoError(t, err)
+ if tc.explicit {
+ require.NoError(t, txn.Commit(ctx))
+ }
+
+ require.NoError(t, err)
+ }
+ }
+
+ // Ensure we have intercepted all transactions, the expected size is
+ // calculated as:
+ // # stmt executed in regular executor
+ // + # stmt executed in internal executor
+ // + 1 (`SET application_name` executed previously in regular SQL Conn)
+ // - 3 (explicit txns executed in internal executor due to
+ // https://github.com/cockroachdb/cockroach/issues/73091)
+ expectedTxnIDCacheSize := len(testCases)*2 + 1 - 3
+ require.Equal(t, expectedTxnIDCacheSize, len(expectedTxnIDToUUIDMapping))
+
+ sqlServer := testServer.SQLServer().(*sql.Server)
+ contentionMetrics := sqlServer.ServerMetrics.ContentionSubsystemMetrics
+ txnIDCache := sqlServer.GetTxnIDCache()
+
+ txnIDCache.(*txnidcache.Cache).FlushActiveWritersForTest()
+ testutils.SucceedsSoon(t, func() error {
+ for txnID, expectedTxnFingerprintID := range expectedTxnIDToUUIDMapping {
+ actualTxnFingerprintID, ok := txnIDCache.Lookup(txnID)
+ if !ok {
+ return errors.Newf("expected to find txn(%s) with fingerprintID: "+
+ "%d, but it was not found. (%d number of messageBlock has been discarded "+
+ "and %d number of ResolvedTxnID has been discarded "+
+ "and %d / %d bytes of memory is being used)",
+ txnID, expectedTxnFingerprintID,
+ contentionMetrics.DiscardedResolvedTxnIDCount.Count(),
+ contentionMetrics.EvictedTxnIDCacheCount.Count(),
+ contentionMetrics.TxnIDCacheSize.Value(),
+ txnidcache.MaxSize.Get(&sqlServer.GetExecutorConfig().Settings.SV),
+ )
+ }
+ if expectedTxnFingerprintID != actualTxnFingerprintID {
+ return errors.Newf("expected to find txn(%s) with fingerprintID: %d, but the actual fingerprintID is: %d", txnID, expectedTxnFingerprintID, actualTxnFingerprintID)
+ }
+ }
+
+ return nil
+ })
+
+ // We now test the enforcement of the cache size limit.
+ previousCacheSize := contentionMetrics.TxnIDCacheSize.Value()
+
+ // Limit the size to 128 bytes.
+ testConn.Exec(t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '128B'")
+
+ // Execute a statement to ensure that Cache has a chance to evict older
+ // entries.
+ testConn.Exec(t, "SELECT 1")
+
+ testutils.SucceedsSoon(t, func() error {
+ currentCacheSize := contentionMetrics.TxnIDCacheSize.Value()
+
+ if currentCacheSize < previousCacheSize {
+ return errors.Newf("expected currentCacheSize (%d) to be less than the"+
+ "previousCacheSize (%d), but it was not", currentCacheSize, previousCacheSize)
+ }
+ return nil
+ })
+}
diff --git a/pkg/sql/contention/txnidcache/txn_id_cache_write_buffer.go b/pkg/sql/contention/txnidcache/txn_id_cache_write_buffer.go
new file mode 100644
index 000000000000..87946348bb8d
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/txn_id_cache_write_buffer.go
@@ -0,0 +1,128 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "sync"
+ "sync/atomic"
+
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+const messageBlockSize = 1024
+
+type messageBlock struct {
+ data [messageBlockSize]ResolvedTxnID
+ forceImmediateFlushIntoCache bool
+}
+
+// ConcurrentWriteBuffer is a data structure that optimizes for concurrent
+// writes and also implements the Writer interface.
+//
+// Any write requests initially starts by holding a read lock (flushSyncLock)
+// and then reserves an index to the messageBlock (a fixed-length array). If the
+// reserved index is valid, ConcurrentWriteBuffer immediately writes to the
+// array at the reserved index. However, if the reserved index is not valid,
+// (that is, array index out of bound), there are two scenarios:
+// 1. If the reserved index == size of the array, then the caller of Record()
+// method is responsible for flushing the entire array into the channel. The
+// caller does so by upgrading the read-lock to a write-lock, therefore
+// blocks all future writers from writing to the shared array. After the
+// flush is performed, the write-lock is then downgraded to a read-lock.
+// 2. If the reserved index > size of the array, then the caller of Record()
+// is blocked until the array is flushed. This is achieved by waiting on the
+// conditional variable (flushDone) while holding onto the read-lock. After
+// the flush is completed, the writer is unblocked and allowed to retry.
+type ConcurrentWriteBuffer struct {
+ flushSyncLock syncutil.RWMutex
+ flushDone sync.Cond
+
+ // msgBlock is the temporary buffer that ConcurrentWriteBuffer uses to batch
+ // write requests before sending them into the channel.
+ msgBlock messageBlock
+
+ // atomicIdx is the index pointing into the fixed-length array within the
+ // msgBlock.This should only be accessed using atomic package.
+ atomicIdx int64
+
+ // sink is the flush target that ConcurrentWriteBuffer flushes to once
+ // msgBlock is full.
+ sink messageSink
+}
+
+var _ Writer = &ConcurrentWriteBuffer{}
+
+// Record records a mapping from txnID to its corresponding transaction
+// fingerprint ID. Record is safe to be used concurrently.
+func (c *ConcurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) {
+ c.flushSyncLock.RLock()
+ defer c.flushSyncLock.RUnlock()
+ for {
+ reservedIdx := c.reserveMsgBlockIndex()
+ if reservedIdx < messageBlockSize {
+ c.msgBlock.data[reservedIdx] = resolvedTxnID
+ return
+ } else if reservedIdx == messageBlockSize {
+ c.flushMsgBlockToChannelRLocked()
+ } else {
+ c.flushDone.Wait()
+ }
+ }
+}
+
+// Close forces the ConcurrentWriteBuffer to be flushed into the channel and
+// delete itself from txnidcache.Cache's list of active write buffer. This
+// allows the ConcurrentWriteBuffer to be GC'd next time when garbage collector
+// runs. It implements the txnidcache.Writer interface.
+func (c *ConcurrentWriteBuffer) Close() {
+ c.Flush()
+ c.sink.disconnect(c)
+}
+
+// Flush flushes ConcurrentWriteBuffer into the channel. It implements the
+// txnidcache.Writer interface.
+func (c *ConcurrentWriteBuffer) Flush() {
+ c.flushSyncLock.Lock()
+ c.flushMsgBlockToChannelWLocked(false /* forceImmediateFlush */)
+ c.flushSyncLock.Unlock()
+}
+
+func (c *ConcurrentWriteBuffer) flushMsgBlockToChannelRLocked() {
+ // We upgrade the read-lock to a write-lock, then when we are done flushing,
+ // the lock is downgraded to a read-lock.
+ c.flushSyncLock.RUnlock()
+ defer c.flushSyncLock.RLock()
+ c.flushSyncLock.Lock()
+ defer c.flushSyncLock.Unlock()
+ c.flushMsgBlockToChannelWLocked(false /* forceImmediateFlush */)
+}
+
+func (c *ConcurrentWriteBuffer) flushMsgBlockToChannelWLocked(forceImmediateFlush bool) {
+ if forceImmediateFlush {
+ c.msgBlock.forceImmediateFlushIntoCache = true
+ defer func() {
+ c.msgBlock.forceImmediateFlushIntoCache = false
+ }()
+ }
+ c.sink.push(c.msgBlock)
+ c.flushDone.Broadcast()
+ atomic.StoreInt64(&c.atomicIdx, 0)
+}
+
+func (c *ConcurrentWriteBuffer) flushForTest() {
+ c.flushSyncLock.Lock()
+ c.flushMsgBlockToChannelWLocked(true /* forceImmediateFlush */)
+ c.flushSyncLock.Unlock()
+}
+
+func (c *ConcurrentWriteBuffer) reserveMsgBlockIndex() int64 {
+ return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed.
+}
diff --git a/pkg/sql/contention/txnidcache/writer_pool.go b/pkg/sql/contention/txnidcache/writer_pool.go
new file mode 100644
index 000000000000..6d6a797f3c65
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/writer_pool.go
@@ -0,0 +1,48 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+
+type writerPoolImpl struct {
+ mu struct {
+ syncutil.Mutex
+ activeWriters map[Writer]struct{}
+ }
+
+ New func() Writer
+}
+
+var _ writerPool = &writerPoolImpl{}
+
+func newWriterPoolImpl(new func() Writer) *writerPoolImpl {
+ w := &writerPoolImpl{
+ New: new,
+ }
+ w.mu.activeWriters = make(map[Writer]struct{})
+ return w
+}
+
+// GetWriter implements the writerPool interface.
+func (w *writerPoolImpl) GetWriter() Writer {
+ writeBuffer := w.New()
+ w.mu.Lock()
+ w.mu.activeWriters[writeBuffer] = struct{}{}
+ w.mu.Unlock()
+ return writeBuffer
+}
+
+// disconnect implements the writerPool interface.
+func (w *writerPoolImpl) disconnect(writer Writer) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ delete(w.mu.activeWriters, writer)
+}
diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go
index 8dabac401c64..7d4afa41506b 100644
--- a/pkg/sql/copy.go
+++ b/pkg/sql/copy.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
@@ -218,7 +219,7 @@ type copyTxnOpt struct {
// resetExecutor should be called upon completing a batch from the copy
// machine when the copy machine handles its own transaction.
- resetExtraTxnState func(ctx context.Context) error
+ resetExtraTxnState func(ctx context.Context, txnID uuid.UUID) error
}
// run consumes all the copy-in data from the network connection and inserts it
@@ -630,7 +631,7 @@ func (p *planner) preparePlannerForCopy(
defer func() {
// Note: combine errors will return nil if both are nil and the
// non-nil error in the case that there's just one.
- err = errors.CombineErrors(err, txnOpt.resetExtraTxnState(ctx))
+ err = errors.CombineErrors(err, txnOpt.resetExtraTxnState(ctx, txn.ID()))
}()
}
if prevErr == nil {
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 214c61b10dd2..c47825738393 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1316,6 +1316,14 @@ type ExecutorTestingKnobs struct {
// OnTxnRetry, if set, will be called if there is a transaction retry.
OnTxnRetry func(autoRetryReason error, evalCtx *tree.EvalContext)
+
+ // BeforeTxnStatsRecorded, if set, will be called before the statistics
+ // of a transaction is being recorded.
+ BeforeTxnStatsRecorded func(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+ )
}
// PGWireTestingKnobs contains knobs for the pgwire module.
diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go
index 2b143c263c5f..937a8e512bc8 100644
--- a/pkg/sql/pgwire/server.go
+++ b/pkg/sql/pgwire/server.go
@@ -373,6 +373,7 @@ func (s *Server) Metrics() (res []interface{}) {
&s.SQLServer.InternalMetrics.EngineMetrics,
&s.SQLServer.InternalMetrics.GuardrailMetrics,
&s.SQLServer.ServerMetrics.StatsMetrics,
+ &s.SQLServer.ServerMetrics.ContentionSubsystemMetrics,
}
}
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 10915eacb61d..82634b2d8cf9 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -1860,6 +1860,23 @@ var charts = []sectionDescription{
},
},
},
+ {
+ Organization: [][]string{{SQLLayer, "Contention"}},
+ Charts: []chartDescription{
+ {
+ Title: "Current memory usage for TxnID Cache",
+ Metrics: []string{"sql.contention.txn_id_cache.size"},
+ },
+ {
+ Title: "Number of discarded resolved transaction IDs",
+ Metrics: []string{"sql.contention.txn_id_cache.discarded_count"},
+ },
+ {
+ Title: "Evicted Resolved Transaction IDs",
+ Metrics: []string{"sql.contention.txn_id_cache.evicted_count"},
+ },
+ },
+ },
{
Organization: [][]string{{SQLLayer, "Bulk"}},
Charts: []chartDescription{