Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#51451
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
you06 authored and ti-chi-bot committed Mar 7, 2024
1 parent 1dc6edf commit e86a505
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 0 deletions.
33 changes: 33 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,39 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
return is.getAllServerInfo(ctx)
}

// UpdateServerLabel updates the server label for global info syncer.
func UpdateServerLabel(ctx context.Context, labels map[string]string) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
// when etcdCli is nil, the server infos are generated from the latest config, no need to update.
if is.etcdCli == nil {
return nil
}
selfInfo, err := is.getServerInfoByID(ctx, is.info.ID)
if err != nil {
return err
}
changed := false
for k, v := range labels {
if selfInfo.Labels[k] != v {
changed = true
selfInfo.Labels[k] = v
}
}
if !changed {
return nil
}
infoBuf, err := selfInfo.Marshal()
if err != nil {
return errors.Trace(err)
}
str := string(hack.String(infoBuf))
err = util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.serverInfoPath, str, clientv3.WithLease(is.session.Lease()))
return err
}

// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress.
func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error {
is, err := getGlobalInfoSyncer()
Expand Down
61 changes: 61 additions & 0 deletions pkg/server/handler/tests/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "tests_test",
timeout = "short",
srcs = [
"http_handler_serial_test.go",
"http_handler_test.go",
"main_test.go",
],
flaky = True,
shard_count = 37,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/util",
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/meta",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/server",
"//pkg/server/handler",
"//pkg/server/handler/optimizor",
"//pkg/server/handler/tikvhandler",
"//pkg/server/internal/testserverclient",
"//pkg/server/internal/testutil",
"//pkg/server/internal/util",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/sessionctx/binloginfo",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/store/helper",
"//pkg/store/mockstore",
"//pkg/store/mockstore/unistore",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/external",
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/deadlockhistory",
"//pkg/util/rowcodec",
"//pkg/util/topsql/state",
"//pkg/util/versioninfo",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
)
5 changes: 5 additions & 0 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,11 @@ func (h labelHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := infosync.UpdateServerLabel(ctx, labels); err != nil {
logutil.BgLogger().Error("update etcd labels failed", zap.Any("labels", cfg.Labels), zap.Error(err))
}
cancel()
cfg.Labels = labels
config.StoreGlobalConfig(&cfg)
logutil.BgLogger().Info("update server labels", zap.Any("labels", cfg.Labels))
Expand Down
60 changes: 60 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
Expand Down Expand Up @@ -59,6 +60,7 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1202,6 +1204,64 @@ func TestSetLabels(t *testing.T) {
})
}

func TestSetLabelsWithEtcd(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ts.startServer(t)
defer ts.stopServer(t)

integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
infosync.SetEtcdClient(client)
ts.domain.InfoSyncer().Restart(ctx)

testUpdateLabels := func(labels, expected map[string]string) {
buffer := bytes.NewBuffer([]byte{})
require.Nil(t, json.NewEncoder(buffer).Encode(labels))
resp, err := ts.PostStatus("/labels", "application/json", buffer)
require.NoError(t, err)
require.NotNil(t, resp)
defer func() {
require.NoError(t, resp.Body.Close())
}()
require.Equal(t, http.StatusOK, resp.StatusCode)
newLabels := config.GetGlobalConfig().Labels
require.Equal(t, newLabels, expected)
servers, err := infosync.GetAllServerInfo(ctx)
require.NoError(t, err)
for _, server := range servers {
for k, expectV := range expected {
v, ok := server.Labels[k]
require.True(t, ok)
require.Equal(t, expectV, v)
}
return
}
require.Fail(t, "no server found")
}

labels := map[string]string{
"zone": "us-west-1",
"test": "123",
}
testUpdateLabels(labels, labels)

updated := map[string]string{
"zone": "bj-1",
}
labels["zone"] = "bj-1"
testUpdateLabels(updated, labels)

// reset the global variable
config.UpdateGlobal(func(conf *config.Config) {
conf.Labels = map[string]string{}
})
}

func TestSetLabelsConcurrentWithGetLabel(t *testing.T) {
ts := createBasicHTTPHandlerTestSuite()

Expand Down

0 comments on commit e86a505

Please sign in to comment.