From f2806625a4c0fefb5caa48a14c9976d392ac854f Mon Sep 17 00:00:00 2001 From: Azhng Date: Thu, 11 Nov 2021 15:07:58 -0500 Subject: [PATCH] sql: introduce Transaction ID Cache Previously, it was impossible to correlate an individual execution of a transaction (identified via transaction ID) to this historical execution statistics (identified via transaction fingerprint ID). This commit introduces Transaction ID Cache (TxnIDCache), a FIFO cache that stores the mapping from transaction ID to transaction fingerprint ID. This buffer records the mapping at the end of the transaction execution. The oldest entry in the buffer will be evicted through FIFO policy. The default size of this Transaction ID Cache is capped at 64 MB and it is configurable via the sql.contention.txn_id_cache.max_size cluster setting. Release note (sql change): Transaction ID to Transaction Fingerprint ID mapping is now stored in the new Transaction ID Cache, a FIFO unordered in-memory buffer. The size of the buffer is 64 MB by default and configurable via sql.contention.txn_id_cache.max_size cluster setting. Consequentially, two additioanl metrics are introduced: * sql.contention.txn_id_cache.size: tracks the current memory usage of transaction ID Cache * sql.contention.txn_id_cache.discarded_count: number of resolved transaction IDs that are dropped due to memory constraints. --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/BUILD.bazel | 1 + pkg/roachpb/app_stats.go | 3 + pkg/sql/BUILD.bazel | 1 + pkg/sql/conn_executor.go | 49 +++- pkg/sql/conn_executor_exec.go | 19 +- pkg/sql/contention/txnidcache/BUILD.bazel | 60 +++++ .../contention/txnidcache/cluster_settings.go | 20 ++ .../txnidcache/concurrent_write_buffer.go | 128 +++++++++++ pkg/sql/contention/txnidcache/main_test.go | 29 +++ pkg/sql/contention/txnidcache/metrics.go | 43 ++++ pkg/sql/contention/txnidcache/provider.go | 87 +++++++ pkg/sql/contention/txnidcache/shard.go | 102 +++++++++ pkg/sql/contention/txnidcache/shard_test.go | 134 +++++++++++ .../contention/txnidcache/sharded_store.go | 58 +++++ pkg/sql/contention/txnidcache/store_test.go | 73 ++++++ pkg/sql/contention/txnidcache/strip.go | 88 +++++++ pkg/sql/contention/txnidcache/strip_test.go | 123 ++++++++++ pkg/sql/contention/txnidcache/txn_id_cache.go | 216 ++++++++++++++++++ .../txnidcache/txn_id_cache_test.go | 189 +++++++++++++++ pkg/sql/contention/txnidcache/writer_pool.go | 59 +++++ pkg/sql/copy.go | 5 +- pkg/sql/exec_util.go | 8 + pkg/sql/pgwire/server.go | 1 + pkg/ts/catalog/chart_catalog.go | 13 ++ 26 files changed, 1501 insertions(+), 10 deletions(-) create mode 100644 pkg/sql/contention/txnidcache/BUILD.bazel create mode 100644 pkg/sql/contention/txnidcache/cluster_settings.go create mode 100644 pkg/sql/contention/txnidcache/concurrent_write_buffer.go create mode 100644 pkg/sql/contention/txnidcache/main_test.go create mode 100644 pkg/sql/contention/txnidcache/metrics.go create mode 100644 pkg/sql/contention/txnidcache/provider.go create mode 100644 pkg/sql/contention/txnidcache/shard.go create mode 100644 pkg/sql/contention/txnidcache/shard_test.go create mode 100644 pkg/sql/contention/txnidcache/sharded_store.go create mode 100644 pkg/sql/contention/txnidcache/store_test.go create mode 100644 pkg/sql/contention/txnidcache/strip.go create mode 100644 pkg/sql/contention/txnidcache/strip_test.go create mode 100644 pkg/sql/contention/txnidcache/txn_id_cache.go create mode 100644 pkg/sql/contention/txnidcache/txn_id_cache_test.go create mode 100644 pkg/sql/contention/txnidcache/writer_pool.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 91e20abc851c..62e19c53780c 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -76,6 +76,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid +sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index aca61f0835ac..2dd61ec7793d 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -81,6 +81,7 @@ server.web_session.purge.periodduration1h0m0sthe time until old sessions are deleted server.web_session.purge.ttlduration1h0m0sif nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeoutduration168h0m0sthe duration that a newly created web session will be valid +sql.contention.txn_id_cache.max_sizebyte size64 MiBthe maximum byte size TxnID cache will use sql.cross_db_fks.enabledbooleanfalseif true, creating foreign key references across databases is allowed sql.cross_db_sequence_owners.enabledbooleanfalseif true, creating sequences owned by tables from other databases is allowed sql.cross_db_sequence_references.enabledbooleanfalseif true, sequences referenced by tables from other databases are allowed diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index fb184e62743f..19a099348167 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -231,6 +231,7 @@ ALL_TESTS = [ "//pkg/sql/colflow/colrpc:colrpc_test", "//pkg/sql/colflow:colflow_test", "//pkg/sql/colmem:colmem_test", + "//pkg/sql/contention/txnidcache:txnidcache_test", "//pkg/sql/contention:contention_test", "//pkg/sql/covering:covering_test", "//pkg/sql/distsql:distsql_test", diff --git a/pkg/roachpb/app_stats.go b/pkg/roachpb/app_stats.go index 2722c831ae44..e5c6f02187b1 100644 --- a/pkg/roachpb/app_stats.go +++ b/pkg/roachpb/app_stats.go @@ -60,6 +60,9 @@ func ConstructStatementFingerprintID( // individual statement fingerprint IDs that comprise the transaction. type TransactionFingerprintID uint64 +// InvalidTransactionFingerprintID denotes an invalid transaction fingerprint ID. +const InvalidTransactionFingerprintID = TransactionFingerprintID(0) + // Size returns the size of the TransactionFingerprintID. func (t TransactionFingerprintID) Size() int64 { return 8 diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index ffb340b64ca6..155442eeb97b 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -307,6 +307,7 @@ go_library( "//pkg/sql/colexec", "//pkg/sql/colflow", "//pkg/sql/contention", + "//pkg/sql/contention/txnidcache", "//pkg/sql/covering", "//pkg/sql/delegate", "//pkg/sql/distsql", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index dcd3b526e636..d7eb396ddcba 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/idxusage" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -64,6 +65,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "golang.org/x/net/trace" @@ -280,6 +282,10 @@ type Server struct { // node as gateway node. indexUsageStats *idxusage.LocalIndexUsageStats + // txnIDCache stores the mapping from transaction ID to transaction + //fingerprint IDs for all recently executed. + txnIDCache txnidcache.Provider + // Metrics is used to account normal queries. Metrics Metrics @@ -320,6 +326,10 @@ type Metrics struct { type ServerMetrics struct { // StatsMetrics contains metrics for SQL statistics collection. StatsMetrics StatsMetrics + + // ContentionSubsystemMetrics contains metrics related to contention + // subsystem. + ContentionSubsystemMetrics txnidcache.Metrics } // NewServer creates a new Server. Start() needs to be called before the Server @@ -362,6 +372,9 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { ChannelSize: idxusage.DefaultChannelSize, Setting: cfg.Settings, }), + txnIDCache: txnidcache.NewTxnIDCache( + cfg.Settings, + &serverMetrics.ContentionSubsystemMetrics), } telemetryLoggingMetrics := &TelemetryLoggingMetrics{} @@ -452,6 +465,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics { MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval, ), }, + ContentionSubsystemMetrics: txnidcache.NewMetrics(), } } @@ -463,6 +477,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { // accumulated in the reporter when the telemetry server fails. // Usually it is telemetry's reporter's job to clear the reporting SQL Stats. s.reportedStats.Start(ctx, stopper) + + s.txnIDCache.Start(ctx, stopper) } // GetSQLStatsController returns the persistedsqlstats.Controller for current @@ -488,6 +504,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller { return s.reportedStatsController } +// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server. +func (s *Server) GetTxnIDCache() txnidcache.Provider { + return s.txnIDCache +} + // GetScrubbedStmtStats returns the statement statistics by app, with the // queries scrubbed of their identifiers. Any statements which cannot be // scrubbed will be omitted from the returned map. @@ -813,6 +834,7 @@ func (s *Server) newConnExecutor( hasCreatedTemporarySchema: false, stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder, indexUsageStats: s.indexUsageStats, + txnIDCacheWriter: s.txnIDCache.GetWriter(), } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -983,6 +1005,7 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{}) func (ex *connExecutor) close(ctx context.Context, closeType closeType) { ex.sessionEventf(ctx, "finishing connExecutor") + ex.txnIDCacheWriter.Close() txnEv := noEvent if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn { @@ -1018,7 +1041,8 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) { ex.state.finishExternalTxn() } - if err := ex.resetExtraTxnState(ctx, txnEv); err != nil { + // Since we are closing connExecutor, no need to fetch txnID. + if err := ex.resetExtraTxnState(ctx, txnEv, uuid.UUID{}); err != nil { log.Warningf(ctx, "error while cleaning up connExecutor: %s", err) } @@ -1400,6 +1424,10 @@ type connExecutor struct { // indexUsageStats is used to track index usage stats. indexUsageStats *idxusage.LocalIndexUsageStats + + // txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the + // Transaction ID Cache. + txnIDCacheWriter txnidcache.Writer } // ctxHolder contains a connection's context and, while session tracing is @@ -1516,7 +1544,9 @@ func (ns *prepStmtNamespace) resetTo( // resetExtraTxnState resets the fields of ex.extraTxnState when a transaction // commits, rolls back or restarts. -func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error { +func (ex *connExecutor) resetExtraTxnState( + ctx context.Context, ev txnEvent, txnID uuid.UUID, +) error { ex.extraTxnState.jobs = nil ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} ex.extraTxnState.schemaChangerState = SchemaChangerState{ @@ -1542,7 +1572,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name) } ex.extraTxnState.savepoints.clear() - ex.onTxnFinish(ctx, ev) + ex.onTxnFinish(ctx, ev, txnID) case txnRestart: ex.onTxnRestart() ex.state.mu.Lock() @@ -2173,8 +2203,8 @@ func (ex *connExecutor) execCopyIn( } } else { txnOpt = copyTxnOpt{ - resetExtraTxnState: func(ctx context.Context) error { - return ex.resetExtraTxnState(ctx, noEvent) + resetExtraTxnState: func(ctx context.Context, txnID uuid.UUID) error { + return ex.resetExtraTxnState(ctx, noEvent, txnID) }, } } @@ -2598,6 +2628,13 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( txnIsOpen = true } + ex.state.mu.RLock() + var txnID uuid.UUID + if ex.state.mu.txn != nil { + txnID = ex.state.mu.txn.ID() + } + ex.state.mu.RUnlock() + ex.mu.Lock() err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload) ex.mu.Unlock() @@ -2718,7 +2755,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( fallthrough case txnRestart, txnRollback: - if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent); err != nil { + if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, txnID); err != nil { return advanceInfo{}, err } default: diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 668c57d02760..69abdf347638 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" @@ -51,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/lib/pq/oid" ) @@ -1909,7 +1911,7 @@ func (ex *connExecutor) recordTransactionStart() { } } -func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { +func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnID uuid.UUID) { if ex.extraTxnState.shouldExecuteOnTxnFinish { ex.extraTxnState.shouldExecuteOnTxnFinish = false txnStart := ex.extraTxnState.txnFinishClosure.txnStartTime @@ -1923,7 +1925,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { transactionFingerprintID, ) } - err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart) + if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil { + ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded( + ex.sessionData(), + txnID, + transactionFingerprintID, + ) + } + err := ex.recordTransaction(ctx, txnID, transactionFingerprintID, ev, implicit, txnStart) if err != nil { if log.V(1) { log.Warningf(ctx, "failed to record transaction stats: %s", err) @@ -1954,6 +1963,7 @@ func (ex *connExecutor) onTxnRestart() { func (ex *connExecutor) recordTransaction( ctx context.Context, + transactionID uuid.UUID, transactionFingerprintID roachpb.TransactionFingerprintID, ev txnEvent, implicit bool, @@ -1973,6 +1983,11 @@ func (ex *connExecutor) recordTransaction( ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds()) + ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + TxnID: transactionID, + TxnFingerprintID: transactionFingerprintID, + }) + txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency() txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency() commitLat := ex.phaseTimes.GetCommitLatency() diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel new file mode 100644 index 000000000000..14d518a8252f --- /dev/null +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -0,0 +1,60 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "txnidcache", + srcs = [ + "cluster_settings.go", + "concurrent_write_buffer.go", + "metrics.go", + "provider.go", + "shard.go", + "sharded_store.go", + "strip.go", + "txn_id_cache.go", + "writer_pool.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb:with-mocks", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/encoding", + "//pkg/util/metric", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/uuid", + ], +) + +go_test( + name = "txnidcache_test", + srcs = [ + "main_test.go", + "shard_test.go", + "store_test.go", + "strip_test.go", + "txn_id_cache_test.go", + ], + embed = [":txnidcache"], + deps = [ + "//pkg/kv", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/sql/sessiondata", + "//pkg/sql/tests", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/metric", + "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/sql/contention/txnidcache/cluster_settings.go b/pkg/sql/contention/txnidcache/cluster_settings.go new file mode 100644 index 000000000000..63d3d04fb0a6 --- /dev/null +++ b/pkg/sql/contention/txnidcache/cluster_settings.go @@ -0,0 +1,20 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import "github.com/cockroachdb/cockroach/pkg/settings" + +// MaxSize limits the maximum byte size can be used by the TxnIDCache. +var MaxSize = settings.RegisterByteSizeSetting( + `sql.contention.txn_id_cache.max_size`, + "the maximum byte size TxnID cache will use", + 64*1024*1024, // 64 MB +).WithPublic() diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go new file mode 100644 index 000000000000..2596cc2ad79e --- /dev/null +++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go @@ -0,0 +1,128 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "sync" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +const messageBlockSize = 1024 + +type messageBlock [messageBlockSize]ResolvedTxnID + +// ConcurrentWriteBuffer is a data structure that optimizes for concurrent +// writes and also implements the Writer interface. +// +// Any write requests initially starts by holding a read lock (flushSyncLock) +// and then reserves an index to the messageBlock (a fixed-length array). If the +// reserved index is valid, ConcurrentWriteBuffer immediately writes to the +// array at the reserved index. However, if the reserved index is not valid, +// (that is, array index out of bound), there are two scenarios: +// 1. If the reserved index == size of the array, then the caller of Record() +// method is responsible for flushing the entire array into the channel. The +// caller does so by upgrading the read-lock to a write-lock, therefore +// blocks all future writers from writing to the shared array. After the +// flush is performed, the write-lock is then downgraded to a read-lock. +// 2. If the reserved index > size of the array, then the caller of Record() +// is blocked until the array is flushed. This is achieved by waiting on the +// conditional variable (flushDone) while holding onto the read-lock. After +// the flush is completed, the writer is unblocked and allowed to retry. +type ConcurrentWriteBuffer struct { + flushSyncLock syncutil.RWMutex + flushDone sync.Cond + + // msgBlock is the temporary buffer that ConcurrentWriteBuffer uses to batch + // write requests before sending them into the channel. + msgBlock messageBlock + + // atomicIdx is the index pointing into the fixed-length array within the + // msgBlock.This should only be accessed using atomic package. + atomicIdx int64 + + // sink is the flush target that ConcurrentWriteBuffer flushes to once + // msgBlock is full. + sink messageSink +} + +var _ Writer = &ConcurrentWriteBuffer{} + +// NewConcurrentWriteBuffer returns a new instance of ConcurrentWriteBuffer. +func NewConcurrentWriteBuffer(sink messageSink) *ConcurrentWriteBuffer { + writeBuffer := &ConcurrentWriteBuffer{ + sink: sink, + } + writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker() + return writeBuffer +} + +// Record records a mapping from txnID to its corresponding transaction +// fingerprint ID. Record is safe to be used concurrently. +func (c *ConcurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) { + c.flushSyncLock.RLock() + defer c.flushSyncLock.RUnlock() + for { + reservedIdx := c.reserveMsgBlockIndex() + if reservedIdx < messageBlockSize { + c.msgBlock[reservedIdx] = resolvedTxnID + return + } else if reservedIdx == messageBlockSize { + c.flushMsgBlockToChannelRLocked() + } else { + c.flushDone.Wait() + } + } +} + +// Close forces the ConcurrentWriteBuffer to be flushed into the channel and +// delete itself from txnidcache.Cache's list of active write buffer. This +// allows the ConcurrentWriteBuffer to be GC'd next time when garbage collector +// runs. It implements the txnidcache.Writer interface. +func (c *ConcurrentWriteBuffer) Close() { + c.Flush() + c.sink.disconnect(c) +} + +// Flush flushes ConcurrentWriteBuffer into the channel. It implements the +// txnidcache.Writer interface. +func (c *ConcurrentWriteBuffer) Flush() { + c.flushSyncLock.Lock() + c.flushMsgBlockToChannelWLocked() + c.flushSyncLock.Unlock() +} + +func (c *ConcurrentWriteBuffer) flushMsgBlockToChannelRLocked() { + // We upgrade the read-lock to a write-lock, then when we are done flushing, + // the lock is downgraded to a read-lock. + c.flushSyncLock.RUnlock() + defer c.flushSyncLock.RLock() + c.flushSyncLock.Lock() + defer c.flushSyncLock.Unlock() + c.flushMsgBlockToChannelWLocked() +} + +func (c *ConcurrentWriteBuffer) flushMsgBlockToChannelWLocked() { + c.sink.push(c.msgBlock) + c.flushDone.Broadcast() + atomic.StoreInt64(&c.atomicIdx, 0) +} + +func (c *ConcurrentWriteBuffer) flushForTest() { + c.flushSyncLock.Lock() + c.flushMsgBlockToChannelWLocked() + c.flushSyncLock.Unlock() +} + +func (c *ConcurrentWriteBuffer) reserveMsgBlockIndex() int64 { + return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed. +} diff --git a/pkg/sql/contention/txnidcache/main_test.go b/pkg/sql/contention/txnidcache/main_test.go new file mode 100644 index 000000000000..c5b305067a8f --- /dev/null +++ b/pkg/sql/contention/txnidcache/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/contention/txnidcache/metrics.go b/pkg/sql/contention/txnidcache/metrics.go new file mode 100644 index 000000000000..fc7c62372697 --- /dev/null +++ b/pkg/sql/contention/txnidcache/metrics.go @@ -0,0 +1,43 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import "github.com/cockroachdb/cockroach/pkg/util/metric" + +// Metrics is a structs that include all metrics related to contention +// subsystem. +type Metrics struct { + DiscardedResolvedTxnIDCount *metric.Counter + EvictedTxnIDCacheCount *metric.Counter +} + +var _ metric.Struct = Metrics{} + +// MetricStruct implements the metric.Struct interface. +func (Metrics) MetricStruct() {} + +// NewMetrics returns a new instance of Metrics. +func NewMetrics() Metrics { + return Metrics{ + DiscardedResolvedTxnIDCount: metric.NewCounter(metric.Metadata{ + Name: "sql.contention.txn_id_cache.discarded_count", + Help: "Number of discarded resolved transaction IDs", + Measurement: "Discarded Resolved Transaction IDs", + Unit: metric.Unit_COUNT, + }), + EvictedTxnIDCacheCount: metric.NewCounter(metric.Metadata{ + Name: "sql.contention.txn_id_cache.evicted_count", + Help: "Number of evicted resolved transaction IDs", + Measurement: "Evicted Resolved Transaction IDs", + Unit: metric.Unit_COUNT, + }), + } +} diff --git a/pkg/sql/contention/txnidcache/provider.go b/pkg/sql/contention/txnidcache/provider.go new file mode 100644 index 000000000000..9bc64030060c --- /dev/null +++ b/pkg/sql/contention/txnidcache/provider.go @@ -0,0 +1,87 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// Provider is the main interface for txnidcache. +type Provider interface { + // Start starts the txnidcache.Provider. + Start(ctx context.Context, stopper *stop.Stopper) + + reader + messageSink + writerPool +} + +// Writer is the interface that can be used to write to txnidcache. +type Writer interface { + // Record writes a pair of transactionID and transaction fingerprint ID + // into a temporary buffer. This buffer will eventually be flushed into + // the transaction ID cache asynchronously. + Record(resolvedTxnID ResolvedTxnID) + + // Flush starts the flushing process of writer's temporary buffer. + Flush() + + // Close closes the Writer and flushes any pending data. The Writer should + // not be used after its closed. + Close() +} + +type reader interface { + // Lookup returns the corresponding transaction fingerprint ID for a given txnID, + // if the given txnID has no entry in the Cache, the returned "found" boolean + // will be false. + Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) + + // TODO(azhng): we eventually want to implement a batch-lookup API. +} + +type disconnector interface { + // disconnect allows a Writer to be disconnected from its attached target. + disconnect(Writer) +} + +type pusher interface { + // push allows a messageBlock to be pushed into the pusher. + push(messageBlock) +} + +type writerPool interface { + // GetWriter returns a Writer to the caller. After returned, the Writer + // is attached to the writerPool, and can be disconnected using the + // disconnector. + GetWriter() Writer + + disconnector +} + +type messageSink interface { + pusher + disconnector +} + +type storage interface { + reader + pusher +} + +// capacityLimiter is a simple interface that informs a storage object of its +// current size limit. This allows the capacity of the storage object to be +// changed on the fly via cluster settings without re-allocation. +type capacityLimiter func() int64 diff --git a/pkg/sql/contention/txnidcache/shard.go b/pkg/sql/contention/txnidcache/shard.go new file mode 100644 index 000000000000..c613dfaa5b49 --- /dev/null +++ b/pkg/sql/contention/txnidcache/shard.go @@ -0,0 +1,102 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +type shard struct { + syncutil.RWMutex + strips []*strip + head int64 + tail int64 + ringSize int64 + + evictedCount *metric.Counter +} + +var _ storage = &shard{} + +func newShard(capacity capacityLimiter, ringSize int64, evictedCount *metric.Counter) *shard { + shard := &shard{ + strips: make([]*strip, ringSize), + head: 0, + tail: 0, + ringSize: ringSize, + evictedCount: evictedCount, + } + + for i := int64(0); i < ringSize; i++ { + shard.strips[i] = newStrip(func() int64 { + return capacity() / ringSize + }) + } + + return shard +} + +func (s *shard) Lookup(txnID uuid.UUID) (roachpb.TransactionFingerprintID, bool) { + s.RLock() + defer s.RUnlock() + + for i := s.head; ; i = s.prevIdx(i) { + fingerprintID, found := s.strips[i].Lookup(txnID) + if found { + return fingerprintID, found + } + if i == s.tail { + break + } + } + return roachpb.TransactionFingerprintID(0), false +} + +func (s *shard) push(block messageBlock) { + s.Lock() + defer s.Unlock() + + blockOffset := 0 + more := false + for { + strip := s.strips[s.head] + blockOffset, more = strip.tryInsertBlock(block, blockOffset) + + if more { + s.rotateRing() + } else { + break + } + } +} + +func (s *shard) rotateRing() { + s.head = s.nextIdx(s.head) + if s.head == s.tail { + s.tail = s.nextIdx(s.tail) + s.strips[s.head].clear() + s.evictedCount.Inc(s.strips[s.head].capacity()) + } +} + +func (s *shard) nextIdx(idx int64) int64 { + return (idx + 1) % s.ringSize +} + +func (s *shard) prevIdx(idx int64) int64 { + if idx == 0 { + return s.ringSize - 1 + } + return idx - 1 +} diff --git a/pkg/sql/contention/txnidcache/shard_test.go b/pkg/sql/contention/txnidcache/shard_test.go new file mode 100644 index 000000000000..1ab39b096b98 --- /dev/null +++ b/pkg/sql/contention/txnidcache/shard_test.go @@ -0,0 +1,134 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestShard(t *testing.T) { + t.Run("single-strip", func(t *testing.T) { + shard := newShard(func() int64 { + return 4 + }, /* capacity */ + 1, /* ringSize */ + metric.NewCounter(metric.Metadata{})) + + expected := make(map[uuid.UUID]roachpb.TransactionFingerprintID) + block1 := messageBlock{ + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(1), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(2), + }, + } + + expected = buildExpectedLookupMapFromMessageBlock(block1, expected) + shard.push(block1) + validateReader(t, shard, expected) + }) + + t.Run("multi-strips", func(t *testing.T) { + shard := newShard(func() int64 { + return 4 + }, /* capacity */ + 2, /* ringSize */ + metric.NewCounter(metric.Metadata{})) + + // Looking up an empty shard shouldn't cause any problem. + _, found := shard.Lookup(uuid.FastMakeV4()) + require.False(t, found) + + expected := make(map[uuid.UUID]roachpb.TransactionFingerprintID) + block1 := messageBlock{ + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(1), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(2), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(3), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(4), + }, + } + expected = buildExpectedLookupMapFromMessageBlock(block1, expected) + + t.Run("normal_insert", func(t *testing.T) { + shard.push(block1) + validateReader(t, shard, expected) + }) + + // We are inserting more than the capacity of the shard. This means + // the first two ResolvedTxnIDs will be evicted from their corresponding strip. + block2 := messageBlock{ + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(5), + }, + } + + t.Run("overflow_insert1", func(t *testing.T) { + expected = buildExpectedLookupMapFromMessageBlock(block2, expected) + delete(expected, block1[0].TxnID) + delete(expected, block1[1].TxnID) + + shard.push(block2) + validateReader(t, shard, expected) + ensureNotPresentInReader(t, shard, block1[0].TxnID) + ensureNotPresentInReader(t, shard, block1[1].TxnID) + }) + + // This would result in another eviction of the strip, and this block will + // end up being stored across two strips. + block3 := messageBlock{ + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(6), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(7), + }, + } + + t.Run("overflow_insert2", func(t *testing.T) { + expected = buildExpectedLookupMapFromMessageBlock(block3, expected) + delete(expected, block1[2].TxnID) + delete(expected, block1[3].TxnID) + + shard.push(block3) + validateReader(t, shard, expected) + ensureNotPresentInReader(t, shard, block1[2].TxnID) + ensureNotPresentInReader(t, shard, block1[3].TxnID) + }) + }) +} + +func ensureNotPresentInReader(t *testing.T, reader reader, txnID uuid.UUID) { + _, found := reader.Lookup(txnID) + require.False(t, found, + "expected %s to be not found, but it was found", txnID) +} diff --git a/pkg/sql/contention/txnidcache/sharded_store.go b/pkg/sql/contention/txnidcache/sharded_store.go new file mode 100644 index 000000000000..0fbadb6c8d11 --- /dev/null +++ b/pkg/sql/contention/txnidcache/sharded_store.go @@ -0,0 +1,58 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +const shardCount = 256 + +type shardedStore [shardCount]storage + +var _ storage = shardedStore{} + +func newShardedCache(fn func() storage) shardedStore { + s := shardedStore{} + for shardIdx := 0; shardIdx < shardCount; shardIdx++ { + s[shardIdx] = fn() + } + return s +} + +// Record implements the storage interface. +func (s shardedStore) push(block messageBlock) { + var shardedBlock [shardCount]messageBlock + var shardedIndex [shardCount]int + + // We do a full pass of the block to group each ResolvedTxnID in the block + // to its corresponding shard. + for blockIdx := 0; blockIdx < messageBlockSize && block[blockIdx].valid(); blockIdx++ { + shardIdx := hashTxnID(block[blockIdx].TxnID) + shardedBlock[shardIdx][shardedIndex[shardIdx]] = block[blockIdx] + shardedIndex[shardIdx]++ + } + + // In the second pass we push each block shard into their own underlying + // storage. + for shardIdx := 0; shardIdx < shardCount; shardIdx++ { + s[shardIdx].push(shardedBlock[shardIdx]) + } +} + +// Lookup implements the reader interface. +func (s shardedStore) Lookup( + txnID uuid.UUID, +) (result roachpb.TransactionFingerprintID, found bool) { + shardIdx := hashTxnID(txnID) + return s[shardIdx].Lookup(txnID) +} diff --git a/pkg/sql/contention/txnidcache/store_test.go b/pkg/sql/contention/txnidcache/store_test.go new file mode 100644 index 000000000000..d1c9963bb626 --- /dev/null +++ b/pkg/sql/contention/txnidcache/store_test.go @@ -0,0 +1,73 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +func TestShardedStore(t *testing.T) { + store := newShardedCache(func() storage { + return newShard(func() int64 { + return int64(1024 * 1024 * 1024) + }, defaultRingSize, metric.NewCounter(metric.Metadata{})) + }) + + inputs, expectedMaps := generateMultipleBlocks() + for i := range inputs { + t.Run(fmt.Sprintf("block=%d", i), func(t *testing.T) { + store.push(inputs[i]) + validateReader(t, store, expectedMaps[i]) + }) + } +} + +func generateMultipleBlocks() ( + blocks []messageBlock, + expectedMaps []map[uuid.UUID]roachpb.TransactionFingerprintID, +) { + // Generate a random block count from the half open interval [1, 21). + testBlockCount := rand.Intn(20) + 1 + blocks = make([]messageBlock, 0, testBlockCount) + expectedMaps = make([]map[uuid.UUID]roachpb.TransactionFingerprintID, 0, testBlockCount) + + for i := 0; i < testBlockCount; i++ { + block, expected := generateRandomData() + blocks = append(blocks, block) + expectedMaps = append(expectedMaps, expected) + } + + return blocks, expectedMaps +} + +func generateRandomData() ( + block messageBlock, + expected map[uuid.UUID]roachpb.TransactionFingerprintID, +) { + expected = make(map[uuid.UUID]roachpb.TransactionFingerprintID) + for i := int64(0); i < messageBlockSize; i++ { + txnID := uuid.FastMakeV4() + txnFingerprintID := roachpb.TransactionFingerprintID(rand.Int63()) + expected[txnID] = txnFingerprintID + block[i] = ResolvedTxnID{ + TxnID: txnID, + TxnFingerprintID: txnFingerprintID, + } + } + + return block, expected +} diff --git a/pkg/sql/contention/txnidcache/strip.go b/pkg/sql/contention/txnidcache/strip.go new file mode 100644 index 000000000000..6bbf8b11e67f --- /dev/null +++ b/pkg/sql/contention/txnidcache/strip.go @@ -0,0 +1,88 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +type strip struct { + syncutil.RWMutex + data map[uuid.UUID]roachpb.TransactionFingerprintID + size int64 + capacity capacityLimiter +} + +var _ reader = &strip{} + +func newStrip(capacity capacityLimiter) *strip { + c := &strip{ + data: make(map[uuid.UUID]roachpb.TransactionFingerprintID), + capacity: capacity, + size: 0, + } + return c +} + +// tryInsertBlock takes two arguments: +// * block: the messageBlock that will be inserted into the strip. +// * blockStartingOffset: the offset into the block the strip will start reading +// at. +// The strip will starting reading the block from blockStartingOffset until +// either: +// * the strip is full: in this case, if the block is not fully consumed, +// tryInsertBlock returns the next unread blockIdx into the messageBlock +// and the boolean variable indicating that the block is not fully consumed. +// * the block is fully consumed: this can happen in two scenarios: +// 1. the messageBlock is fully populated, and we consumed the entirety +// of the block. +// 2. the messageBlock is partially populated, and we have observed an invalid +// ResolvedTxnID. +// The returning blockIdx is insufficient for the caller to decide of the +// block has been fully consumed, hence the caller relies on the second +// returned boolean variable. +func (c *strip) tryInsertBlock( + block messageBlock, blockStartingOffset int, +) (endingOffset int, more bool) { + c.Lock() + defer c.Unlock() + + blockIdx := blockStartingOffset + capn := c.capacity() + for ; blockIdx < messageBlockSize && block[blockIdx].valid() && c.size < capn; blockIdx++ { + c.data[block[blockIdx].TxnID] = block[blockIdx].TxnFingerprintID + c.size++ + } + return blockIdx, blockIdx < messageBlockSize && block[blockIdx].valid() +} + +func (c *strip) Lookup(txnID uuid.UUID) (roachpb.TransactionFingerprintID, bool) { + c.RLock() + defer c.RUnlock() + txnFingerprintID, found := c.data[txnID] + return txnFingerprintID, found +} + +func (c *strip) clear() { + c.Lock() + defer c.Unlock() + + // Instead of reallocating a new map, range-loop with delete can trigger + // golang compiler's optimization to use `memclr`. + // See: https://github.com/golang/go/issues/20138. + for key := range c.data { + delete(c.data, key) + } + + c.size = 0 +} diff --git a/pkg/sql/contention/txnidcache/strip_test.go b/pkg/sql/contention/txnidcache/strip_test.go new file mode 100644 index 000000000000..2a5134efe177 --- /dev/null +++ b/pkg/sql/contention/txnidcache/strip_test.go @@ -0,0 +1,123 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestStrip(t *testing.T) { + block := messageBlock{ + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(1), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(2), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(3), + }, + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(4), + }, + } + + t.Run("partial_block_fully_ingested", func(t *testing.T) { + // Create a strip of size 4, and insert 5 entries into the strip. This + // should result in the last entry being discarded. + strip := newStrip(func() int64 { + return 4 + } /* capacity */) + + expected := make(map[uuid.UUID]roachpb.TransactionFingerprintID) + expected = buildExpectedLookupMapFromMessageBlock(block, expected) + endingOffset, more := strip.tryInsertBlock(block, 0 /* blockStartingOffset */) + validateReader(t, strip, expected) + require.Equal(t, 4 /* expected */, endingOffset) + require.Equal(t, false /* expected */, more) + + extraBlock := messageBlock{ + ResolvedTxnID{ + TxnID: uuid.FastMakeV4(), + TxnFingerprintID: roachpb.TransactionFingerprintID(5), + }, + } + endingOffset, more = strip.tryInsertBlock(extraBlock, 0 /* blockStartingOffset */) + validateReader(t, strip, expected) + require.Equal(t, 0 /* expected */, endingOffset, + "expected no entry being inserted into strip, but %d entries "+ + "were inserted", endingOffset) + require.Equal(t, true /* expected */, more) + + strip.clear() + require.True(t, len(strip.data) == 0) + }) + + t.Run("partial_block_partially_ingested", func(t *testing.T) { + strip := newStrip(func() int64 { return 2 } /* capacity */) + expected := make(map[uuid.UUID]roachpb.TransactionFingerprintID) + expected = buildExpectedLookupMapFromMessageBlock(block, expected) + // Removing the first entry since we are inserting a partial block. + delete(expected, block[0].TxnID) + // Removing the last entry since it won't fit into the strip. + delete(expected, block[3].TxnID) + + // Inserting a partial block into the strip. + endingOffset, more := strip.tryInsertBlock(block, 1 /* blockStartingOffset */) + validateReader(t, strip, expected) + require.Equal(t, 3 /* expected */, endingOffset, + "expected to have endingOffset 3, but got %d", endingOffset) + require.Equal(t, true /* expected */, more) + }) + + t.Run("full_block_fully_ingested", func(t *testing.T) { + strip := newStrip(func() int64 { return messageBlockSize } /* capacity */) + randomData, expected := generateRandomData() + endingOffset, more := + strip.tryInsertBlock(randomData, 0 /* blockStartingOffset */) + require.False(t, more, "expect block to be fully ingested, but it was not") + require.Equal(t, messageBlockSize, endingOffset, + "expect block to be fully ingested, but it was not") + validateReader(t, strip, expected) + }) +} + +func validateReader( + t *testing.T, s reader, expectedMap map[uuid.UUID]roachpb.TransactionFingerprintID, +) { + for k, expected := range expectedMap { + actual, found := s.Lookup(k) + require.True(t, found, + "expected to find %s, but didn't", k.String()) + require.Equal(t, expected, actual, + "expected to find %d for %s, but found %d", expected, k, actual) + } +} + +func buildExpectedLookupMapFromMessageBlock( + block messageBlock, existingExpectedLookupMap map[uuid.UUID]roachpb.TransactionFingerprintID, +) map[uuid.UUID]roachpb.TransactionFingerprintID { + for i := range block { + if !block[i].valid() { + break + } + existingExpectedLookupMap[block[i].TxnID] = block[i].TxnFingerprintID + } + return existingExpectedLookupMap +} diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go new file mode 100644 index 000000000000..4945b29c6530 --- /dev/null +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -0,0 +1,216 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +const ( + defaultRingSize = 4 + channelSize = 128 +) + +// Cache stores the mapping from the Transaction IDs (UUID) of recently +// executed transactions to their corresponding Transaction Fingerprint ID (uint64). +// The size of Cache is controlled via sql.contention.txn_id_cache.max_size +// cluster setting, and it follows FIFO eviction policy once the cache size +// reaches the limit defined by the cluster setting. +// +// Cache's overall architecture is as follows: +// +------------------------------------+ +// | connExecutor ---------* | +// | +-------------+ | writes to | +// | | Writer |<-* | +// | +-----+-------+ | +// +------------|-----------------------+ +// | +// when full, Writer flushes to a channel. +// | +// v +// channel +// ^ +// | +// Cache runs a goroutine that polls the channel --* +// | and sends the message to its | +// | corresponding shard | +// | | +// +----------------------------------+ | +// | Cache: | | +// | The cache contain a | <---------------------* +// | fixed number of shards. | +// | Each TxnID is hashed | +// | before writes to its | +// | corresponding shard. | +// | | +// | *----> shard1 | +// | | | +// | *----> shard2 | +// | | | +// | | +-------------------------------------------------------------* +// | *----> | shard3: Each shard contain a ring buffer of strips, | +// | | | the size of the ring buffer is fixed. All writes | +// | | | go into the strip pointed by the "head" pointer. | +// | | | Once a strip is full, "head" pointer is | +// | | | incremented to point to the next strip. If the | +// | | | ring buffer is full, the contents in the strip | +// | | | pointed by the "tail" is discarded. This implements | +// | | | a coarse-grain FIFO eviction policy. | +// | | | | +// | | | tail head | +// | | | V V | +// | | | +--------+--------+--------+----------------------------------+ +// | | | | strip1 | strip2 | strip3 | strip4: | +// | | | +--------+--------+--------+ A strip is a fixed-size map. | +// | | | | Once it's full, strip will | +// | | | | refuse to accept any writes. | +// | | | | It's shard's responsibility | +// | | | | handle it (by incrementing | +// | | | | the head pointer and possibly | +// | | | | evict the contents in the | +// | | | | strip. | +// | | | +----------------------------------+ +// | | | | +// | | | | +// | | +-------------------------------------------------------------* +// | | | +// | *----> shard4 | +// | | | +// | *----> shard5 | +// | | | +// | ....... | +// | | | +// | *----> shard256 | +// +----------------------------------+ +type Cache struct { + st *cluster.Settings + + msgChan chan messageBlock + + store storage + pool writerPool + + metrics *Metrics +} + +var ( + entrySize = int64(uuid.UUID{}.Size()) + + roachpb.TransactionFingerprintID(0).Size() +) + +// ResolvedTxnID represents a TxnID that is resolved to its corresponding +// TxnFingerprintID. +type ResolvedTxnID struct { + TxnID uuid.UUID + TxnFingerprintID roachpb.TransactionFingerprintID +} + +func (r *ResolvedTxnID) valid() bool { + return r.TxnFingerprintID != roachpb.InvalidTransactionFingerprintID +} + +var ( + _ Provider = &Cache{} +) + +// NewTxnIDCache creates a new instance of Cache. +func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache { + t := &Cache{ + st: st, + metrics: metrics, + msgChan: make(chan messageBlock, channelSize), + } + + t.pool = newWriterPoolImpl(func() Writer { + return NewConcurrentWriteBuffer(t) + }) + + t.store = newShardedCache(func() storage { + return newShard(func() int64 { + return MaxSize.Get(&st.SV) / entrySize / shardCount + }, defaultRingSize, metrics.EvictedTxnIDCacheCount) + }) + + return t +} + +// Start implements the Provider interface. +func (t *Cache) Start(ctx context.Context, stopper *stop.Stopper) { + _ = stopper.RunAsyncTask(ctx, "txn-id-cache-ingest", func(ctx context.Context) { + for { + select { + case msgBlock := <-t.msgChan: + t.store.push(msgBlock) + case <-stopper.ShouldQuiesce(): + return + } + } + }) +} + +// Lookup implements the reader interface. +func (t *Cache) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) { + return t.store.Lookup(txnID) +} + +// GetWriter implements the writerPool interface. +func (t *Cache) GetWriter() Writer { + return t.pool.GetWriter() +} + +// push implements the messageSink interface. +func (t *Cache) push(msg messageBlock) { + select { + case t.msgChan <- msg: + // noop. + default: + t.metrics.DiscardedResolvedTxnIDCount.Inc(messageBlockSize) + } +} + +// disconnect implements the messageSink interface. +func (t *Cache) disconnect(w Writer) { + t.pool.disconnect(w) +} + +// FlushActiveWritersForTest is only used in test to flush all writers so the +// content of the Cache can be inspected. +func (t *Cache) FlushActiveWritersForTest() { + poolImpl := t.pool.(*writerPoolImpl) + poolImpl.mu.Lock() + defer poolImpl.mu.Unlock() + for txnIDCacheWriteBuffer := range poolImpl.mu.activeWriters { + txnIDCacheWriteBuffer.(*ConcurrentWriteBuffer).flushForTest() + } +} + +func (t *Cache) sizeLimit() int64 { + return MaxSize.Get(&t.st.SV) +} + +func hashTxnID(txnID uuid.UUID) int { + b := txnID.GetBytes() + b, val1, err := encoding.DecodeUint64Descending(b) + if err != nil { + panic(err) + } + _, val2, err := encoding.DecodeUint64Descending(b) + if err != nil { + panic(err) + } + return int((val1 ^ val2) % shardCount) +} diff --git a/pkg/sql/contention/txnidcache/txn_id_cache_test.go b/pkg/sql/contention/txnidcache/txn_id_cache_test.go new file mode 100644 index 000000000000..af1c6c05ffd6 --- /dev/null +++ b/pkg/sql/contention/txnidcache/txn_id_cache_test.go @@ -0,0 +1,189 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestTransactionIDCache tests the correctness of the txnidcache.Cache. +func TestTransactionIDCache(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + + appName := "txnIDCacheTest" + expectedTxnIDToUUIDMapping := make(map[uuid.UUID]roachpb.TransactionFingerprintID) + + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + BeforeTxnStatsRecorded: func( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ) { + if strings.Contains(sessionData.ApplicationName, appName) { + expectedTxnIDToUUIDMapping[txnID] = txnFingerprintID + } + }, + } + + testServer, sqlConn, kvDB := serverutils.StartServer(t, params) + defer func() { + require.NoError(t, sqlConn.Close()) + testServer.Stopper().Stop(ctx) + }() + + testConn := sqlutils.MakeSQLRunner(sqlConn) + + // Set the cache size limit to a very generous amount to prevent premature + // eviction. + testConn.Exec(t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '1GB'") + + testConn.Exec(t, "CREATE DATABASE txnIDTest") + testConn.Exec(t, "USE txnIDTest") + testConn.Exec(t, "CREATE TABLE t AS SELECT generate_series(1, 10)") + testConn.Exec(t, "SET application_name = $1", appName) + + testCases := []struct { + stmts []string + explicit bool + }{ + // Implicit transactions that will have same statement fingerprint. + { + stmts: []string{"SELECT 1"}, + explicit: false, + }, + { + stmts: []string{"SELECT 2"}, + explicit: false, + }, + + // Implicit transaction that have different statement fingerprints. + { + stmts: []string{"SELECT 1, 1"}, + explicit: false, + }, + { + stmts: []string{"SELECT 1, 1, 2"}, + explicit: false, + }, + + // Explicit Transactions. + { + stmts: []string{"SELECT 1"}, + explicit: true, + }, + { + stmts: []string{"SELECT 5"}, + explicit: true, + }, + { + stmts: []string{"SELECT 5", "SELECT 6, 7"}, + explicit: true, + }, + } + + // Send test statements into both regular SQL connection and internal + // executor to test both code paths. + for _, tc := range testCases { + if tc.explicit { + testConn.Exec(t, "BEGIN") + } + { + for _, stmt := range tc.stmts { + testConn.Exec(t, stmt) + } + } + if tc.explicit { + testConn.Exec(t, "COMMIT") + } + } + + ie := testServer.InternalExecutor().(*sql.InternalExecutor) + + for _, tc := range testCases { + // Send statements one by one since internal executor doesn't support + // sending a batch of statements. + for _, stmt := range tc.stmts { + var txn *kv.Txn + if tc.explicit { + txn = kvDB.NewTxn(ctx, "") + } + _, err := ie.QueryRowEx( + ctx, + appName, + txn, + sessiondata.InternalExecutorOverride{ + User: security.RootUserName(), + }, + stmt, + ) + require.NoError(t, err) + if tc.explicit { + require.NoError(t, txn.Commit(ctx)) + } + + require.NoError(t, err) + } + } + + // Ensure we have intercepted all transactions, the expected size is + // calculated as: + // # stmt executed in regular executor + // + # stmt executed in internal executor + // + 1 (`SET application_name` executed previously in regular SQL Conn) + // - 3 (explicit txns executed in internal executor due to + // https://github.com/cockroachdb/cockroach/issues/73091) + expectedTxnIDCacheSize := len(testCases)*2 + 1 - 3 + require.Equal(t, expectedTxnIDCacheSize, len(expectedTxnIDToUUIDMapping)) + + sqlServer := testServer.SQLServer().(*sql.Server) + txnIDCache := sqlServer.GetTxnIDCache() + + txnIDCache.(*txnidcache.Cache).FlushActiveWritersForTest() + testutils.SucceedsWithin(t, func() error { + for txnID, expectedTxnFingerprintID := range expectedTxnIDToUUIDMapping { + actualTxnFingerprintID, ok := txnIDCache.Lookup(txnID) + if !ok { + return errors.Newf("expected to find txn(%s) with fingerprintID: "+ + "%d, but it was not found.", + txnID, expectedTxnFingerprintID, + ) + } + if expectedTxnFingerprintID != actualTxnFingerprintID { + return errors.Newf("expected to find txn(%s) with fingerprintID: %d, but the actual fingerprintID is: %d", txnID, expectedTxnFingerprintID, actualTxnFingerprintID) + } + } + + return nil + }, 3*time.Second) +} diff --git a/pkg/sql/contention/txnidcache/writer_pool.go b/pkg/sql/contention/txnidcache/writer_pool.go new file mode 100644 index 000000000000..606d478b68ea --- /dev/null +++ b/pkg/sql/contention/txnidcache/writer_pool.go @@ -0,0 +1,59 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import "github.com/cockroachdb/cockroach/pkg/util/syncutil" + +type writerPoolImpl struct { + mu struct { + syncutil.Mutex + activeWriters map[Writer]struct{} + } + + new func() Writer +} + +var _ writerPool = &writerPoolImpl{} + +func newWriterPoolImpl(new func() Writer) *writerPoolImpl { + w := &writerPoolImpl{ + new: new, + } + w.mu.activeWriters = make(map[Writer]struct{}) + return w +} + +// GetWriter implements the writerPool interface. +// TODO(azhng): current writerPollImpl is very primitive, it returns a +// new instance of Writer each time GetWriter() is called. This means that +// each connExecutor will have its own copy of Writer. The underlying +// implementation of Writer is ConcurrentWriteBuffer, which is capable of +// efficiently handle concurrent writes. We can improve Writer allocation +// by making multiple connExecutors share a single Writer. +// One possible idea is to pre-allocate a pool of Writers and store them +// in a min-heap sorted by their ref-count. Each time GetWriter() is called, +// we find a Writer from the min-heap with least ref-count and give it back to +// the connExecutor, and increment its ref-count. Once the connExecutor closes, +// the ref-count is decremented. +func (w *writerPoolImpl) GetWriter() Writer { + writeBuffer := w.new() + w.mu.Lock() + w.mu.activeWriters[writeBuffer] = struct{}{} + w.mu.Unlock() + return writeBuffer +} + +// disconnect implements the writerPool interface. +func (w *writerPoolImpl) disconnect(writer Writer) { + w.mu.Lock() + defer w.mu.Unlock() + delete(w.mu.activeWriters, writer) +} diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index 8dabac401c64..7d4afa41506b 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -218,7 +219,7 @@ type copyTxnOpt struct { // resetExecutor should be called upon completing a batch from the copy // machine when the copy machine handles its own transaction. - resetExtraTxnState func(ctx context.Context) error + resetExtraTxnState func(ctx context.Context, txnID uuid.UUID) error } // run consumes all the copy-in data from the network connection and inserts it @@ -630,7 +631,7 @@ func (p *planner) preparePlannerForCopy( defer func() { // Note: combine errors will return nil if both are nil and the // non-nil error in the case that there's just one. - err = errors.CombineErrors(err, txnOpt.resetExtraTxnState(ctx)) + err = errors.CombineErrors(err, txnOpt.resetExtraTxnState(ctx, txn.ID())) }() } if prevErr == nil { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 214c61b10dd2..c47825738393 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1316,6 +1316,14 @@ type ExecutorTestingKnobs struct { // OnTxnRetry, if set, will be called if there is a transaction retry. OnTxnRetry func(autoRetryReason error, evalCtx *tree.EvalContext) + + // BeforeTxnStatsRecorded, if set, will be called before the statistics + // of a transaction is being recorded. + BeforeTxnStatsRecorded func( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ) } // PGWireTestingKnobs contains knobs for the pgwire module. diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index 2b143c263c5f..937a8e512bc8 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -373,6 +373,7 @@ func (s *Server) Metrics() (res []interface{}) { &s.SQLServer.InternalMetrics.EngineMetrics, &s.SQLServer.InternalMetrics.GuardrailMetrics, &s.SQLServer.ServerMetrics.StatsMetrics, + &s.SQLServer.ServerMetrics.ContentionSubsystemMetrics, } } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 10915eacb61d..96c6395ab025 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1860,6 +1860,19 @@ var charts = []sectionDescription{ }, }, }, + { + Organization: [][]string{{SQLLayer, "Contention"}}, + Charts: []chartDescription{ + { + Title: "Number of discarded resolved transaction IDs", + Metrics: []string{"sql.contention.txn_id_cache.discarded_count"}, + }, + { + Title: "Evicted Resolved Transaction IDs", + Metrics: []string{"sql.contention.txn_id_cache.evicted_count"}, + }, + }, + }, { Organization: [][]string{{SQLLayer, "Bulk"}}, Charts: []chartDescription{