Skip to content

Commit

Permalink
sql: implement combined iterator for transaction statistics
Browse files Browse the repository at this point in the history
Follow up to cockroachdb#68675

Release note: None
  • Loading branch information
Azhng committed Aug 17, 2021
1 parent 39d59f1 commit e179dcf
Show file tree
Hide file tree
Showing 5 changed files with 458 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"provider.go",
"stmt_reader.go",
"test_utils.go",
"txn_reader.go",
"writer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats",
Expand Down
179 changes: 179 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/combined_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,182 @@ func compareStmtStats(lhs, rhs *roachpb.CollectedStatementStatistics) int {

return 0
}

// CombinedTxnStatsIterator is an iterator that iterates through both
// in-memory and persisted txn stats provided by the in-memory iterator and
// the on-disk iterator.
//
// On each Next() call, the entries currently under the two underlying
// iterators are ordered using compareTxnStats, and entries that compare equal
// have their stats merged into a single entry.
type CombinedTxnStatsIterator struct {
// nextToReadKey and nextToReadVal hold the entry selected by the most recent
// Next() call. They are the values returned by Cur().
nextToReadKey roachpb.TransactionFingerprintID
nextToReadVal *roachpb.CollectedTransactionStatistics
// expectedColCnt is compared in Next() against the length of rows read from
// the disk iterator.
expectedColCnt int

// mem holds the state of the in-memory iterator.
mem struct {
// canBeAdvanced stores the result of the most recent it.Next() call. It
// is initialized to true by NewCombinedTxnStatsIterator.
canBeAdvanced bool
// paused, when true, makes the following Next() call skip advancing this
// iterator, so that its current entry is considered again.
paused bool
it *memTxnStatsIterator
}

// disk holds the state of the on-disk iterator.
disk struct {
// canBeAdvanced stores the result of the most recent it.Next() call. It
// is initialized to true by NewCombinedTxnStatsIterator.
canBeAdvanced bool
// paused, when true, makes the following Next() call skip advancing this
// iterator, so that its current row is considered again.
paused bool
it sqlutil.InternalRows
}
}

// NewCombinedTxnStatsIterator builds a CombinedTxnStatsIterator on top of the
// given in-memory and on-disk iterators. Both underlying iterators start out
// marked as advanceable and unpaused. expectedColCnt is the column count that
// Next() checks rows read from diskIter against.
func NewCombinedTxnStatsIterator(
	memIter *memTxnStatsIterator, diskIter sqlutil.InternalRows, expectedColCnt int,
) *CombinedTxnStatsIterator {
	combined := new(CombinedTxnStatsIterator)
	combined.expectedColCnt = expectedColCnt

	combined.mem.it, combined.mem.canBeAdvanced = memIter, true
	combined.disk.it, combined.disk.canBeAdvanced = diskIter, true

	return combined
}

