From 6a886a1dba5f8199db036e4e7d4c7d59b119383b Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 1 Mar 2024 16:38:38 +0900 Subject: [PATCH 1/3] fix update labels Signed-off-by: you06 --- pkg/domain/infosync/BUILD.bazel | 2 +- pkg/domain/infosync/info.go | 32 ++++++++++ pkg/domain/infosync/info_test.go | 45 ++++++++++++++ pkg/server/handler/tests/BUILD.bazel | 3 +- pkg/server/handler/tests/http_handler_test.go | 60 +++++++++++++++++++ .../handler/tikvhandler/tikv_handler.go | 5 ++ 6 files changed, 145 insertions(+), 2 deletions(-) diff --git a/pkg/domain/infosync/BUILD.bazel b/pkg/domain/infosync/BUILD.bazel index 2c34d101101d2..b55c30363851d 100644 --- a/pkg/domain/infosync/BUILD.bazel +++ b/pkg/domain/infosync/BUILD.bazel @@ -63,7 +63,7 @@ go_test( srcs = ["info_test.go"], embed = [":infosync"], flaky = True, - shard_count = 3, + shard_count = 4, deps = [ "//pkg/ddl/placement", "//pkg/ddl/util", diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index d7c78fd852f6d..71cf23e4f09b1 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -405,6 +405,38 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { return is.getAllServerInfo(ctx) } +func UpdateServerLabel(ctx context.Context, labels map[string]string) error { + is, err := getGlobalInfoSyncer() + if err != nil { + return err + } + // when etcdCli is nil, the server infos are generated from the latest config, no need to update. + if is.etcdCli == nil { + return nil + } + selfInfo, err := is.getServerInfoByID(ctx, is.info.ID) + if err != nil { + return err + } + changed := false + for k, v := range labels { + if selfInfo.Labels[k] != v { + changed = true + selfInfo.Labels[k] = v + } + } + if !changed { + return nil + } + infoBuf, err := selfInfo.Marshal() + if err != nil { + return errors.Trace(err) + } + str := string(hack.String(infoBuf)) + err = util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.serverInfoPath, str, clientv3.WithLease(is.session.Lease())) + return err +} + // DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress. func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error { is, err := getGlobalInfoSyncer() diff --git a/pkg/domain/infosync/info_test.go b/pkg/domain/infosync/info_test.go index 7896e6e582251..f6d305eae1396 100644 --- a/pkg/domain/infosync/info_test.go +++ b/pkg/domain/infosync/info_test.go @@ -15,10 +15,12 @@ package infosync import ( + "bytes" "context" "encoding/json" "errors" "fmt" + "net/http" "os" "path" "runtime" @@ -280,3 +282,46 @@ func TestTiFlashManager(t *testing.T) { CloseTiFlashManager(ctx) } + +func TestUpdateServerInfoLabels(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + currentID := "test" + + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + client := cluster.RandClient() + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/infosync/mockServerInfo", "return(true)")) + defer func() { + err := failpoint.Disable("github.com/pingcap/tidb/pkg/domain/infosync/mockServerInfo") + require.NoError(t, err) + }() + + info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false) + require.NoError(t, err) + + getLabels := func() map[string]string { + servers, err := info.getAllServerInfo(ctx) + require.NoError(t, err) + server, ok := servers[currentID] + require.True(t, ok) + return server.Labels + } + updateLabels := func(labels map[string]string) { + buffer := bytes.NewBuffer([]byte{}) + require.Nil(t, json.NewEncoder(buffer).Encode(labels)) + url := fmt.Sprintf("http://localhost:%d%s", 4000, "/labels") + resp, err := http.Post(url, "application/json", buffer) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + } + updateLabels(map[string]string{"zone": "east-1"}) + _ = getLabels() +} diff --git a/pkg/server/handler/tests/BUILD.bazel b/pkg/server/handler/tests/BUILD.bazel index f992b3170f853..e7ec8a26d0fbd 100644 --- a/pkg/server/handler/tests/BUILD.bazel +++ b/pkg/server/handler/tests/BUILD.bazel @@ -9,7 +9,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 36, + shard_count = 37, deps = [ "//pkg/config", "//pkg/ddl", @@ -54,6 +54,7 @@ go_test( "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", + "@io_etcd_go_etcd_tests_v3//integration", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", ], diff --git a/pkg/server/handler/tests/http_handler_test.go b/pkg/server/handler/tests/http_handler_test.go index b04f10f3379a0..217a988f410cd 100644 --- a/pkg/server/handler/tests/http_handler_test.go +++ b/pkg/server/handler/tests/http_handler_test.go @@ -16,6 +16,7 @@ package tests import ( "bytes" + "context" "crypto/tls" "crypto/x509" "crypto/x509/pkix" @@ -66,6 +67,7 @@ import ( "github.com/pingcap/tidb/pkg/util/rowcodec" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" + "go.etcd.io/etcd/tests/v3/integration" "go.uber.org/zap" ) @@ -1202,6 +1204,64 @@ func TestSetLabels(t *testing.T) { }) } +func TestSetLabelsWithEtcd(t *testing.T) { + ts := createBasicHTTPHandlerTestSuite() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts.startServer(t) + defer ts.stopServer(t) + + integration.BeforeTestExternal(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + client := cluster.RandClient() + infosync.SetEtcdClient(client) + ts.domain.InfoSyncer().Restart(ctx) + + testUpdateLabels := func(labels, expected map[string]string) { + buffer := bytes.NewBuffer([]byte{}) + require.Nil(t, json.NewEncoder(buffer).Encode(labels)) + resp, err := ts.PostStatus("/labels", "application/json", buffer) + require.NoError(t, err) + require.NotNil(t, resp) + defer func() { + require.NoError(t, resp.Body.Close()) + }() + require.Equal(t, http.StatusOK, resp.StatusCode) + newLabels := config.GetGlobalConfig().Labels + require.Equal(t, newLabels, expected) + servers, err := infosync.GetAllServerInfo(ctx) + require.NoError(t, err) + for _, server := range servers { + for k, expectV := range expected { + v, ok := server.Labels[k] + require.True(t, ok) + require.Equal(t, expectV, v) + } + return + } + require.Fail(t, "no server found") + } + + labels := map[string]string{ + "zone": "us-west-1", + "test": "123", + } + testUpdateLabels(labels, labels) + + updated := map[string]string{ + "zone": "bj-1", + } + labels["zone"] = "bj-1" + testUpdateLabels(updated, labels) + + // reset the global variable + config.UpdateGlobal(func(conf *config.Config) { + conf.Labels = map[string]string{} + }) +} + func TestSetLabelsConcurrentWithGetLabel(t *testing.T) { ts := createBasicHTTPHandlerTestSuite() diff --git a/pkg/server/handler/tikvhandler/tikv_handler.go b/pkg/server/handler/tikvhandler/tikv_handler.go index 7dc3029254bfd..aa525016a5ce9 100644 --- a/pkg/server/handler/tikvhandler/tikv_handler.go +++ b/pkg/server/handler/tikvhandler/tikv_handler.go @@ -2000,6 +2000,11 @@ func (LabelHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := infosync.UpdateServerLabel(ctx, labels); err != nil { + logutil.BgLogger().Error("update etcd labels failed", zap.Any("labels", cfg.Labels), zap.Error(err)) + } + cancel() cfg.Labels = labels config.StoreGlobalConfig(&cfg) logutil.BgLogger().Info("update server labels", zap.Any("labels", cfg.Labels)) From 5191fee5bb0979222e689541b8fd1b3b88b3fb6a Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 1 Mar 2024 16:53:26 +0900 Subject: [PATCH 2/3] remove unused test Signed-off-by: you06 --- pkg/domain/infosync/BUILD.bazel | 2 +- pkg/domain/infosync/info_test.go | 45 -------------------------------- 2 files changed, 1 insertion(+), 46 deletions(-) diff --git a/pkg/domain/infosync/BUILD.bazel b/pkg/domain/infosync/BUILD.bazel index b55c30363851d..2c34d101101d2 100644 --- a/pkg/domain/infosync/BUILD.bazel +++ b/pkg/domain/infosync/BUILD.bazel @@ -63,7 +63,7 @@ go_test( srcs = ["info_test.go"], embed = [":infosync"], flaky = True, - shard_count = 4, + shard_count = 3, deps = [ "//pkg/ddl/placement", "//pkg/ddl/util", diff --git a/pkg/domain/infosync/info_test.go b/pkg/domain/infosync/info_test.go index f6d305eae1396..7896e6e582251 100644 --- a/pkg/domain/infosync/info_test.go +++ b/pkg/domain/infosync/info_test.go @@ -15,12 +15,10 @@ package infosync import ( - "bytes" "context" "encoding/json" "errors" "fmt" - "net/http" "os" "path" "runtime" @@ -282,46 +280,3 @@ func TestTiFlashManager(t *testing.T) { CloseTiFlashManager(ctx) } - -func TestUpdateServerInfoLabels(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - currentID := "test" - - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) - - client := cluster.RandClient() - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/infosync/mockServerInfo", "return(true)")) - defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/pkg/domain/infosync/mockServerInfo") - require.NoError(t, err) - }() - - info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, nil, keyspace.CodecV1, false) - require.NoError(t, err) - - getLabels := func() map[string]string { - servers, err := info.getAllServerInfo(ctx) - require.NoError(t, err) - server, ok := servers[currentID] - require.True(t, ok) - return server.Labels - } - updateLabels := func(labels map[string]string) { - buffer := bytes.NewBuffer([]byte{}) - require.Nil(t, json.NewEncoder(buffer).Encode(labels)) - url := fmt.Sprintf("http://localhost:%d%s", 4000, "/labels") - resp, err := http.Post(url, "application/json", buffer) - require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) - } - updateLabels(map[string]string{"zone": "east-1"}) - _ = getLabels() -} From 7acdc3238a7289ef63237f557c26bf734687665f Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 1 Mar 2024 17:08:15 +0900 Subject: [PATCH 3/3] lint Signed-off-by: you06 --- pkg/domain/infosync/info.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 71cf23e4f09b1..b20a0a18f1262 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -405,6 +405,7 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { return is.getAllServerInfo(ctx) } +// UpdateServerLabel updates the server label for global info syncer. func UpdateServerLabel(ctx context.Context, labels map[string]string) error { is, err := getGlobalInfoSyncer() if err != nil {