Skip to content

Commit

Permalink
Add test to confirm that concurrent updates don't overwrite each other
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed May 1, 2024
1 parent 52f94d0 commit b284b26
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 5 deletions.
7 changes: 6 additions & 1 deletion go/vt/topo/keyspace_routing_rules_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ func createTopoDirIfNeeded(ctx context.Context, ts *Server) error {
topoPath := path.Join(KeyspaceRoutingRulesPath, "lock")
_, _, err := ts.GetGlobalCell().Get(ctx, topoPath)
if IsErrType(err, NoNode) {
log.Infof("Creating keyspace routing rules file %s", topoPath)
_, err = ts.globalCell.Create(ctx, topoPath, []byte("lock file for keyspace routing rules"))
if IsErrType(err, NodeExists) {
// Another process created the file, which is fine.
return nil
}
if err != nil {
log.Errorf("Failed to create keyspace routing rules lock file: %v", err)
} else {
log.Infof("Successfully created keyspace routing rules lock file %s", topoPath)
}
}
return err
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,6 @@ func changeKeyspaceRouting(ctx context.Context, ts *topo.Server, tabletTypes []t

// updateKeyspaceRoutingRule updates the keyspace routing rule for the (effective) source keyspace to the target keyspace.
func updateKeyspaceRoutingRule(ctx context.Context, ts *topo.Server, sourceKeyspace string, routes map[string]string) error {
log.Infof("Updating keyspace routing rules for keyspace %s", sourceKeyspace)
err := topotools.SaveKeyspaceRoutingRulesLocked(ctx, ts, "ApplyKeyspaceRoutingRules",
func(ctx context.Context) error {
rules, err := topotools.GetKeyspaceRoutingRules(ctx, ts)
Expand All @@ -885,8 +884,6 @@ func updateKeyspaceRoutingRule(ctx context.Context, ts *topo.Server, sourceKeysp
})
if err != nil {
log.Errorf("Failed to update keyspace routing rules for keyspace %s: %v", sourceKeyspace, err)
} else {
log.Infof("Successfully updated keyspace routing rules for keyspace %s", sourceKeyspace)
}
return err
}
Expand Down
65 changes: 64 additions & 1 deletion go/vt/vtctl/workflow/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ package workflow

import (
"context"
"fmt"
"math"
"math/rand"
"sync"
"testing"
"time"

"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/topotools"

Expand All @@ -11,16 +18,72 @@ import (
"vitess.io/vitess/go/vt/topo/memorytopo"
)

// TestUpdateKeyspaceRoutingRule confirms that the keyspace routing rules are updated correctly.
func TestUpdateKeyspaceRoutingRule(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "zone1")
defer ts.Close()
routes := make(map[string]string)
routes["from"] = "to"
for _, tabletType := range tabletTypeSuffixes {
routes["from"+tabletType] = "to"
}
err := updateKeyspaceRoutingRule(ctx, ts, "ks", routes)
require.NoError(t, err)
rules, err := topotools.GetKeyspaceRoutingRules(ctx, ts)
require.NoError(t, err)
require.EqualValues(t, routes, rules)
}

// TestConcurrentKeyspaceRoutingRulesUpdates runs multiple keyspace routing rules updates concurrently to test
// the locking mechanism.
func TestConcurrentKeyspaceRoutingRulesUpdates(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "zone1")
defer ts.Close()

concurrency := 100
duration := 3 * time.Second

var wg sync.WaitGroup
wg.Add(concurrency)

stop := make(chan struct{})

for i := 0; i < concurrency; i++ {
go func(id int) {
defer wg.Done()
for {
select {
case <-stop:
return
default:
update(t, ts, id)
}
}
}(i)
}
<-time.After(duration)
close(stop)
wg.Wait()
}

func update(t *testing.T, ts *topo.Server, id int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := fmt.Sprintf("%d_%d", id, rand.Intn(math.MaxInt))
routes := make(map[string]string)
for _, tabletType := range tabletTypeSuffixes {
from := fmt.Sprintf("from%s%s", s, tabletType)
routes[from] = s + tabletType
}
err := updateKeyspaceRoutingRule(ctx, ts, "ks", routes)
require.NoError(t, err)
got, err := topotools.GetKeyspaceRoutingRules(ctx, ts)
require.NoError(t, err)
for _, tabletType := range tabletTypeSuffixes {
from := fmt.Sprintf("from%s%s", s, tabletType)
require.Equal(t, s+tabletType, got[from])
}
}

0 comments on commit b284b26

Please sign in to comment.