Skip to content

Commit

Permalink
sql: introduce Transaction ID Cache
Browse files Browse the repository at this point in the history
Previously, it was impossible to correlate an individual execution
of a transaction (identified via transaction ID) to its historical
execution statistics (identified via transaction fingerprint ID).

This commit introduces Transaction ID Cache (TxnIDCache), a FIFO
cache that stores the mapping from transaction ID to transaction
fingerprint ID. This buffer records the mapping at the
end of the transaction execution. The oldest entry in the buffer
will be evicted through FIFO policy. The default size of this
Transaction ID Cache is capped at 64 MB and it is configurable via
the sql.contention.txn_id_cache.max_size cluster setting.

Release note (sql change): Transaction ID to Transaction Fingerprint
ID mapping is now stored in the new Transaction ID Cache, a FIFO
unordered in-memory buffer. The size of the buffer is 64 MB by default
and configurable via sql.contention.txn_id_cache.max_size cluster
setting. Consequently, two additional metrics are introduced:
* sql.contention.txn_id_cache.size: tracks the current memory usage
  of transaction ID Cache
* sql.contention.txn_id_cache.discarded_count: number of resolved
  transaction IDs that are dropped due to memory constraints.
  • Loading branch information
Azhng committed Jan 28, 2022
1 parent 7adf8b7 commit ac5e305
Show file tree
Hide file tree
Showing 22 changed files with 1,137 additions and 95 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<tr><td><code>server.web_session.purge.period</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the time until old sessions are deleted</td></tr>
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use</td></tr>
<tr><td><code>sql.cross_db_fks.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_owners.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_references.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ ALL_TESTS = [
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
"//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/covering:covering_test",
"//pkg/sql/distsql:distsql_test",
Expand Down
3 changes: 3 additions & 0 deletions pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func ConstructStatementFingerprintID(
// individual statement fingerprint IDs that comprise the transaction.
type TransactionFingerprintID uint64

// InvalidTransactionFingerprintID denotes an invalid transaction fingerprint ID.
const InvalidTransactionFingerprintID = TransactionFingerprintID(0)

// Size returns the size of the TransactionFingerprintID.
func (t TransactionFingerprintID) Size() int64 {
return 8
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ go_library(
"//pkg/sql/colflow",
"//pkg/sql/commenter",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/distsql",
Expand Down
47 changes: 37 additions & 10 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -279,6 +280,10 @@ type Server struct {
// node as gateway node.
indexUsageStats *idxusage.LocalIndexUsageStats

// txnIDCache stores the mapping from transaction ID to transaction
// fingerprint IDs for all recently executed transactions.
txnIDCache *txnidcache.Cache

// Metrics is used to account normal queries.
Metrics Metrics

Expand Down Expand Up @@ -319,6 +324,10 @@ type Metrics struct {
type ServerMetrics struct {
// StatsMetrics contains metrics for SQL statistics collection.
StatsMetrics StatsMetrics

// ContentionSubsystemMetrics contains metrics related to the contention
// subsystem. Its type is txnidcache.Metrics, i.e. the metrics of the
// Transaction ID Cache.
ContentionSubsystemMetrics txnidcache.Metrics
}

// NewServer creates a new Server. Start() needs to be called before the Server
Expand Down Expand Up @@ -361,6 +370,9 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
ChannelSize: idxusage.DefaultChannelSize,
Setting: cfg.Settings,
}),
txnIDCache: txnidcache.NewTxnIDCache(
cfg.Settings,
&serverMetrics.ContentionSubsystemMetrics),
}

telemetryLoggingMetrics := &TelemetryLoggingMetrics{}
Expand Down Expand Up @@ -451,6 +463,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval,
),
},
ContentionSubsystemMetrics: txnidcache.NewMetrics(),
}
}

Expand All @@ -462,6 +475,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// accumulated in the reporter when the telemetry server fails.
// Usually it is telemetry's reporter's job to clear the reporting SQL Stats.
s.reportedStats.Start(ctx, stopper)

s.txnIDCache.Start(ctx, stopper)
}

// GetSQLStatsController returns the persistedsqlstats.Controller for current
Expand All @@ -487,6 +502,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller {
return s.reportedStatsController
}

// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server.
// The cache stores the mapping from transaction ID to transaction
// fingerprint ID for recently executed transactions.
func (s *Server) GetTxnIDCache() *txnidcache.Cache {
return s.txnIDCache
}

// GetScrubbedStmtStats returns the statement statistics by app, with the
// queries scrubbed of their identifiers. Any statements which cannot be
// scrubbed will be omitted from the returned map.
Expand Down Expand Up @@ -814,6 +834,7 @@ func (s *Server) newConnExecutor(
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -985,9 +1006,9 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})
func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")

