Skip to content

Commit

Permalink
sql: introduce Transaction ID Cache
Browse files Browse the repository at this point in the history
Previously, it was impossible to correlate an individual execution
of a transaction (identified via transaction ID) to this historical
execution statistics (identified via transaction fingerprint ID).

This commit introduces Transaction ID Cache (TxnIDCache), a FIFO
cache that stores the mapping from transaction ID to transaction
fingerprint ID. This buffer records the mapping at the
end of the transaction execution. The oldest entry in the buffer
will be evicted through FIFO policy. The default size of this
Transaction ID Cache is capped at 64 MB and it is configurable via
the sql.contention.txn_id_cache.max_size cluster setting.

Release note (sql change): Transaction ID to Transaction Fingerprint
ID mapping is now stored in the new Transaction ID Cache, a FIFO
unordered in-memory buffer. The size of the buffer is 64 MB by default
and configurable via sql.contention.txn_id_cache.max_size cluster
setting. Consequentially, two additioanl metrics are introduced:
* sql.contention.txn_id_cache.size: tracks the current memory usage
  of transaction ID Cache
* sql.contention.txn_id_cache.discarded_count: number of resolved
  transaction IDs that are dropped due to memory constraints.
  • Loading branch information
Azhng committed Dec 21, 2021
1 parent 9e7cf26 commit 7d09bcb
Show file tree
Hide file tree
Showing 21 changed files with 1,051 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
<tr><td><code>server.web_session.purge.period</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the time until old sessions are deleted</td></tr>
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use</td></tr>
<tr><td><code>sql.cross_db_fks.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_owners.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_references.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ ALL_TESTS = [
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
"//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/covering:covering_test",
"//pkg/sql/distsql:distsql_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ go_library(
"//pkg/sql/colexec",
"//pkg/sql/colflow",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/distsql",
Expand Down
47 changes: 41 additions & 6 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -64,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"golang.org/x/net/trace"
Expand Down Expand Up @@ -280,6 +282,10 @@ type Server struct {
// node as gateway node.
indexUsageStats *idxusage.LocalIndexUsageStats

// txnIDCache stores the mapping from transaction ID to transaction
//fingerprint IDs for all recently executed.
txnIDCache txnidcache.Provider

// Metrics is used to account normal queries.
Metrics Metrics

Expand Down Expand Up @@ -320,6 +326,10 @@ type Metrics struct {
type ServerMetrics struct {
// StatsMetrics contains metrics for SQL statistics collection.
StatsMetrics StatsMetrics

// ContentionSubsystemMetrics contains metrics related to contention
// subsystem.
ContentionSubsystemMetrics txnidcache.Metrics
}

// NewServer creates a new Server. Start() needs to be called before the Server
Expand Down Expand Up @@ -362,6 +372,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
ChannelSize: idxusage.DefaultChannelSize,
Setting: cfg.Settings,
}),
txnIDCache: txnidcache.NewTxnIDCache(cfg.Settings, &serverMetrics.ContentionSubsystemMetrics),
}

telemetryLoggingMetrics := &TelemetryLoggingMetrics{}
Expand Down Expand Up @@ -452,6 +463,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval,
),
},
ContentionSubsystemMetrics: txnidcache.NewMetrics(),
}
}

Expand All @@ -463,6 +475,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// accumulated in the reporter when the telemetry server fails.
// Usually it is telemetry's reporter's job to clear the reporting SQL Stats.
s.reportedStats.Start(ctx, stopper)

s.txnIDCache.Start(ctx, stopper)
}

// GetSQLStatsController returns the persistedsqlstats.Controller for current
Expand All @@ -488,6 +502,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller {
return s.reportedStatsController
}

// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server.
func (s *Server) GetTxnIDCache() txnidcache.Provider {
return s.txnIDCache
}

// GetScrubbedStmtStats returns the statement statistics by app, with the
// queries scrubbed of their identifiers. Any statements which cannot be
// scrubbed will be omitted from the returned map.
Expand Down Expand Up @@ -813,6 +832,7 @@ func (s *Server) newConnExecutor(
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache.GetWriter(),
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -983,6 +1003,7 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})

func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")
ex.txnIDCacheWriter.Close()

txnEv := noEvent
if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn {
Expand Down Expand Up @@ -1018,7 +1039,8 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}

if err := ex.resetExtraTxnState(ctx, txnEv); err != nil {
// Since we are closing connExecutor, no need to fetch txnID.
if err := ex.resetExtraTxnState(ctx, txnEv, uuid.UUID{}); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}

