Skip to content

Commit

Permalink
cluster: fix the merging issue of labels after store reboot (#6468) (#…
Browse files Browse the repository at this point in the history
…6470)

ref #5510, close #6467, ref #6468

#5510 introduced a bug that would cause the store labels to be overwritten wrongly after the store reboot.
This PR fixed this issue.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: JmPotato <[email protected]>
  • Loading branch information
ti-chi-bot and JmPotato authored May 15, 2023
1 parent f1b5566 commit dbfbdac
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 14 deletions.
25 changes: 11 additions & 14 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,21 +1254,15 @@ func (c *RaftCluster) GetRangeHoles() [][]string {
}

// UpdateStoreLabels updates a store's location labels
// If 'force' is true, then update the store's labels forcibly.
// If 'force' is true, the origin labels will be overwritten with the new one forcibly.
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel, force bool) error {
store := c.GetStore(storeID)
if store == nil {
return errs.ErrInvalidStoreID.FastGenByArgs(storeID)
}
newStore := typeutil.DeepClone(store.GetMeta(), core.StoreFactory)
if force {
newStore.Labels = labels
} else {
// If 'force' isn't set, the given labels will merge into those labels which already existed in the store.
newStore.Labels = core.MergeLabels(newStore.GetLabels(), labels)
}
// PutStore will perform label merge.
return c.putStoreImpl(newStore)
newStore.Labels = labels
return c.putStoreImpl(newStore, force)
}

// DeleteStoreLabel updates a store's location labels
Expand All @@ -1289,13 +1283,12 @@ func (c *RaftCluster) DeleteStoreLabel(storeID uint64, labelKey string) error {
return errors.Errorf("the label key %s does not exist", labelKey)
}
newStore.Labels = labels
// PutStore will perform label merge.
return c.putStoreImpl(newStore)
return c.putStoreImpl(newStore, true)
}

// PutStore puts a store.
func (c *RaftCluster) PutStore(store *metapb.Store) error {
if err := c.putStoreImpl(store); err != nil {
if err := c.putStoreImpl(store, false); err != nil {
return err
}
c.OnStoreVersionChange()
Expand All @@ -1304,8 +1297,9 @@ func (c *RaftCluster) PutStore(store *metapb.Store) error {
}

// putStoreImpl puts a store.
// If 'force' is true, then overwrite the store's labels.
func (c *RaftCluster) putStoreImpl(store *metapb.Store) error {
// If 'force' is true, the store's labels will overwrite those labels which already existed in the store.
// If 'force' is false, the store's labels will merge into those labels which already existed in the store.
func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -1335,6 +1329,9 @@ func (c *RaftCluster) putStoreImpl(store *metapb.Store) error {
} else {
// Use the given labels to update the store.
labels := store.GetLabels()
if !force {
labels = core.MergeLabels(s.GetLabels(), labels)
}
// Update an existed store.
s = s.Clone(
core.SetStoreAddress(store.Address, store.StatusAddress, store.PeerAddress),
Expand Down
109 changes: 109 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server/config"
)
Expand Down Expand Up @@ -1851,6 +1852,114 @@ func TestAwakenStore(t *testing.T) {
}
}

func TestUpdateAndDeleteLabel(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
stores := newTestStores(1, "6.5.1")
for _, store := range stores {
re.NoError(cluster.PutStore(store.GetMeta()))
}
re.Empty(cluster.GetStore(1).GetLabels())
// Update label.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
false,
)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Update label again.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{
{Key: "mode", Value: "readonly"},
},
false,
)
// Update label with empty value.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{},
false,
)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
{Key: "mode", Value: "readonly"},
},
cluster.GetStore(1).GetLabels(),
)
// Delete label.
err = cluster.DeleteStoreLabel(1, "mode")
re.NoError(err)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Delete a non-exist label.
err = cluster.DeleteStoreLabel(1, "mode")
re.Error(err)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Update label without force.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{},
false,
)
re.Equal(
[]*metapb.StoreLabel{
{Key: "zone", Value: "zone1"},
{Key: "host", Value: "host1"},
},
cluster.GetStore(1).GetLabels(),
)
// Update label with force.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{},
true,
)
re.Empty(cluster.GetStore(1).GetLabels())
// Update label first and then reboot the store.
cluster.UpdateStoreLabels(
1,
[]*metapb.StoreLabel{{Key: "mode", Value: "readonly"}},
false,
)
re.Equal([]*metapb.StoreLabel{{Key: "mode", Value: "readonly"}}, cluster.GetStore(1).GetLabels())
// Mock the store doesn't have any label configured.
newStore := typeutil.DeepClone(cluster.GetStore(1).GetMeta(), core.StoreFactory)
newStore.Labels = nil
// Store rebooting will call PutStore.
err = cluster.PutStore(newStore)
re.NoError(err)
// Check the label after rebooting.
re.Equal([]*metapb.StoreLabel{{Key: "mode", Value: "readonly"}}, cluster.GetStore(1).GetLabels())
}

type testCluster struct {
*RaftCluster
}
Expand Down

0 comments on commit dbfbdac

Please sign in to comment.