txnEv := noEvent
txnEvType := noEvent
if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn {
txnEv = txnRollback
txnEvType = txnRollback
}

if closeType == normalClose {
Expand Down Expand Up @@ -1019,7 +1040,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}

if err := ex.resetExtraTxnState(ctx, txnEv); err != nil {
if err := ex.resetExtraTxnState(ctx, txnEvent{eventTyp: txnEvType}); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}

Expand Down Expand Up @@ -1401,6 +1422,10 @@ type connExecutor struct {

// indexUsageStats is used to track index usage stats.
indexUsageStats *idxusage.LocalIndexUsageStats

// txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the
// Transaction ID Cache.
txnIDCacheWriter txnidcache.Writer
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down Expand Up @@ -1516,7 +1541,9 @@ func (ns *prepStmtNamespace) resetTo(
}

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// commits, rolls back or restarts.
// finishes execution (either commits, rolls back or restarts). Based on the
// transaction event, resetExtraTxnState invokes corresponding callbacks
// (e.g. onTxnFinish() and onTxnRestart()).
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error {
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
Expand All @@ -1536,7 +1563,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

switch ev {
switch ev.eventTyp {
case txnCommit, txnRollback:
for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
Expand Down Expand Up @@ -2046,7 +2073,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
if _, ok := ex.machine.CurState().(stateOpen); !ok {
return nil
}
if advInfo.txnEvent == txnStart || advInfo.txnEvent == txnRestart {
if advInfo.txnEvent.eventTyp == txnStart || advInfo.txnEvent.eventTyp == txnRestart {
var nextPos CmdPos
switch advInfo.code {
case stayInPlace:
Expand Down Expand Up @@ -2228,7 +2255,7 @@ func (ex *connExecutor) execCopyIn(
} else {
txnOpt = copyTxnOpt{
resetExtraTxnState: func(ctx context.Context) error {
return ex.resetExtraTxnState(ctx, noEvent)
return ex.resetExtraTxnState(ctx, txnEvent{eventTyp: noEvent})
},
}
}
Expand Down Expand Up @@ -2680,7 +2707,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

// Handle transaction events which cause updates to txnState.
switch advInfo.txnEvent {
switch advInfo.txnEvent.eventTyp {
case noEvent:
_, nextStateIsAborted := ex.machine.CurState().(stateAborted)
// Update the deadline on the transaction based on the collections,
Expand All @@ -2696,7 +2723,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
case txnStart:
ex.extraTxnState.autoRetryCounter = 0
ex.extraTxnState.autoRetryReason = nil
ex.recordTransactionStart()
ex.recordTransactionStart(advInfo.txnEvent.txnID)
// Bump the txn counter for logging.
ex.extraTxnState.txnCounter++
if !ex.server.cfg.Codec.ForSystemTenant() {
Expand All @@ -2719,7 +2746,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
err := errorutil.UnexpectedWithIssueErrorf(
26687,
"programming error: non-error event %s generated even though res.Err() has been set to: %s",
errors.Safe(advInfo.txnEvent.String()),
errors.Safe(advInfo.txnEvent.eventTyp.String()),
res.Err())
log.Errorf(ex.Ctx(), "%v", err)
errorutil.SendReport(ex.Ctx(), &ex.server.cfg.Settings.SV, err)
Expand Down
25 changes: 23 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
Expand All @@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
Expand Down Expand Up @@ -1881,7 +1883,14 @@ func payloadHasError(payload fsm.EventPayload) bool {
}

// recordTransactionStart records the start of the transaction.
func (ex *connExecutor) recordTransactionStart() {
func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
// Transaction fingerprint ID will be available once transaction finishes
// execution.
ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
TxnID: txnID,
TxnFingerprintID: roachpb.InvalidTransactionFingerprintID,
})

ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
ex.state.mu.RUnlock()
Expand Down Expand Up @@ -1934,6 +1943,13 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil {
ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded(
ex.sessionData(),
ev.txnID,
transactionFingerprintID,
)
}
err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
Expand Down Expand Up @@ -1991,13 +2007,18 @@ func (ex *connExecutor) recordTransaction(
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
TxnID: ev.txnID,
TxnFingerprintID: transactionFingerprintID,
})

txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency()
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
commitLat := ex.phaseTimes.GetCommitLatency()

recordedTxnStats := sqlstats.RecordedTxnStats{
TransactionTimeSec: txnTime.Seconds(),
Committed: ev == txnCommit,
Committed: ev.eventTyp == txnCommit,
ImplicitTxn: implicit,
RetryCount: int64(ex.extraTxnState.autoRetryCounter),
StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs,
Expand Down
Loading

0 comments on commit ac5e305

Please sign in to comment.