From b4dead6acb627fcc8a3a00c808334a8c4c7f87c9 Mon Sep 17 00:00:00 2001 From: j82w Date: Thu, 7 Sep 2023 09:57:21 -0400 Subject: [PATCH 1/4] Revert "ssmemstorage: improve lock contention on RecordStatement" This reverts commit a792cd30a864cae879f8f91040b7394304ee3572. --- pkg/sql/sqlstats/sslocal/BUILD.bazel | 8 - pkg/sql/sqlstats/sslocal/sql_stats_test.go | 144 ------------------ .../sqlstats/ssmemstorage/ss_mem_storage.go | 143 +++++++---------- .../sqlstats/ssmemstorage/ss_mem_writer.go | 89 ++++------- 4 files changed, 81 insertions(+), 303 deletions(-) diff --git a/pkg/sql/sqlstats/sslocal/BUILD.bazel b/pkg/sql/sqlstats/sslocal/BUILD.bazel index e2a50fe27050..dc3ee4c4ad76 100644 --- a/pkg/sql/sqlstats/sslocal/BUILD.bazel +++ b/pkg/sql/sqlstats/sslocal/BUILD.bazel @@ -45,7 +45,6 @@ go_test( ":sslocal", "//pkg/base", "//pkg/ccl", - "//pkg/kv/kvpb", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -56,8 +55,6 @@ go_test( "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/appstatspb", - "//pkg/sql/clusterunique", - "//pkg/sql/execstats", "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/sessionphase", @@ -65,20 +62,15 @@ go_test( "//pkg/sql/sqlstats/insights", "//pkg/sql/sqlstats/persistedsqlstats", "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil", - "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", - "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/metric", "//pkg/util/mon", "//pkg/util/timeutil", - "//pkg/util/uint128", - "//pkg/util/uuid", "@com_github_jackc_pgx_v4//:pgx", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index 3385587dcea3..e09b032418f7 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -14,7 +14,6 @@ import ( "context" gosql "database/sql" "encoding/json" - "fmt" "math" "net/url" "sort" @@ -23,7 +22,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -31,8 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" - "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" - "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" @@ -41,19 +37,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/uint128" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/jackc/pgx/v4" "github.com/stretchr/testify/require" ) @@ -510,141 +501,6 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) { } } -func BenchmarkRecordStatement(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - - ctx := context.Background() - - st := cluster.MakeTestingClusterSettings() - monitor := mon.NewUnlimitedMonitor( - context.Background(), "test", mon.MemoryResource, - nil /* curCount */, nil /* maxHist */, math.MaxInt64, st, - ) - - insightsProvider := insights.New(st, insights.NewMetrics()) - sqlStats := sslocal.New( - st, - sqlstats.MaxMemSQLStatsStmtFingerprints, - sqlstats.MaxMemSQLStatsTxnFingerprints, - metric.NewGauge(sql.MetaReportedSQLStatsMemCurBytes), /* curMemoryBytesCount */ - metric.NewHistogram(metric.HistogramOptions{ - Metadata: sql.MetaReportedSQLStatsMemMaxBytes, - Duration: 10 * time.Second, - MaxVal: 19 * 1000, - SigFigs: 3, - BucketConfig: metric.MemoryUsage64MBBuckets, - }), /* maxMemoryBytesHist */ - insightsProvider.Writer, - monitor, - nil, /* reportingSink */ - nil, /* knobs */ - insightsProvider.LatencyInformation(), - ) - - appStats := sqlStats.GetApplicationStats("" /* appName */, false /* internal */) - statsCollector := sslocal.NewStatsCollector( - st, - appStats, - sessionphase.NewTimes(), - nil, /* knobs */ - ) - - txnId := uuid.FastMakeV4() - generateRecord := func() sqlstats.RecordedStmtStats { - return sqlstats.RecordedStmtStats{ - SessionID: clusterunique.ID{Uint128: uint128.Uint128{Hi: 0x1771ca67e66e6b48, Lo: 0x1}}, - StatementID: clusterunique.ID{Uint128: uint128.Uint128{Hi: 0x1771ca67e67262e8, Lo: 0x1}}, - TransactionID: txnId, - AutoRetryCount: 0, - AutoRetryReason: error(nil), - RowsAffected: 1, - IdleLatencySec: 0, - ParseLatencySec: 6.5208e-05, - PlanLatencySec: 0.000187041, - RunLatencySec: 0.500771041, - ServiceLatencySec: 0.501024374, - OverheadLatencySec: 1.0839999999845418e-06, - BytesRead: 30, - RowsRead: 1, - RowsWritten: 1, - Nodes: []int64{1}, - StatementType: 1, - Plan: (*appstatspb.ExplainTreePlanNode)(nil), - PlanGist: "AgHQAQIAAwIAAAcGBQYh0AEAAQ==", - StatementError: error(nil), - IndexRecommendations: []string(nil), - Query: "UPDATE t SET s = '_' WHERE id = '_'", - StartTime: time.Date(2023, time.July, 14, 16, 58, 2, 837542000, time.UTC), - EndTime: time.Date(2023, time.July, 14, 16, 58, 3, 338566374, time.UTC), - FullScan: false, - ExecStats: &execstats.QueryLevelStats{ - NetworkBytesSent: 10, - MaxMemUsage: 40960, - MaxDiskUsage: 197, - KVBytesRead: 30, - KVPairsRead: 1, - KVRowsRead: 1, - KVBatchRequestsIssued: 1, - KVTime: 498771793, - MvccSteps: 1, - MvccStepsInternal: 2, - MvccSeeks: 4, - MvccSeeksInternal: 4, - MvccBlockBytes: 39, - MvccBlockBytesInCache: 25, - MvccKeyBytes: 23, - MvccValueBytes: 250, - MvccPointCount: 6, - MvccPointsCoveredByRangeTombstones: 99, - MvccRangeKeyCount: 9, - MvccRangeKeyContainedPoints: 88, - MvccRangeKeySkippedPoints: 66, - NetworkMessages: 55, - ContentionTime: 498546750, - ContentionEvents: []kvpb.ContentionEvent{{Key: []byte{}, TxnMeta: enginepb.TxnMeta{ID: uuid.FastMakeV4(), Key: []uint8{0xf0, 0x89, 0x12, 0x74, 0x65, 0x73, 0x74, 0x0, 0x1, 0x88}, IsoLevel: 0, Epoch: 0, WriteTimestamp: hlc.Timestamp{WallTime: 1689354396802600000, Logical: 0, Synthetic: false}, MinTimestamp: hlc.Timestamp{WallTime: 1689354396802600000, Logical: 0, Synthetic: false}, Priority: 118164, Sequence: 1, CoordinatorNodeID: 1}, Duration: 498546750}}, - RUEstimate: 9999, - CPUTime: 12345, - SqlInstanceIds: map[base.SQLInstanceID]struct{}{1: {}}, - Regions: []string{"test"}}, - Indexes: []string{"104@1"}, - Database: "defaultdb", - } - } - - parallel := []int{10} - for _, p := range parallel { - name := fmt.Sprintf("RecordStatement: Parallelism %d", p) - b.Run(name, func(b *testing.B) { - b.SetParallelism(p) - recordValue := generateRecord() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, err := statsCollector.RecordStatement( - ctx, - appstatspb.StatementStatisticsKey{ - Query: "select * from T where t.l = 1000", - ImplicitTxn: true, - Vec: true, - FullScan: true, - DistSQL: true, - Database: "testDb", - App: "myTestApp", - PlanHash: 9001, - QuerySummary: "select * from T", - }, - recordValue, - ) - // Adds overhead to benchmark and shows up in profile - if err != nil { - require.NoError(b, err) - } - } - }) - }) - } -} - func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go index 9d6105cbca61..6b3325ce3808 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_storage.go @@ -105,12 +105,6 @@ type Container struct { stmts map[stmtKey]*stmtStats txns map[appstatspb.TransactionFingerprintID]*txnStats - } - - // Use a separate lock to avoid lock contention. Don't block the statement - // stats just to update the sampled plan time. - muPlanCache struct { - syncutil.RWMutex // sampledPlanMetadataCache records when was the last time the plan was // sampled. This data structure uses a subset of stmtKey as the key into @@ -160,7 +154,7 @@ func New( s.mu.stmts = make(map[stmtKey]*stmtStats) s.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats) - s.muPlanCache.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time) + s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time) s.atomic.uniqueStmtFingerprintCount = uniqueStmtFingerprintCount s.atomic.uniqueTxnFingerprintCount = uniqueTxnFingerprintCount @@ -550,18 +544,6 @@ func (s *Container) getStatsForStmt( func (s *Container) getStatsForStmtWithKey( key stmtKey, stmtFingerprintID appstatspb.StmtFingerprintID, createIfNonexistent bool, ) (stats *stmtStats, created, throttled bool) { - // Use the read lock to get the key to avoid contention. - ok := func() (ok bool) { - s.mu.RLock() - defer s.mu.RUnlock() - stats, ok = s.mu.stmts[key] - return ok - }() - if ok || !createIfNonexistent { - return stats, false /* created */, false /* throttled */ - } - - // Key does not exist in map. Take a full lock to add the key. s.mu.Lock() defer s.mu.Unlock() return s.getStatsForStmtWithKeyLocked(key, stmtFingerprintID, createIfNonexistent) @@ -573,32 +555,30 @@ func (s *Container) getStatsForStmtWithKeyLocked( // Retrieve the per-statement statistic object, and create it if it // doesn't exist yet. stats, ok := s.mu.stmts[key] - if ok || !createIfNonexistent { - return stats, false /* created */, false /* throttled */ - } - - // If the uniqueStmtFingerprintCount is nil, then we don't check for - // fingerprint limit. - if s.atomic.uniqueStmtFingerprintCount != nil { - // We check if we have reached the limit of unique fingerprints we can - // store. - limit := s.uniqueStmtFingerprintLimit.Get(&s.st.SV) - incrementedFingerprintCount := - atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, int64(1) /* delts */) - - // Abort if we have exceeded limit of unique statement fingerprints. - if incrementedFingerprintCount > limit { - atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, -int64(1) /* delts */) - return stats, false /* created */, true /* throttled */ + if !ok && createIfNonexistent { + // If the uniqueStmtFingerprintCount is nil, then we don't check for + // fingerprint limit. + if s.atomic.uniqueStmtFingerprintCount != nil { + // We check if we have reached the limit of unique fingerprints we can + // store. + limit := s.uniqueStmtFingerprintLimit.Get(&s.st.SV) + incrementedFingerprintCount := + atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, int64(1) /* delts */) + + // Abort if we have exceeded limit of unique statement fingerprints. + if incrementedFingerprintCount > limit { + atomic.AddInt64(s.atomic.uniqueStmtFingerprintCount, -int64(1) /* delts */) + return stats, false /* created */, true /* throttled */ + } } - } - stats = &stmtStats{} - stats.ID = stmtFingerprintID - s.mu.stmts[key] = stats - - s.setLogicalPlanLastSampled(key.sampledPlanKey, s.getTimeNow()) + stats = &stmtStats{} + stats.ID = stmtFingerprintID + s.mu.stmts[key] = stats + s.mu.sampledPlanMetadataCache[key.sampledPlanKey] = s.getTimeNow() - return stats, true /* created */, false /* throttled */ + return stats, true /* created */, false /* throttled */ + } + return stats, false /* created */, false /* throttled */ } func (s *Container) getStatsForTxnWithKey( @@ -606,20 +586,9 @@ func (s *Container) getStatsForTxnWithKey( stmtFingerprintIDs []appstatspb.StmtFingerprintID, createIfNonexistent bool, ) (stats *txnStats, created, throttled bool) { - // Use the read lock to get the key to avoid contention - ok := func() (ok bool) { - s.mu.RLock() - defer s.mu.RUnlock() - stats, ok = s.mu.txns[key] - return ok - }() - if ok || !createIfNonexistent { - return stats, false /* created */, false /* throttled */ - } - - // Key does not exist in map. Take a full lock to add the key. s.mu.Lock() defer s.mu.Unlock() + return s.getStatsForTxnWithKeyLocked(key, stmtFingerprintIDs, createIfNonexistent) } @@ -631,34 +600,33 @@ func (s *Container) getStatsForTxnWithKeyLocked( // Retrieve the per-transaction statistic object, and create it if it doesn't // exist yet. stats, ok := s.mu.txns[key] - if ok || !createIfNonexistent { - return stats, false /* created */, false /* throttled */ - } - - // If the uniqueTxnFingerprintCount is nil, then we don't check for - // fingerprint limit. - if s.atomic.uniqueTxnFingerprintCount != nil { - limit := s.uniqueTxnFingerprintLimit.Get(&s.st.SV) - incrementedFingerprintCount := - atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(1) /* delts */) - - // If we have exceeded limit of fingerprint count, decrement the counter - // and abort. - if incrementedFingerprintCount > limit { - atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, -int64(1) /* delts */) - return nil /* stats */, false /* created */, true /* throttled */ + if !ok && createIfNonexistent { + // If the uniqueTxnFingerprintCount is nil, then we don't check for + // fingerprint limit. + if s.atomic.uniqueTxnFingerprintCount != nil { + limit := s.uniqueTxnFingerprintLimit.Get(&s.st.SV) + incrementedFingerprintCount := + atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, int64(1) /* delts */) + + // If we have exceeded limit of fingerprint count, decrement the counter + // and abort. + if incrementedFingerprintCount > limit { + atomic.AddInt64(s.atomic.uniqueTxnFingerprintCount, -int64(1) /* delts */) + return nil /* stats */, false /* created */, true /* throttled */ + } } + stats = &txnStats{} + stats.statementFingerprintIDs = stmtFingerprintIDs + s.mu.txns[key] = stats + return stats, true /* created */, false /* throttled */ } - stats = &txnStats{} - stats.statementFingerprintIDs = stmtFingerprintIDs - s.mu.txns[key] = stats - return stats, true /* created */, false /* throttled */ + return stats, false /* created */, false /* throttled */ } // SaveToLog saves the existing statement stats into the info log. func (s *Container) SaveToLog(ctx context.Context, appName string) { - s.mu.RLock() - defer s.mu.RUnlock() + s.mu.Lock() + defer s.mu.Unlock() if len(s.mu.stmts) == 0 { return } @@ -690,10 +658,7 @@ func (s *Container) Clear(ctx context.Context) { // large for the likely future workload. s.mu.stmts = make(map[stmtKey]*stmtStats, len(s.mu.stmts)/2) s.mu.txns = make(map[appstatspb.TransactionFingerprintID]*txnStats, len(s.mu.txns)/2) - - s.muPlanCache.Lock() - defer s.muPlanCache.Unlock() - s.muPlanCache.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.muPlanCache.sampledPlanMetadataCache)/2) + s.mu.sampledPlanMetadataCache = make(map[sampledPlanKey]time.Time, len(s.mu.sampledPlanMetadataCache)/2) } // Free frees the accounted resources from the Container. The Container is @@ -803,8 +768,8 @@ func (s *Container) MergeApplicationTransactionStats( // a lock on a will cause a deadlock. func (s *Container) Add(ctx context.Context, other *Container) (err error) { statMap := func() map[stmtKey]*stmtStats { - other.mu.RLock() - defer other.mu.RUnlock() + other.mu.Lock() + defer other.mu.Unlock() statMap := make(map[stmtKey]*stmtStats) for k, v := range other.mu.stmts { @@ -981,16 +946,16 @@ func (s *transactionCounts) recordTransactionCounts( func (s *Container) getLogicalPlanLastSampled( key sampledPlanKey, ) (lastSampled time.Time, found bool) { - s.muPlanCache.RLock() - defer s.muPlanCache.RUnlock() - lastSampled, found = s.muPlanCache.sampledPlanMetadataCache[key] + s.mu.Lock() + defer s.mu.Unlock() + lastSampled, found = s.mu.sampledPlanMetadataCache[key] return lastSampled, found } func (s *Container) setLogicalPlanLastSampled(key sampledPlanKey, time time.Time) { - s.muPlanCache.Lock() - defer s.muPlanCache.Unlock() - s.muPlanCache.sampledPlanMetadataCache[key] = time + s.mu.Lock() + defer s.mu.Unlock() + s.mu.sampledPlanMetadataCache[key] = time } // shouldSaveLogicalPlanDescription returns whether we should save the sample diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 6e2d00bc53b8..776a65a14e8a 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -103,57 +103,20 @@ func (s *Container) RecordStatement( return stmtFingerprintID, nil } - var errorCode string - if value.StatementError != nil { - errorCode = pgerror.GetPGCode(value.StatementError).String() - } - - s.observeInsightStatement(value, stmtFingerprintID, errorCode) - - var lastErrorCode string - if key.Failed { - lastErrorCode = pgerror.GetPGCode(value.StatementError).String() - } - - // Percentile latencies are only being sampled if the latency was above the - // AnomalyDetectionLatencyThreshold. - // The Insights detector already does a flush when detecting for anomaly latency, - // so there is no need to force a flush when retrieving the data during this step. - latencies := s.latencyInformation.GetPercentileValues(stmtFingerprintID, false) - latencyInfo := appstatspb.LatencyInfo{ - Min: value.ServiceLatencySec, - Max: value.ServiceLatencySec, - P50: latencies.P50, - P90: latencies.P90, - P99: latencies.P99, - } - - // Get the time outside the lock to reduce time in the lock - timeNow := s.getTimeNow() - - // setLogicalPlanLastSampled has its own lock so update it before taking - // the stats lock. - if value.Plan != nil { - s.setLogicalPlanLastSampled(statementKey.sampledPlanKey, timeNow) - } - - planGist := []string{value.PlanGist} - // Collect the per-statement statisticstats. - // Do as much computation before the lock to reduce the amount of time the - // lock is held which reduces lock contention. stats.mu.Lock() defer stats.mu.Unlock() stats.mu.data.Count++ if key.Failed { stats.mu.data.SensitiveInfo.LastErr = value.StatementError.Error() - stats.mu.data.LastErrorCode = lastErrorCode + stats.mu.data.LastErrorCode = pgerror.GetPGCode(value.StatementError).String() } // Only update MostRecentPlanDescription if we sampled a new PlanDescription. if value.Plan != nil { stats.mu.data.SensitiveInfo.MostRecentPlanDescription = *value.Plan - stats.mu.data.SensitiveInfo.MostRecentPlanTimestamp = timeNow + stats.mu.data.SensitiveInfo.MostRecentPlanTimestamp = s.getTimeNow() + s.setLogicalPlanLastSampled(statementKey.sampledPlanKey, stats.mu.data.SensitiveInfo.MostRecentPlanTimestamp) } if value.AutoRetryCount == 0 { stats.mu.data.FirstAttemptCount++ @@ -172,14 +135,27 @@ func (s *Container) RecordStatement( stats.mu.data.BytesRead.Record(stats.mu.data.Count, float64(value.BytesRead)) stats.mu.data.RowsRead.Record(stats.mu.data.Count, float64(value.RowsRead)) stats.mu.data.RowsWritten.Record(stats.mu.data.Count, float64(value.RowsWritten)) - stats.mu.data.LastExecTimestamp = timeNow + stats.mu.data.LastExecTimestamp = s.getTimeNow() stats.mu.data.Nodes = util.CombineUnique(stats.mu.data.Nodes, value.Nodes) if value.ExecStats != nil { stats.mu.data.Regions = util.CombineUnique(stats.mu.data.Regions, value.ExecStats.Regions) } - stats.mu.data.PlanGists = util.CombineUnique(stats.mu.data.PlanGists, planGist) - stats.mu.data.Indexes = util.CombineUnique(stats.mu.data.Indexes, value.Indexes) + stats.mu.data.PlanGists = util.CombineUnique(stats.mu.data.PlanGists, []string{value.PlanGist}) stats.mu.data.IndexRecommendations = value.IndexRecommendations + stats.mu.data.Indexes = util.CombineUnique(stats.mu.data.Indexes, value.Indexes) + + // Percentile latencies are only being sampled if the latency was above the + // AnomalyDetectionLatencyThreshold. + // The Insights detector already does a flush when detecting for anomaly latency, + // so there is no need to force a flush when retrieving the data during this step. + latencies := s.latencyInformation.GetPercentileValues(stmtFingerprintID, false) + latencyInfo := appstatspb.LatencyInfo{ + Min: value.ServiceLatencySec, + Max: value.ServiceLatencySec, + P50: latencies.P50, + P90: latencies.P90, + P99: latencies.P99, + } stats.mu.data.LatencyInfo.Add(latencyInfo) // Note that some fields derived from tracing statements (such as @@ -216,14 +192,6 @@ func (s *Container) RecordStatement( } } - return stats.ID, nil -} - -func (s *Container) observeInsightStatement( - value sqlstats.RecordedStmtStats, - stmtFingerprintID appstatspb.StmtFingerprintID, - errorCode string, -) { var autoRetryReason string if value.AutoRetryReason != nil { autoRetryReason = value.AutoRetryReason.Error() @@ -236,6 +204,11 @@ func (s *Container) observeInsightStatement( cpuSQLNanos = value.ExecStats.CPUTime.Nanoseconds() } + var errorCode string + if value.StatementError != nil { + errorCode = pgerror.GetPGCode(value.StatementError).String() + } + s.insights.ObserveStatement(value.SessionID, &insights.Statement{ ID: value.StatementID, FingerprintID: stmtFingerprintID, @@ -257,6 +230,8 @@ func (s *Container) observeInsightStatement( CPUSQLNanos: cpuSQLNanos, ErrorCode: errorCode, }) + + return stats.ID, nil } // RecordStatementExecStats implements sqlstats.Writer interface. @@ -318,12 +293,7 @@ func (s *Container) RecordTransaction( return ErrFingerprintLimitReached } - // Insights logic is thread safe and does not require stats lock - s.observerInsightTransaction(value, key) - // Collect the per-transaction statistics. - // Do as much computation before the lock to reduce the amount of time the - // lock is held which reduces lock contention. stats.mu.Lock() defer stats.mu.Unlock() @@ -390,12 +360,6 @@ func (s *Container) RecordTransaction( stats.mu.data.ExecStats.MVCCIteratorStats.RangeKeySkippedPoints.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.MvccRangeKeySkippedPoints)) } - return nil -} - -func (s *Container) observerInsightTransaction( - value sqlstats.RecordedTxnStats, key appstatspb.TransactionFingerprintID, -) { var retryReason string if value.AutoRetryReason != nil { retryReason = value.AutoRetryReason.Error() @@ -422,6 +386,7 @@ func (s *Container) observerInsightTransaction( AutoRetryReason: retryReason, CPUSQLNanos: cpuSQLNanos, }) + return nil } func (s *Container) recordTransactionHighLevelStats( From 50d101612ff93c2a5d33b42e257ed2a5c81a52f3 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 7 Sep 2023 20:46:36 +0000 Subject: [PATCH 2/4] kv/bulk: remove error return from SSTBatcher.Reset Release note: none. Epic: none. --- pkg/ccl/backupccl/restore_data_processor.go | 6 ++--- .../stream_ingestion_processor.go | 4 +--- pkg/kv/bulk/buffering_adder.go | 4 +--- pkg/kv/bulk/sst_batcher.go | 22 +++++++++---------- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index b458ed7c4b2f..fa56c1e27e53 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -767,7 +767,7 @@ func reserveRestoreWorkerMemory( // implement a mock SSTBatcher used purely for job progress tracking. type SSTBatcherExecutor interface { AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error - Reset(ctx context.Context) error + Reset(ctx context.Context) Flush(ctx context.Context) error Close(ctx context.Context) GetSummary() kvpb.BulkOpSummary @@ -786,9 +786,7 @@ func (b *sstBatcherNoop) AddMVCCKey(ctx context.Context, key storage.MVCCKey, va } // Reset resets the counter -func (b *sstBatcherNoop) Reset(ctx context.Context) error { - return nil -} +func (b *sstBatcherNoop) Reset(ctx context.Context) {} // Flush noops. func (b *sstBatcherNoop) Flush(ctx context.Context) error { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 094a7c326be0..d78f80a05f79 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -1188,9 +1188,7 @@ func (sip *streamIngestionProcessor) flushBuffer(b flushableBuffer) (*jobspb.Res sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curKVBatch))) sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curRangeKVBatch))) - if err := sip.batcher.Reset(ctx); err != nil { - return b.checkpoint, err - } + sip.batcher.Reset(ctx) releaseBuffer(b.buffer) diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go index 44a419d0f7d1..c51fd3bf80b3 100644 --- a/pkg/kv/bulk/buffering_adder.go +++ b/pkg/kv/bulk/buffering_adder.go @@ -256,9 +256,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { b.curBufSummary.Reset() return nil } - if err := b.sink.Reset(ctx); err != nil { - return err - } + b.sink.Reset(ctx) b.sink.currentStats.BufferFlushes++ var before *bulkpb.IngestionPerformanceStats diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 97ca41e42d02..7d11cf35ece4 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -232,8 +232,8 @@ func MakeSSTBatcher( } b.mu.lastFlush = timeutil.Now() b.mu.tracingSpan = tracing.SpanFromContext(ctx) - err := b.Reset(ctx) - return b, err + b.Reset(ctx) + return b, nil } // MakeStreamSSTBatcher creates a batcher configured to ingest duplicate keys @@ -258,8 +258,8 @@ func MakeStreamSSTBatcher( b.mu.lastFlush = timeutil.Now() b.mu.tracingSpan = tracing.SpanFromContext(ctx) b.SetOnFlush(onFlush) - err := b.Reset(ctx) - return b, err + b.Reset(ctx) + return b, nil } // MakeTestingSSTBatcher creates a batcher for testing, allowing setting options @@ -281,8 +281,8 @@ func MakeTestingSSTBatcher( mem: mem, limiter: sendLimiter, } - err := b.Reset(ctx) - return b, err + b.Reset(ctx) + return b, nil } func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) { @@ -373,7 +373,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value } // Reset clears all state in the batcher and prepares it for reuse. -func (b *SSTBatcher) Reset(ctx context.Context) error { +func (b *SSTBatcher) Reset(ctx context.Context) { b.sstWriter.Close() b.sstFile = &storage.MemObject{} // Create sstables intended for ingestion using the newest format that all @@ -402,8 +402,6 @@ func (b *SSTBatcher) Reset(ctx context.Context) error { if b.mu.totalStats.SendWaitByStore == nil { b.mu.totalStats.SendWaitByStore = make(map[roachpb.StoreID]time.Duration) } - - return nil } const ( @@ -438,7 +436,8 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err if err := b.doFlush(ctx, rangeFlush); err != nil { return err } - return b.Reset(ctx) + b.Reset(ctx) + return nil } if b.sstWriter.DataSize >= ingestFileSize(b.settings) { @@ -462,7 +461,8 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err if err := b.doFlush(ctx, sizeFlush); err != nil { return err } - return b.Reset(ctx) + b.Reset(ctx) + return nil } return nil } From 36b74093a720cd170569f31e0dc71147e5523703 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 7 Sep 2023 20:51:12 +0000 Subject: [PATCH 3/4] streamingest: ensure batcher is always reset Release note: none. Epic: none. --- .../streamingccl/streamingest/stream_ingestion_processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index d78f80a05f79..2e50f6d00d9c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -1155,6 +1155,8 @@ type flushableBuffer struct { func (sip *streamIngestionProcessor) flushBuffer(b flushableBuffer) (*jobspb.ResolvedSpans, error) { ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush") defer sp.Finish() + // Ensure the batcher is always reset, even on early error returns. + defer sip.batcher.Reset(ctx) // First process the point KVs. // @@ -1188,8 +1190,6 @@ func (sip *streamIngestionProcessor) flushBuffer(b flushableBuffer) (*jobspb.Res sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curKVBatch))) sip.metrics.IngestedEvents.Inc(int64(len(b.buffer.curRangeKVBatch))) - sip.batcher.Reset(ctx) - releaseBuffer(b.buffer) return b.checkpoint, nil From eb3059f598ad080ecb100f1951d8189d4b627c7c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 7 Sep 2023 20:52:57 +0000 Subject: [PATCH 4/4] kv/bulk: ensure in-flight requests end on batcher reset Release note: none. Epic: none. --- pkg/kv/bulk/sst_batcher.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 7d11cf35ece4..fc4eba747098 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -374,7 +374,13 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value // Reset clears all state in the batcher and prepares it for reuse. func (b *SSTBatcher) Reset(ctx context.Context) { + if err := b.asyncAddSSTs.Wait(); err != nil { + log.Warningf(ctx, "closing with flushes in-progress encountered an error: %v", err) + } + b.asyncAddSSTs = ctxgroup.Group{} + b.sstWriter.Close() + b.sstFile = &storage.MemObject{} // Create sstables intended for ingestion using the newest format that all // nodes can support. MakeIngestionSSTWriter will handle cluster version