Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support tikv label check #800

Merged
merged 20 commits into from
Sep 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/cluster/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func newDeploy() *cobra.Command {
cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
cmd.Flags().BoolVarP(&opt.IgnoreConfigCheck, "ignore-config-check", "", opt.IgnoreConfigCheck, "Ignore the config check result")
cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")

return cmd
}
Expand Down
1 change: 1 addition & 0 deletions components/cluster/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func newScaleOutCmd() *cobra.Command {
cmd.Flags().BoolVarP(&opt.SkipCreateUser, "skip-create-user", "", false, "Skip creating the user specified in topology (experimental).")
cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")

return cmd
}
Expand Down
10 changes: 8 additions & 2 deletions examples/topology.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ server_configs:
schedule.leader-schedule-limit: 4
schedule.region-schedule-limit: 2048
schedule.replica-schedule-limit: 64
replication.location-labels: ["zone", "host"]
replication.strictly-match-label: true
tiflash:
# path_realtime_mode: false
logger.level: "info"
Expand Down Expand Up @@ -112,11 +114,15 @@ tikv_servers:
# log_dir: "/tidb-deploy/tikv-20160/log"
# numa_node: "0,1"
# # The following configs are used to overwrite the `server_configs.tikv` values.
# config:
config:
server.labels: { zone: "zone1", host: "host1" }
# server.grpc-concurrency: 4
# server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
- host: 10.0.1.15
config:
server.labels: { zone: "zone1", host: "host2" }
- host: 10.0.1.16
config:
server.labels: { zone: "zone2", host: "host3" }

tiflash_servers:
- host: 10.0.1.14
Expand Down
16 changes: 16 additions & 0 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/pingcap/tiup/pkg/utils"
pdserverapi "github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"
)

// PDClient is an HTTP client of the PD server
Expand Down Expand Up @@ -661,6 +662,21 @@ func (pc *PDClient) GetReplicateConfig() ([]byte, error) {
})
}

// GetLocationLabels returns the LocationLabels field of the replication
// config reported by the PD server.
//
// The raw config is obtained through GetReplicateConfig and decoded as JSON
// into a pdconfig.ReplicationConfig. An error from either step is returned
// with a nil slice; a decode failure is annotated with the raw payload.
func (pc *PDClient) GetLocationLabels() ([]string, error) {
	raw, err := pc.GetReplicateConfig()
	if err != nil {
		return nil, err
	}

	var replication pdconfig.ReplicationConfig
	if decodeErr := json.Unmarshal(raw, &replication); decodeErr != nil {
		return nil, errors.Annotatef(decodeErr, "unmarshal replication config: %s", string(raw))
	}
	return replication.LocationLabels, nil
}

// UpdateScheduleConfig updates the PD schedule config
func (pc *PDClient) UpdateScheduleConfig(body io.Reader) error {
return pc.updateConfig(body, pdConfigSchedule)
Expand Down
71 changes: 58 additions & 13 deletions pkg/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
"path/filepath"
"sort"
"strings"
"time"

"github.com/fatih/color"
"github.com/joomcode/errorx"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/clusterutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
operator "github.com/pingcap/tiup/pkg/cluster/operation"
Expand Down Expand Up @@ -184,7 +186,7 @@ func (m *Manager) StopCluster(clusterName string, options operator.Options) erro

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}

t := task.NewBuilder().
Expand Down Expand Up @@ -221,7 +223,7 @@ func (m *Manager) RestartCluster(clusterName string, options operator.Options) e

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}

t := task.NewBuilder().
Expand Down Expand Up @@ -291,7 +293,7 @@ func (m *Manager) CleanCluster(clusterName string, gOpt operator.Options, cleanO

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}