// Next advances the CombinedTxnStatsIterator to the next entry of the merged
// in-memory and on-disk transaction statistics. It returns true if the
// following Cur() call will be valid, false otherwise. A non-nil error is
// returned if the disk iterator fails, if a disk row is nil or has an
// unexpected number of columns, or if a disk row cannot be decoded.
func (c *CombinedTxnStatsIterator) Next(ctx context.Context) (bool, error) {
	var err error

	// Advance each underlying iterator unless it was paused by the previous
	// call, in which case its current entry has not been consumed yet.
	if c.mem.canBeAdvanced && !c.mem.paused {
		c.mem.canBeAdvanced = c.mem.it.Next()
	}

	if c.disk.canBeAdvanced && !c.disk.paused {
		c.disk.canBeAdvanced, err = c.disk.it.Next(ctx)
		if err != nil {
			return false, err
		}
	}

	// Both iterators are exhausted, no new value can be produced.
	if !c.mem.canBeAdvanced && !c.disk.canBeAdvanced {
		// Sanity check.
		if c.mem.paused || c.disk.paused {
			return false, errors.AssertionFailedf("bug: leaked iterator")
		}
		return false, nil
	}

	// If memIter is exhausted, but disk iterator can still move forward.
	// We promote the disk.Cur() and resume the disk iterator if it was paused.
	if !c.mem.canBeAdvanced {
		diskKey, diskVal, err := c.curDiskTxnStats()
		if err != nil {
			return false, err
		}
		c.nextToReadKey, c.nextToReadVal = diskKey, diskVal

		// The disk iterator is known to be advanceable here, since the
		// "both exhausted" case returned above.
		c.disk.paused = false
		return true, nil
	}

	// If diskIter is exhausted, but mem iterator can still move forward.
	// We promote the mem.Cur() and resume the mem iterator if it was paused.
	if !c.disk.canBeAdvanced {
		c.nextToReadKey, c.nextToReadVal = c.mem.it.Cur()
		c.mem.paused = false
		return true, nil
	}

	// Both iterators can be moved forward. Now we check the value of Cur()
	// for both iterators. We will have a few scenarios:
	// 1. mem.Cur() < disk.Cur():
	//    we promote mem.Cur() to c.nextToRead. We then pause
	//    the disk iterator and resume the mem iterator for next iteration.
	// 2. mem.Cur() == disk.Cur():
	//    we promote both mem.Cur() and disk.Cur() by merging both
	//    stats. We resume both iterators for next iteration.
	// 3. mem.Cur() > disk.Cur():
	//    we promote disk.Cur() to c.nextToRead. We then pause
	//    mem iterator and resume disk iterator for next iteration.
	memCurKey, memCurVal := c.mem.it.Cur()
	// The disk row is validated here as well, so that a nil or malformed row
	// surfaces as an error instead of being handed to rowToTxnStats unchecked.
	diskCurKey, diskCurVal, err := c.curDiskTxnStats()
	if err != nil {
		return false, err
	}

	switch compareTxnStats(memCurKey, diskCurKey, memCurVal, diskCurVal) {
	case -1:
		// First Case.
		c.nextToReadKey = memCurKey
		c.nextToReadVal = memCurVal
		c.mem.paused = false
		c.disk.paused = true
	case 0:
		// Second Case.
		c.nextToReadKey = memCurKey
		c.nextToReadVal = memCurVal
		c.nextToReadVal.Stats.Add(&diskCurVal.Stats)
		c.mem.paused = false
		c.disk.paused = false
	case 1:
		// Third Case.
		c.nextToReadKey = diskCurKey
		c.nextToReadVal = diskCurVal
		c.mem.paused = true
		c.disk.paused = false
	default:
		return false, errors.AssertionFailedf("bug: impossible state")
	}

	return true, nil
}

// curDiskTxnStats validates the row currently under the disk iterator (it must
// be non-nil and have c.expectedColCnt columns) and decodes it using
// rowToTxnStats.
func (c *CombinedTxnStatsIterator) curDiskTxnStats() (
	roachpb.TransactionFingerprintID, *roachpb.CollectedTransactionStatistics, error,
) {
	var noKey roachpb.TransactionFingerprintID

	row := c.disk.it.Cur()
	if row == nil {
		return noKey, nil, errors.New("unexpected nil row")
	}

	if len(row) != c.expectedColCnt {
		return noKey, nil, errors.AssertionFailedf("unexpectedly received %d columns", len(row))
	}

	return rowToTxnStats(row)
}

// Cur returns the transaction fingerprint ID and the
// roachpb.CollectedTransactionStatistics selected by the most recent Next()
// call.
func (c *CombinedTxnStatsIterator) Cur() (
	roachpb.TransactionFingerprintID,
	*roachpb.CollectedTransactionStatistics,
) {
	key, val := c.nextToReadKey, c.nextToReadVal
	return key, val
}

