Skip to content

Commit

Permalink
sql: introduce Transaction ID Cache
Browse files Browse the repository at this point in the history
Previously, it was impossible to correlate an individual execution
of a transaction (identified via transaction ID) to this historical
execution statistics (identified via transaction fingerprint ID).

This commit introduces Transaction ID Cache (TxnIDCache), a FIFO
cache that stores the mapping from transaction ID to transaction
fingerprint ID. This buffer records the mapping at the
end of the transaction execution. The oldest entry in the buffer
will be evicted through FIFO policy. The default size of this
Transaction ID Cache is capped at 64 MB and it is configurable via
the sql.contention.txn_id_cache.max_size cluster setting.

Release note (sql change): Transaction ID to Transaction Fingerprint
ID mapping is now stored in the new Transaction ID Cache, a FIFO
unordered in-memory buffer. The size of the buffer is 64 MB by default
and configurable via sql.contention.txn_id_cache.max_size cluster
setting. Consequentially, two additioanl metrics are introduced:
* sql.contention.txn_id_cache.size: tracks the current memory usage
  of transaction ID Cache
* sql.contention.txn_id_cache.discarded_count: number of resolved
  transaction IDs that are dropped due to memory constraints.
  • Loading branch information
Azhng committed Dec 23, 2021
1 parent 9e7cf26 commit f280662
Show file tree
Hide file tree
Showing 26 changed files with 1,501 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<tr><td><code>server.web_session.purge.period</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the time until old sessions are deleted</td></tr>
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use</td></tr>
<tr><td><code>sql.cross_db_fks.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_owners.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_references.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ ALL_TESTS = [
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
"//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/covering:covering_test",
"//pkg/sql/distsql:distsql_test",
Expand Down
3 changes: 3 additions & 0 deletions pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func ConstructStatementFingerprintID(
// individual statement fingerprint IDs that comprise the transaction.
type TransactionFingerprintID uint64

// InvalidTransactionFingerprintID denotes an invalid transaction fingerprint ID.
const InvalidTransactionFingerprintID = TransactionFingerprintID(0)

// Size returns the size of the TransactionFingerprintID.
func (t TransactionFingerprintID) Size() int64 {
return 8
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ go_library(
"//pkg/sql/colexec",
"//pkg/sql/colflow",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/distsql",
Expand Down
49 changes: 43 additions & 6 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -64,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"golang.org/x/net/trace"
Expand Down Expand Up @@ -280,6 +282,10 @@ type Server struct {
// node as gateway node.
indexUsageStats *idxusage.LocalIndexUsageStats

// txnIDCache stores the mapping from transaction ID to transaction
//fingerprint IDs for all recently executed.
txnIDCache txnidcache.Provider

// Metrics is used to account normal queries.
Metrics Metrics

Expand Down Expand Up @@ -320,6 +326,10 @@ type Metrics struct {
type ServerMetrics struct {
// StatsMetrics contains metrics for SQL statistics collection.
StatsMetrics StatsMetrics

// ContentionSubsystemMetrics contains metrics related to contention
// subsystem.
ContentionSubsystemMetrics txnidcache.Metrics
}

// NewServer creates a new Server. Start() needs to be called before the Server
Expand Down Expand Up @@ -362,6 +372,9 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
ChannelSize: idxusage.DefaultChannelSize,
Setting: cfg.Settings,
}),
txnIDCache: txnidcache.NewTxnIDCache(
cfg.Settings,
&serverMetrics.ContentionSubsystemMetrics),
}

telemetryLoggingMetrics := &TelemetryLoggingMetrics{}
Expand Down Expand Up @@ -452,6 +465,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval,
),
},
ContentionSubsystemMetrics: txnidcache.NewMetrics(),
}
}

Expand All @@ -463,6 +477,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// accumulated in the reporter when the telemetry server fails.
// Usually it is telemetry's reporter's job to clear the reporting SQL Stats.
s.reportedStats.Start(ctx, stopper)

s.txnIDCache.Start(ctx, stopper)
}

// GetSQLStatsController returns the persistedsqlstats.Controller for current
Expand All @@ -488,6 +504,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller {
return s.reportedStatsController
}

// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server.
func (s *Server) GetTxnIDCache() txnidcache.Provider {
return s.txnIDCache
}

// GetScrubbedStmtStats returns the statement statistics by app, with the
// queries scrubbed of their identifiers. Any statements which cannot be
// scrubbed will be omitted from the returned map.
Expand Down Expand Up @@ -813,6 +834,7 @@ func (s *Server) newConnExecutor(
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache.GetWriter(),
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -983,6 +1005,7 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})

func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")
ex.txnIDCacheWriter.Close()

txnEv := noEvent
if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn {
Expand Down Expand Up @@ -1018,7 +1041,8 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}