if !skipConfirm {
Expand Down Expand Up @@ -354,7 +356,7 @@ func (m *Manager) DestroyCluster(clusterName string, gOpt operator.Options, dest

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}

if !skipConfirm {
Expand Down Expand Up @@ -529,6 +531,10 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
filterRoles := set.NewStringSet(opt.Roles...)
filterNodes := set.NewStringSet(opt.Nodes...)
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return err
}
for _, comp := range topo.ComponentsByStartOrder() {
for _, ins := range comp.Instances() {
// apply role filter
Expand All @@ -547,10 +553,6 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
dataDir = insDirs[1]
}

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
}
status := ins.Status(tlsCfg, pdList...)
// Query the service status
if status == "-" {
Expand Down Expand Up @@ -595,6 +597,17 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
cliutil.PrintTable(clusterTable, true)
fmt.Printf("Total nodes: %d\n", len(clusterTable)-1)

if topo, ok := topo.(*spec.Specification); ok {
// Check if TiKV's label set correctly
kvs := topo.TiKVServers
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
if lbs, err := pdClient.GetLocationLabels(); err != nil {
color.Yellow("\nWARN: get location labels from pd failed: %v", err)
} else if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
color.Yellow("\nWARN: there is something wrong with TiKV labels, which may cause data losing:\n%v", err)
}
}

return nil
}

