diff --git a/go/vt/topo/keyspace_routing_rules_lock.go b/go/vt/topo/keyspace_routing_rules_lock.go index fae32016ef0..cd7dda75bd4 100644 --- a/go/vt/topo/keyspace_routing_rules_lock.go +++ b/go/vt/topo/keyspace_routing_rules_lock.go @@ -36,10 +36,15 @@ func createTopoDirIfNeeded(ctx context.Context, ts *Server) error { topoPath := path.Join(KeyspaceRoutingRulesPath, "lock") _, _, err := ts.GetGlobalCell().Get(ctx, topoPath) if IsErrType(err, NoNode) { - log.Infof("Creating keyspace routing rules file %s", topoPath) _, err = ts.globalCell.Create(ctx, topoPath, []byte("lock file for keyspace routing rules")) + if IsErrType(err, NodeExists) { + // Another process created the file, which is fine. + return nil + } if err != nil { log.Errorf("Failed to create keyspace routing rules lock file: %v", err) + } else { + log.Infof("Successfully created keyspace routing rules lock file %s", topoPath) } } return err diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index acd6a1432ce..5df748b807c 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -865,7 +865,6 @@ func changeKeyspaceRouting(ctx context.Context, ts *topo.Server, tabletTypes []t // updateKeyspaceRoutingRule updates the keyspace routing rule for the (effective) source keyspace to the target keyspace. func updateKeyspaceRoutingRule(ctx context.Context, ts *topo.Server, sourceKeyspace string, routes map[string]string) error { - log.Infof("Updating keyspace routing rules for keyspace %s", sourceKeyspace) err := topotools.SaveKeyspaceRoutingRulesLocked(ctx, ts, "ApplyKeyspaceRoutingRules", func(ctx context.Context) error { rules, err := topotools.GetKeyspaceRoutingRules(ctx, ts) @@ -885,8 +884,6 @@ func updateKeyspaceRoutingRule(ctx context.Context, ts *topo.Server, sourceKeysp }) if err != nil { log.Errorf("Failed to update keyspace routing rules for keyspace %s: %v", sourceKeyspace, err) - } else { - log.Infof("Successfully updated keyspace routing rules for keyspace %s", sourceKeyspace) } return err } diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index 6590a0fdad4..e65875a0faa 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -2,7 +2,14 @@ package workflow import ( "context" + "fmt" + "math" + "math/rand" + "sync" "testing" + "time" + + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" @@ -11,16 +18,72 @@ import ( "vitess.io/vitess/go/vt/topo/memorytopo" ) +// TestUpdateKeyspaceRoutingRule confirms that the keyspace routing rules are updated correctly. func TestUpdateKeyspaceRoutingRule(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ts := memorytopo.NewServer(ctx, "zone1") defer ts.Close() routes := make(map[string]string) - routes["from"] = "to" + for _, tabletType := range tabletTypeSuffixes { + routes["from"+tabletType] = "to" + } err := updateKeyspaceRoutingRule(ctx, ts, "ks", routes) require.NoError(t, err) rules, err := topotools.GetKeyspaceRoutingRules(ctx, ts) require.NoError(t, err) require.EqualValues(t, routes, rules) } + +// TestConcurrentKeyspaceRoutingRulesUpdates runs multiple keyspace routing rules updates concurrently to test +// the locking mechanism. +func TestConcurrentKeyspaceRoutingRulesUpdates(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts := memorytopo.NewServer(ctx, "zone1") + defer ts.Close() + + concurrency := 100 + duration := 3 * time.Second + + var wg sync.WaitGroup + wg.Add(concurrency) + + stop := make(chan struct{}) + + for i := 0; i < concurrency; i++ { + go func(id int) { + defer wg.Done() + for { + select { + case <-stop: + return + default: + update(t, ts, id) + } + } + }(i) + } + <-time.After(duration) + close(stop) + wg.Wait() +} + +func update(t *testing.T, ts *topo.Server, id int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s := fmt.Sprintf("%d_%d", id, rand.Intn(math.MaxInt)) + routes := make(map[string]string) + for _, tabletType := range tabletTypeSuffixes { + from := fmt.Sprintf("from%s%s", s, tabletType) + routes[from] = s + tabletType + } + err := updateKeyspaceRoutingRule(ctx, ts, "ks", routes) + require.NoError(t, err) + got, err := topotools.GetKeyspaceRoutingRules(ctx, ts) + require.NoError(t, err) + for _, tabletType := range tabletTypeSuffixes { + from := fmt.Sprintf("from%s%s", s, tabletType) + require.Equal(t, s+tabletType, got[from]) + } +}