// compareTxnStats orders two transaction statistics entries. It returns -1 if
// the lhs entry sorts before the rhs entry, 1 if it sorts after, and 0 if they
// are equal. Entries are ordered by aggregated_ts first, then by app name,
// and finally by transaction fingerprint ID.
func compareTxnStats(
	lhsKey, rhsKey roachpb.TransactionFingerprintID, lhs, rhs *roachpb.CollectedTransactionStatistics,
) int {
	// 1. aggregated_ts.
	switch {
	case lhs.AggregatedTs.Before(rhs.AggregatedTs):
		return -1
	case lhs.AggregatedTs.After(rhs.AggregatedTs):
		return 1
	}

	// 2. app name.
	if appOrder := strings.Compare(lhs.App, rhs.App); appOrder != 0 {
		return appOrder
	}

	// 3. transaction fingerprint ID.
	switch {
	case lhsKey < rhsKey:
		return -1
	case lhsKey > rhsKey:
		return 1
	default:
		return 0
	}
}
30 changes: 30 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/mem_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,33 @@ func (m *memStmtStatsIterator) Cur() *roachpb.CollectedStatementStatistics {
c.AggregatedTs = m.aggregatedTs
return c
}

// memTxnStatsIterator wraps a sslocal.TxnStatsIterator. Since in-memory
// transaction statistics do not have the aggregated_ts field populated,
// memTxnStatsIterator overrides the sslocal.TxnStatsIterator's Cur() method
// to populate the aggregated_ts field on the returned
// roachpb.CollectedTransactionStatistics.
type memTxnStatsIterator struct {
*sslocal.TxnStatsIterator
// aggregatedTs is the timestamp that Cur() writes into the AggregatedTs
// field of every entry it returns.
aggregatedTs time.Time
}

// newMemTxnStatsIterator returns a memTxnStatsIterator that wraps the
// transaction stats iterator obtained from stats using the given options, and
// that stamps aggregatedTS onto every entry returned by its Cur() method.
func newMemTxnStatsIterator(
	stats *sslocal.SQLStats, options *sqlstats.IteratorOptions, aggregatedTS time.Time,
) *memTxnStatsIterator {
	it := &memTxnStatsIterator{aggregatedTs: aggregatedTS}
	it.TxnStatsIterator = stats.TxnStatsIterator(options)
	return it
}

// Cur returns the entry currently under the wrapped TxnStatsIterator, after
// setting its AggregatedTs field to m.aggregatedTs.
func (m *memTxnStatsIterator) Cur() (
	roachpb.TransactionFingerprintID,
	*roachpb.CollectedTransactionStatistics,
) {
	fingerprintID, txnStats := m.TxnStatsIterator.Cur()
	txnStats.AggregatedTs = m.aggregatedTs
	return fingerprintID, txnStats
}
105 changes: 86 additions & 19 deletions pkg/sql/sqlstats/persistedsqlstats/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,56 @@ func TestPersistedSQLStatsRead(t *testing.T) {
}

foundQueries := make(map[string]struct{})
require.NoError(t, sqlStats.IterateStatementStats(context.Background(), &sqlstats.IteratorOptions{
SortedKey: true,
SortedAppNames: true,
}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
if expectedExecCount, ok := expectedStmtFingerprints[statistics.Key.Query]; ok {
_, ok = foundQueries[statistics.Key.Query]
require.False(t, ok, "should only found one stats entry for %s, but found more than one", statistics.Key.Query)
foundQueries[statistics.Key.Query] = struct{}{}
require.Equal(t, expectedExecCount, statistics.Stats.Count, "query: %s", statistics.Key.Query)
}
return nil
}))
foundTxns := make(map[string]struct{})
stmtFingerprintIDToQueries := make(map[roachpb.StmtFingerprintID]string)