Expand Down Expand Up @@ -749,7 +762,7 @@ func (m *Manager) Reload(clusterName string, opt operator.Options, skipRestart b

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}
if !skipRestart {
tb = tb.Func("UpgradeCluster", func(ctx *task.Context) error {
Expand Down Expand Up @@ -894,7 +907,7 @@ func (m *Manager) Upgrade(clusterName string, clusterVersion string, opt operato

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}
t := task.NewBuilder().
SSHKeySet(
Expand Down Expand Up @@ -964,7 +977,7 @@ func (m *Manager) Patch(clusterName string, packagePath string, opt operator.Opt

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}
t := task.NewBuilder().
SSHKeySet(
Expand Down Expand Up @@ -1000,6 +1013,7 @@ type ScaleOutOptions struct {
SkipCreateUser bool // don't create user
IdentityFile string // path to the private key file
UsePassword bool // use password instead of identity file for ssh connection
NoLabels bool // don't check labels for TiKV instance
}

// DeployOptions contains the options for deploy.
Expand All @@ -1010,6 +1024,7 @@ type DeployOptions struct {
IdentityFile string // path to the private key file
UsePassword bool // use password instead of identity file for ssh connection
IgnoreConfigCheck bool // ignore config check result
NoLabels bool // don't check labels for TiKV instance
}

// DeployerInstance is an instance that can deploy to a target deploy directory.
Expand Down Expand Up @@ -1057,6 +1072,18 @@ func (m *Manager) Deploy(
base.GlobalOptions.SSHType = sshType
}

if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
// Check if TiKV's label set correctly
lbs, err := topo.LocationLabels()
if err != nil {
return err
}
kvs := topo.TiKVServers
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
lucklove marked this conversation as resolved.
Show resolved Hide resolved
}
}

clusterList, err := m.specManager.GetAllClusters()
if err != nil {
return err
Expand Down Expand Up @@ -1404,7 +1431,7 @@ func (m *Manager) ScaleIn(

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
return err
}

b := task.NewBuilder().
Expand Down Expand Up @@ -1478,6 +1505,24 @@ func (m *Manager) ScaleOut(
return err
}

if topo, ok := topo.(*spec.Specification); ok && !opt.NoLabels {
// Check if TiKV's label set correctly
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return err
}
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
lbs, err := pdClient.GetLocationLabels()
if err != nil {
return err
}
kvs := mergedTopo.(*spec.Specification).TiKVServers
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}

clusterList, err := m.specManager.GetAllClusters()
if err != nil {
return err
Expand Down Expand Up @@ -1872,7 +1917,7 @@ func buildScaleOutTask(

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return nil, perrs.AddStack(err)
return nil, err
}

// Initialize the environments
Expand Down
53 changes: 43 additions & 10 deletions pkg/cluster/spec/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,32 +99,65 @@ func patch(origin map[string]interface{}, key string, val interface{}) {
}
}

func flattenMap(ms map[string]interface{}) (map[string]interface{}, error) {
func flattenMap(ms map[string]interface{}) map[string]interface{} {
result := map[string]interface{}{}
for k, v := range ms {
key, val := flattenKey(k, v)
patch(result, key, val)
}
return result, nil
return result
}

func merge(orig map[string]interface{}, overwrites ...map[string]interface{}) (map[string]interface{}, error) {
lhs, err := flattenMap(orig)
if err != nil {
return nil, err
}
lhs := flattenMap(orig)
for _, overwrite := range overwrites {
rhs, err := flattenMap(overwrite)
if err != nil {
return nil, err
}
rhs := flattenMap(overwrite)
for k, v := range rhs {
patch(lhs, k, v)
}
}
return lhs, nil
}

// GetValueFromPath looks up the value addressed by the dot-separated path p
// in the config map m and returns it, or nil when nothing matches.
//
// Keys in m (and in any nested table) may themselves contain dots, so the
// path "a.b.c" can be satisfied by a literal key "a.b.c", by "a.b" holding a
// table with key "c", by "a" holding a table with key "b.c", and so on. See
// searchValue for the exact precedence.
func GetValueFromPath(m map[string]interface{}, p string) interface{} {
	return searchValue(asSearchMap(m), strings.Split(p, "."))
}

// asSearchMap copies a string-keyed map into the interface-keyed form that
// searchValue operates on. The values are shared, not deep-copied.
func asSearchMap(m map[string]interface{}) map[interface{}]interface{} {
	out := make(map[interface{}]interface{}, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

// searchValue resolves the path segments ss against m.
//
// Resolution order:
//  1. No segments: m itself is returned. One segment: m[ss[0]] is returned.
//  2. A non-nil entry stored under the whole remaining path joined with "."
//     wins.
//  3. Otherwise prefixes are tried from longest to shortest. The first prefix
//     that has a non-nil entry is committed to: if that entry is a nested
//     table, the remaining segments are resolved inside it; if it is anything
//     else, the result is nil. Shorter prefixes are NOT tried afterwards.
//
// Nested tables are recognised both as map[interface{}]interface{} and as
// map[string]interface{} (the shape encoding/json produces for objects), so
// the lookup does not depend on which decoder built the config.
func searchValue(m map[interface{}]interface{}, ss []string) interface{} {
	switch len(ss) {
	case 0:
		return m
	case 1:
		return m[ss[0]]
	}

	// A literal key spelling out the entire remaining path takes precedence
	// over any nested lookup.
	if v := m[strings.Join(ss, ".")]; v != nil {
		return v
	}

	for i := len(ss) - 1; i > 0; i-- {
		v := m[strings.Join(ss[:i], ".")]
		if v == nil {
			continue
		}
		switch table := v.(type) {
		case map[interface{}]interface{}:
			return searchValue(table, ss[i:])
		case map[string]interface{}:
			return searchValue(asSearchMap(table), ss[i:])
		}
		// The longest existing prefix is not a table, so the path cannot be
		// resolved; deliberately do not fall back to shorter prefixes.
		return nil
	}

	return nil
}

// Merge2Toml merge the config of global.
func Merge2Toml(comp string, global, overwrite map[string]interface{}) ([]byte, error) {
return merge2Toml(comp, global, overwrite)
Expand Down
27 changes: 27 additions & 0 deletions pkg/cluster/spec/server_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,30 @@ server_configs:
decimal = bytes.Contains(get, []byte("0.0"))
c.Assert(decimal, check.IsTrue)
}

// TestGetValueFromPath unmarshals a small topology whose TiDB server config
// uses dotted keys and nested keys for overlapping paths, then checks what
// GetValueFromPath returns for four dotted lookups.
func (s *configSuite) TestGetValueFromPath(c *check.C) {
yamlData := []byte(`
server_configs:
tidb:
a.b.c.d: 1
a:
b:
c:
d: 2
a.b:
c.e: 3
a.b.c:
f: 4
h.i.j.k: [1, 2, 3]
`)

topo := new(Specification)

err := yaml.Unmarshal(yamlData, topo)
c.Assert(err, check.IsNil)

// The fixture lists both `a.b.c.d: 1` and a `d: 2` entry; the lookup is
// expected to yield 1.
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "a.b.c.d"), check.Equals, 1)
// The fixture contains a `c.e: 3` entry, yet nil is the expected result for
// this path.
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "a.b.c.e"), check.Equals, nil)
// Expected to yield the `f: 4` entry listed after `a.b.c:`.
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "a.b.c.f"), check.Equals, 4)
// The expected value is a slice, so it is compared with DeepEquals.
c.Assert(GetValueFromPath(topo.ServerConfigs.TiDB, "h.i.j.k"), check.DeepEquals, []interface{}{1, 2, 3})
}
Loading