Expand Down Expand Up @@ -1400,6 +1422,9 @@ type connExecutor struct {

// indexUsageStats is used to track index usage stats.
indexUsageStats *idxusage.LocalIndexUsageStats

// TODO(azhng): wip
txnIDCacheWriter txnidcache.Writer
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down Expand Up @@ -1516,7 +1541,9 @@ func (ns *prepStmtNamespace) resetTo(

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// commits, rolls back or restarts.
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error {
func (ex *connExecutor) resetExtraTxnState(
ctx context.Context, ev txnEvent, txnID uuid.UUID,
) error {
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
ex.extraTxnState.schemaChangerState = SchemaChangerState{
Expand All @@ -1542,7 +1569,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name)
}
ex.extraTxnState.savepoints.clear()
ex.onTxnFinish(ctx, ev)
ex.onTxnFinish(ctx, ev, txnID)
case txnRestart:
ex.onTxnRestart()
ex.state.mu.Lock()
Expand Down Expand Up @@ -2173,8 +2200,8 @@ func (ex *connExecutor) execCopyIn(
}
} else {
txnOpt = copyTxnOpt{
resetExtraTxnState: func(ctx context.Context) error {
return ex.resetExtraTxnState(ctx, noEvent)
resetExtraTxnState: func(ctx context.Context, txnID uuid.UUID) error {
return ex.resetExtraTxnState(ctx, noEvent, txnID)
},
}
}
Expand Down Expand Up @@ -2598,6 +2625,14 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
txnIsOpen = true
}

// TODO(azhng): wip: should we be smarter about this to reduce locking?
ex.state.mu.RLock()
var txnID uuid.UUID
if ex.state.mu.txn != nil {
txnID = ex.state.mu.txn.ID()
}
ex.state.mu.RUnlock()

ex.mu.Lock()
err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload)
ex.mu.Unlock()
Expand Down Expand Up @@ -2718,7 +2753,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(

fallthrough
case txnRestart, txnRollback:
if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent); err != nil {
if err := ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, txnID); err != nil {
return advanceInfo{}, err
}
default:
Expand Down
21 changes: 18 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
Expand All @@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
Expand Down Expand Up @@ -1909,7 +1911,7 @@ func (ex *connExecutor) recordTransactionStart() {
}
}

func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnID uuid.UUID) {
if ex.extraTxnState.shouldExecuteOnTxnFinish {
ex.extraTxnState.shouldExecuteOnTxnFinish = false
txnStart := ex.extraTxnState.txnFinishClosure.txnStartTime
Expand All @@ -1923,7 +1925,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart)
if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil {
ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded(
ex.sessionData(),
txnID,
transactionFingerprintID,
)
}
err := ex.recordTransaction(ctx, txnID, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down Expand Up @@ -1954,14 +1963,15 @@ func (ex *connExecutor) onTxnRestart() {

func (ex *connExecutor) recordTransaction(
ctx context.Context,
transactionID uuid.UUID,
transactionFingerprintID roachpb.TransactionFingerprintID,
ev txnEvent,
implicit bool,
txnStart time.Time,
) error {
recordingStart := timeutil.Now()
defer func() {
recordingOverhead := timeutil.Since(recordingStart)
recordingOverhead := timeutil.Now().Sub(recordingStart)
ex.server.
ServerMetrics.
StatsMetrics.
Expand All @@ -1973,6 +1983,11 @@ func (ex *connExecutor) recordTransaction(
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
TxnID: transactionID,
TxnFingerprintID: transactionFingerprintID,
})

txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency()
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
commitLat := ex.phaseTimes.GetCommitLatency()
Expand Down
56 changes: 56 additions & 0 deletions pkg/sql/contention/txnidcache/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "txnidcache",
srcs = [
"cluster_settings.go",
"metrics.go",
"provider.go",
"sharded_store.go",
"txn_id_cache.go",
"txn_id_cache_shard.go",
"txn_id_cache_write_buffer.go",
"writer_pool.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/cache",
"//pkg/util/encoding",
"//pkg/util/metric",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/uuid",
],
)

go_test(
name = "txnidcache_test",
srcs = [
"main_test.go",
"txn_id_cache_test.go",
],
deps = [
":txnidcache",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/sessiondata",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
20 changes: 20 additions & 0 deletions pkg/sql/contention/txnidcache/cluster_settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package txnidcache

import "github.com/cockroachdb/cockroach/pkg/settings"

// MaxSize limits the maximum byte size can be used by the TxnIDCache.
var MaxSize = settings.RegisterByteSizeSetting(
`sql.contention.txn_id_cache.max_size`,
"the maximum byte size TxnID cache will use",
64*1024*1024, // 64 MB
).WithPublic()
29 changes: 29 additions & 0 deletions pkg/sql/contention/txnidcache/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package txnidcache_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}
Loading

0 comments on commit 7d09bcb

Please sign in to comment.