require.NoError(t,
sqlStats.IterateStatementStats(
context.Background(),
&sqlstats.IteratorOptions{
SortedKey: true,
SortedAppNames: true,
},
func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
if expectedExecCount, ok := expectedStmtFingerprints[statistics.Key.Query]; ok {
_, ok = foundQueries[statistics.Key.Query]
require.False(
t,
ok,
"should only found one stats entry for %s, but found more than one", statistics.Key.Query,
)
foundQueries[statistics.Key.Query] = struct{}{}
stmtFingerprintIDToQueries[statistics.ID] = statistics.Key.Query
require.Equal(t, expectedExecCount, statistics.Stats.Count, "query: %s", statistics.Key.Query)
}
return nil
}))

require.NoError(t,
sqlStats.IterateTransactionStats(
context.Background(),
&sqlstats.IteratorOptions{},
func(
ctx context.Context,
id roachpb.TransactionFingerprintID,
statistics *roachpb.CollectedTransactionStatistics,
) error {
if len(statistics.StatementFingerprintIDs) == 1 {
if query, ok := stmtFingerprintIDToQueries[statistics.StatementFingerprintIDs[0]]; ok {
if expectedExecCount, ok := expectedStmtFingerprints[query]; ok {
_, ok = foundTxns[query]
require.False(
t,
ok,
"should only found one txn stats entry for %s, but found more than one", query,
)
foundTxns[query] = struct{}{}
require.Equal(t, expectedExecCount, statistics.Stats.Count)
}
}
}
return nil
}))

for expectedStmtFingerprint := range expectedStmtFingerprints {
_, ok := foundQueries[expectedStmtFingerprint]
Expand All @@ -112,16 +150,45 @@ func verifyStoredStmtFingerprints(
sqlStats *persistedsqlstats.PersistedSQLStats,
) {
foundQueries := make(map[string]struct{})
require.NoError(t, sqlStats.IterateStatementStats(context.Background(), &sqlstats.IteratorOptions{}, func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
if expectedExecCount, ok := expectedStmtFingerprints[statistics.Key.Query]; ok {
foundQueries[statistics.Key.Query] = struct{}{}
require.Equal(t, expectedExecCount, statistics.Stats.Count)
}
return nil
}))
foundTxns := make(map[string]struct{})
stmtFingerprintIDToQueries := make(map[roachpb.StmtFingerprintID]string)
require.NoError(t,
sqlStats.IterateStatementStats(
context.Background(),
&sqlstats.IteratorOptions{},
func(ctx context.Context, statistics *roachpb.CollectedStatementStatistics) error {
if expectedExecCount, ok := expectedStmtFingerprints[statistics.Key.Query]; ok {
foundQueries[statistics.Key.Query] = struct{}{}
stmtFingerprintIDToQueries[statistics.ID] = statistics.Key.Query
require.Equal(t, expectedExecCount, statistics.Stats.Count)
}
return nil
}))

require.NoError(t,
sqlStats.IterateTransactionStats(
context.Background(),
&sqlstats.IteratorOptions{},
func(
ctx context.Context,
id roachpb.TransactionFingerprintID,
statistics *roachpb.CollectedTransactionStatistics,
) error {
if len(statistics.StatementFingerprintIDs) == 1 {
if query, ok := stmtFingerprintIDToQueries[statistics.StatementFingerprintIDs[0]]; ok {
if expectedExecCount, ok := expectedStmtFingerprints[query]; ok {
foundTxns[query] = struct{}{}
require.Equal(t, expectedExecCount, statistics.Stats.Count)
}
}
}
return nil
}))

for expectedStmtFingerprint := range expectedStmtFingerprints {
_, ok := foundQueries[expectedStmtFingerprint]
require.True(t, ok, "expected %s to be returned, but it didn't", expectedStmtFingerprint)
_, ok = foundTxns[expectedStmtFingerprint]
require.True(t, ok, "expected %s to be returned, but it didn't", expectedStmtFingerprint)
}
}
Loading

0 comments on commit e179dcf

Please sign in to comment.