if err := ex.resetExtraTxnState(ctx, txnEv); err != nil {
// Since we are closing connExecutor, no need to fetch txnID.
if err := ex.resetExtraTxnState(ctx, txnEv, uuid.UUID{}); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}

Expand Down Expand Up @@ -1400,6 +1424,10 @@ type connExecutor struct {

// indexUsageStats is used to track index usage stats.
indexUsageStats *idxusage.LocalIndexUsageStats

// txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the
// Transaction ID Cache.
txnIDCacheWriter txnidcache.Writer
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down Expand Up @@ -1516,7 +1544,9 @@ func (ns *prepStmtNamespace) resetTo(

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// commits, rolls back or restarts.
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error {
func (ex *connExecutor) resetExtraTxnState(
ctx context.Context, ev txnEvent, txnID uuid.UUID,
) error {
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
ex.extraTxnState.schemaChangerState = SchemaChangerState{
Expand All @@ -1542,7 +1572,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name)
}
ex.extraTxnState.savepoints.clear()
ex.onTxnFinish(ctx, ev)
ex.onTxnFinish(ctx, ev, txnID)
case txnRestart:
ex.onTxnRestart()
ex.state.mu.Lock()
Expand Down Expand Up @@ -2173,8 +2203,8 @@ func (ex *connExecutor) execCopyIn(
}
} else {
txnOpt = copyTxnOpt{
resetExtraTxnState: func(ctx context.Context) error {
return ex.resetExtraTxnState(ctx, noEvent)
resetExtraTxnState: func(ctx context.Context, txnID uuid.UUID) error {
return ex.resetExtraTxnState(ctx, noEvent, txnID)
},
}
}
Expand Down Expand Up @@ -2598,6 +2628,13 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
txnIsOpen = true
}

ex.state.mu.RLock()
var txnID uuid.UUID
if ex.state.mu.txn != nil {
txnID = ex.state.mu.txn.ID()
}
ex.state.mu.RUnlock()

ex.mu.Lock()
err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload)
ex.mu.Unlock()
Expand Down Expand Up @@ -2718,7 +2755,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(

fallthrough
case txnRestart, txnRollback:
if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent); err != nil {
if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, txnID); err != nil {
return advanceInfo{}, err
}
default:
Expand Down
19 changes: 17 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
Expand All @@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
Expand Down Expand Up @@ -1909,7 +1911,7 @@ func (ex *connExecutor) recordTransactionStart() {
}
}

func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnID uuid.UUID) {
if ex.extraTxnState.shouldExecuteOnTxnFinish {
ex.extraTxnState.shouldExecuteOnTxnFinish = false
txnStart := ex.extraTxnState.txnFinishClosure.txnStartTime
Expand All @@ -1923,7 +1925,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart)
if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil {
ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded(
ex.sessionData(),
txnID,
transactionFingerprintID,
)
}
err := ex.recordTransaction(ctx, txnID, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down Expand Up @@ -1954,6 +1963,7 @@ func (ex *connExecutor) onTxnRestart() {

func (ex *connExecutor) recordTransaction(
ctx context.Context,
transactionID uuid.UUID,
transactionFingerprintID roachpb.TransactionFingerprintID,
ev txnEvent,
implicit bool,
Expand All @@ -1973,6 +1983,11 @@ func (ex *connExecutor) recordTransaction(
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
TxnID: transactionID,
TxnFingerprintID: transactionFingerprintID,
})

txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency()
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
commitLat := ex.phaseTimes.GetCommitLatency()
Expand Down
60 changes: 60 additions & 0 deletions pkg/sql/contention/txnidcache/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "txnidcache",
srcs = [
"cluster_settings.go",
"concurrent_write_buffer.go",
"metrics.go",
"provider.go",
"shard.go",
"sharded_store.go",
"strip.go",
"txn_id_cache.go",
"writer_pool.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/encoding",
"//pkg/util/metric",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/uuid",
],
)

go_test(
name = "txnidcache_test",
srcs = [
"main_test.go",
"shard_test.go",
"store_test.go",
"strip_test.go",
"txn_id_cache_test.go",
],
embed = [":txnidcache"],
deps = [
"//pkg/kv",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/sessiondata",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
20 changes: 20 additions & 0 deletions pkg/sql/contention/txnidcache/cluster_settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package txnidcache

import "github.com/cockroachdb/cockroach/pkg/settings"

// MaxSize limits the maximum byte size can be used by the TxnIDCache.
var MaxSize = settings.RegisterByteSizeSetting(
`sql.contention.txn_id_cache.max_size`,
"the maximum byte size TxnID cache will use",
64*1024*1024, // 64 MB
).WithPublic()
Loading

0 comments on commit f280662

Please sign in to comment.