Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in replicationLagModule of go/vt/throttle #16078

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions go/vt/throttler/replication_lag_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package throttler

import (
"sort"
"sync"
"time"

"vitess.io/vitess/go/vt/discovery"
Expand All @@ -30,6 +31,8 @@ type replicationLagCache struct {
// The map key is replicationLagRecord.LegacyTabletStats.Key.
entries map[string]*replicationLagHistory

mu sync.Mutex
mattlord marked this conversation as resolved.
Show resolved Hide resolved

// slowReplicas is a set of slow replicas.
// The map key is replicationLagRecord.LegacyTabletStats.Key.
// This map will always be recomputed by sortByLag() and must not be modified
Expand Down Expand Up @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache

// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
c.mu.Lock()
defer c.mu.Unlock()

if !r.Serving {
// Tablet is down. Do no longer track it.
delete(c.entries, discovery.TabletToMapKey(r.Tablet))
Expand All @@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) {
entry.add(r)
}

// maxLag returns the maximum replication lag for the entries in cache.
func (c *replicationLagCache) maxLag() (maxLag uint32) {
Copy link
Contributor Author

@timvaillancourt timvaillancourt Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this mostly-unchanged from throttler.go (.MaxLag(...)) instead of holding the mu sync.Mutex lock from a different struct

.MaxLag() now calls this func

c.mu.Lock()
defer c.mu.Unlock()

for key := range c.entries {
if c.isIgnored(key) {
continue
}

entry, ok := c.entries[key]
if !ok {
continue
}
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved

latest := entry.latest()
if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// latest returns the current lag record for the given LegacyTabletStats.Key string.
// A zero record is returned if there is no latest entry.
func (c *replicationLagCache) latest(key string) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord {
// or just after it.
// If there is no such record, a zero record is returned.
func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag
// sortByLag sorts all replicas by their latest replication lag value and
// tablet uid and updates the c.slowReplicas set.
func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) {
c.mu.Lock()
defer c.mu.Unlock()

// Reset the current list of ignored replicas.
c.slowReplicas = make(map[string]bool)

Expand Down Expand Up @@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool {
// this slow replica.
// "key" refers to ReplicationLagRecord.LegacyTabletStats.Key.
func (c *replicationLagCache) ignoreSlowReplica(key string) bool {
c.mu.Lock()
defer c.mu.Unlock()

if len(c.slowReplicas) == 0 {
// No slow replicas at all.
return false
Expand Down
7 changes: 7 additions & 0 deletions go/vt/throttler/replication_lag_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) {

require.True(t, c.slowReplicas[r1Key], "r1 should be tracked as a slow replica")
}

func TestReplicationLagCache_MaxLag(t *testing.T) {
c := newReplicationLagCache(2)
c.add(lagRecord(sinceZero(1*time.Second), r1, 30))
c.add(lagRecord(sinceZero(1*time.Second), r2, 1))
require.Equal(t, uint32(30), c.maxLag())
}
18 changes: 3 additions & 15 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,10 @@ func (t *ThrottlerImpl) Throttle(threadID int) time.Duration {
// the provided type, excluding ignored tablets.
func (t *ThrottlerImpl) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
if cache == nil {
return 0
}

return maxLag
return cache.maxLag()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved into replicationLagCache

}

// ThreadFinished marks threadID as finished and redistributes the thread's
Expand Down
79 changes: 79 additions & 0 deletions go/vt/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
package throttler

import (
"context"
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// The main purpose of the benchmarks below is to demonstrate the functionality
Expand Down Expand Up @@ -402,3 +408,76 @@
}()
throttler.ThreadFinished(0)
}

func TestThrottlerMaxLag(t *testing.T) {
fc := &fakeClock{}
throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now)
require.NoError(t, err)
defer throttler.Close()

require.NotNil(t, throttler)
require.NotNil(t, throttler.maxReplicationLagModule)

Check failure on line 419 in go/vt/throttler/throttler_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

throttler.maxReplicationLagModule undefined (type Throttler has no field or method maxReplicationLagModule)

ctx, cancel := context.WithCancel(context.Background())
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
var wg sync.WaitGroup

// run .add() and .MaxLag() concurrently to detect races
for _, tabletType := range []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
} {
wg.Add(1)
go func(wg *sync.WaitGroup, ctx context.Context, t *Throttler, tabletType topodata.TabletType) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
throttler.MaxLag(tabletType)
}
}
Comment on lines +438 to +445
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a busy loop. potentially running thousands of time in the span of 1sec. Is this intentional? I'm thinking there should be a ticker to introduce some forced delay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach yes, these busy loops were used to reproduce the race in unit tests, before I fixed the race

}(&wg, ctx, throttler, tabletType)

Check failure on line 440 in go/vt/throttler/throttler_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

cannot use throttler (variable of type Throttler) as *Throttler value in argument to func(wg *sync.WaitGroup, ctx context.Context, t *Throttler, tabletType topodata.TabletType) {…}: Throttler does not implement *Throttler (type *Throttler is pointer to interface, not interface)
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved

wg.Add(1)
go func(wg *sync.WaitGroup, ctx context.Context, throttler *Throttler, tabletType topodata.TabletType) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
Comment on lines +451 to +455
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a busy loop. Is this intentional? For the duration of 1sec this loop could run potentially thousands of time as there is no sleep. Is there an objective to this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach yes, the intent here was to call .MaxLag() as frequently as possible, because the race needs decent concurrency to occur

cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)

Check failure on line 450 in go/vt/throttler/throttler_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

throttler.maxReplicationLagModule undefined (type *Throttler is pointer to interface, not interface)
require.NotNil(t, cache)
cache.add(replicationLagRecord{
time: time.Now(),
TabletHealth: discovery.TabletHealth{
Serving: true,
Stats: &query.RealtimeStats{
ReplicationLagSeconds: 5,
},
Tablet: &topodata.Tablet{
Hostname: t.Name(),
Type: tabletType,
PortMap: map[string]int32{
"test": 15999,
},
},
},
})
}
}
}(&wg, ctx, throttler, tabletType)

Check failure on line 470 in go/vt/throttler/throttler_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

cannot use throttler (variable of type Throttler) as *Throttler value in argument to func(wg *sync.WaitGroup, ctx context.Context, throttler *Throttler, tabletType topodata.TabletType) {…}: Throttler does not implement *Throttler (type *Throttler is pointer to interface, not interface) (typecheck)
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
}
time.Sleep(time.Second)
cancel()
wg.Wait()

// check .MaxLag()
for _, tabletType := range []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
} {
require.Equal(t, uint32(5), throttler.MaxLag(tabletType))
}
}
Loading