Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support tikv label check #800

Merged
merged 20 commits into from
Sep 27, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/cluster/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func newDeploy() *cobra.Command {
cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
cmd.Flags().BoolVarP(&opt.IgnoreConfigCheck, "ignore-config-check", "", opt.IgnoreConfigCheck, "Ignore the config check result")
cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")

return cmd
}
Expand Down
1 change: 1 addition & 0 deletions components/cluster/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func newScaleOutCmd() *cobra.Command {
cmd.Flags().BoolVarP(&opt.SkipCreateUser, "skip-create-user", "", false, "Skip creating the user specified in topology (experimental).")
cmd.Flags().StringVarP(&opt.IdentityFile, "identity_file", "i", opt.IdentityFile, "The path of the SSH identity file. If specified, public key authentication will be used.")
cmd.Flags().BoolVarP(&opt.UsePassword, "password", "p", false, "Use password of target hosts. If specified, password authentication will be used.")
cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")

return cmd
}
Expand Down
10 changes: 8 additions & 2 deletions examples/topology.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ server_configs:
schedule.leader-schedule-limit: 4
schedule.region-schedule-limit: 2048
schedule.replica-schedule-limit: 64
replication.location-labels: ["zone", "host"]
replication.strictly-match-label: true
tiflash:
# path_realtime_mode: false
logger.level: "info"
Expand Down Expand Up @@ -112,11 +114,15 @@ tikv_servers:
# log_dir: "/tidb-deploy/tikv-20160/log"
# numa_node: "0,1"
# # The following configs are used to overwrite the `server_configs.tikv` values.
# config:
config:
server.labels: { zone: "zone1", host: "host1" }
# server.grpc-concurrency: 4
# server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
- host: 10.0.1.15
config:
server.labels: { zone: "zone1", host: "host2" }
- host: 10.0.1.16
config:
server.labels: { zone: "zone2", host: "host3" }

tiflash_servers:
- host: 10.0.1.14
Expand Down
16 changes: 16 additions & 0 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/pingcap/tiup/pkg/utils"
pdserverapi "github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"
)

// PDClient is an HTTP client of the PD server
Expand Down Expand Up @@ -661,6 +662,21 @@ func (pc *PDClient) GetReplicateConfig() ([]byte, error) {
})
}

// GetLocationLabels gets the replication.location-labels config from pd server
func (pc *PDClient) GetLocationLabels() ([]string, error) {
config, err := pc.GetReplicateConfig()
if err != nil {
return nil, err
}

rc := pdconfig.ReplicationConfig{}
if err := json.Unmarshal(config, &rc); err != nil {
return nil, errors.Annotate(err, "unmarshal replication config")
lucklove marked this conversation as resolved.
Show resolved Hide resolved
}

return rc.LocationLabels, nil
}

// UpdateScheduleConfig updates the PD schedule config
func (pc *PDClient) UpdateScheduleConfig(body io.Reader) error {
return pc.updateConfig(body, pdConfigSchedule)
Expand Down
48 changes: 44 additions & 4 deletions pkg/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
"path/filepath"
"sort"
"strings"
"time"

"github.com/fatih/color"
"github.com/joomcode/errorx"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cliutil"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/clusterutil"
"github.com/pingcap/tiup/pkg/cluster/executor"
operator "github.com/pingcap/tiup/pkg/cluster/operation"
Expand Down Expand Up @@ -529,6 +531,10 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
filterRoles := set.NewStringSet(opt.Roles...)
filterNodes := set.NewStringSet(opt.Nodes...)
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
lucklove marked this conversation as resolved.
Show resolved Hide resolved
}
for _, comp := range topo.ComponentsByStartOrder() {
for _, ins := range comp.Instances() {
// apply role filter
Expand All @@ -547,10 +553,6 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
dataDir = insDirs[1]
}

tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
}
status := ins.Status(tlsCfg, pdList...)
// Query the service status
if status == "-" {
Expand Down Expand Up @@ -595,6 +597,15 @@ func (m *Manager) Display(clusterName string, opt operator.Options) error {
cliutil.PrintTable(clusterTable, true)
fmt.Printf("Total nodes: %d\n", len(clusterTable)-1)

// Check if TiKV's label set correctly
kvs := topo.(*spec.Specification).TiKVServers
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
if lbs, err := pdClient.GetLocationLabels(); err != nil {
color.Yellow("\nWARN: get location labels from pd failed: %v", err)
} else if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
color.Yellow("\nWARN: there is something wrong with TiKV labels, which may cause data losing:\n%v", err)
}

return nil
}

Expand Down Expand Up @@ -1000,6 +1011,7 @@ type ScaleOutOptions struct {
SkipCreateUser bool // don't create user
IdentityFile string // path to the private key file
UsePassword bool // use password instead of identity file for ssh connection
NoLabels bool // don't check labels for TiKV instance
}

// DeployOptions contains the options for scale out.
Expand All @@ -1010,6 +1022,7 @@ type DeployOptions struct {
IdentityFile string // path to the private key file
UsePassword bool // use password instead of identity file for ssh connection
IgnoreConfigCheck bool // ignore config check result
NoLabels bool // don't check labels for TiKV instance
}

// DeployerInstance is a instance can deploy to a target deploy directory.
Expand Down Expand Up @@ -1057,6 +1070,15 @@ func (m *Manager) Deploy(
base.GlobalOptions.SSHType = sshType
}

if !opt.NoLabels {
// Check if TiKV's label set correctly
lbs := topo.(*spec.Specification).LocationLabels()
kvs := topo.(*spec.Specification).TiKVServers
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
lucklove marked this conversation as resolved.
Show resolved Hide resolved
}
}

clusterList, err := m.specManager.GetAllClusters()
if err != nil {
return err
Expand Down Expand Up @@ -1478,6 +1500,24 @@ func (m *Manager) ScaleOut(
return err
}

if !opt.NoLabels {
// Check if TiKV's label set correctly
pdList := topo.BaseTopo().MasterList
tlsCfg, err := topo.TLSConfig(m.specManager.Path(clusterName, spec.TLSCertKeyDir))
if err != nil {
return perrs.AddStack(err)
}
pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
lbs, err := pdClient.GetLocationLabels()
if err != nil {
return err
}
kvs := mergedTopo.(*spec.Specification).TiKVServers
if err := spec.CheckTiKVLocationLabels(lbs, kvs); err != nil {
return perrs.Errorf("check TiKV label failed, please fix that before continue:\n%s", err)
}
}

clusterList, err := m.specManager.GetAllClusters()
if err != nil {
return err
Expand Down
28 changes: 27 additions & 1 deletion pkg/cluster/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type BaseTopo struct {
MasterList []string
}

// Topology represents specification of the cluster.
// Topology represents specification of the cluster.
type Topology interface {
BaseTopo() *BaseTopo
// Validate validates the topology specification and produce error if the
Expand Down Expand Up @@ -210,6 +210,32 @@ func (s *Specification) BaseTopo() *BaseTopo {
}
}

// LocationLabels returns replication.location-labels in PD config
func (s *Specification) LocationLabels() []string {
lbs := []string{}

if pdReplica := s.ServerConfigs.PD["replication"]; pdReplica != nil {
lucklove marked this conversation as resolved.
Show resolved Hide resolved
// server_configs:
// pd:
// replication:
// location-labels: ["zone", "host"]
if repLbs := pdReplica.(map[interface{}]interface{})["location-labels"]; repLbs != nil {
for _, l := range repLbs.([]interface{}) {
lbs = append(lbs, l.(string))
}
}
} else if repLbs := s.ServerConfigs.PD["replication.location-labels"]; repLbs != nil {
// server_configs:
// pd:
// replication.location-labels: ["zone", "host"]
for _, l := range repLbs.([]interface{}) {
lbs = append(lbs, l.(string))
lucklove marked this conversation as resolved.
Show resolved Hide resolved
}
}

return lbs
}

// AllComponentNames contains the names of all components.
// should include all components in ComponentsByStartOrder
func AllComponentNames() (roles []string) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/cluster/spec/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,31 @@ item7 = 700
c.Assert(err, IsNil)
c.Assert(string(merge2), DeepEquals, expected)
}

func (s *metaSuiteTopo) TestLocationLabels(c *C) {
spec := Specification{}

c.Assert(len(spec.LocationLabels()), Equals, 0)

err := yaml.Unmarshal([]byte(`
server_configs:
pd:
replication.location-labels: ["zone", "host"]
`), &spec)
c.Assert(err, IsNil)
lbs := spec.LocationLabels()
c.Assert(lbs, DeepEquals, []string{"zone", "host"})

spec = Specification{}
err = yaml.Unmarshal([]byte(`
server_configs:
pd:
replication:
location-labels:
- zone
- host
`), &spec)
c.Assert(err, IsNil)
lbs = spec.LocationLabels()
c.Assert(lbs, DeepEquals, []string{"zone", "host"})
}
26 changes: 26 additions & 0 deletions pkg/cluster/spec/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,32 @@ func (s TiKVSpec) IsImported() bool {
return s.Imported
}

// Labels returns the labels of TiKV
func (s TiKVSpec) Labels() map[string]string {
lbs := make(map[string]string)

if serverCfg := s.Config["server"]; serverCfg != nil {
lucklove marked this conversation as resolved.
Show resolved Hide resolved
// server:
// labels:
// host: xxx
// zone: yyy
if labelsMap := serverCfg.(map[interface{}]interface{})["labels"]; labelsMap != nil {
for k, v := range labelsMap.(map[interface{}]interface{}) {
lbs[k.(string)] = v.(string)
}
}
} else if serverLbs := s.Config["server.labels"]; serverLbs != nil {
// server.labels:
// host: xxx
// zone: yyy
for k, v := range serverLbs.(map[interface{}]interface{}) {
lbs[k.(string)] = v.(string)
}
}

return lbs
}

// TiKVComponent represents TiKV component.
type TiKVComponent struct {
*Specification
Expand Down
58 changes: 58 additions & 0 deletions pkg/cluster/spec/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,64 @@ Please change to use another port or another host.
return nil
}

// TiKVLabelError indicates that some TiKV servers don't have correct labels
type TiKVLabelError struct {
TiKVInstances map[string][]error
lucklove marked this conversation as resolved.
Show resolved Hide resolved
}

// Error implements error
func (e *TiKVLabelError) Error() string {
str := ""
for id, errs := range e.TiKVInstances {
if len(errs) == 0 {
continue
}

str += fmt.Sprintf("%s:\n", id)
for _, e := range errs {
str += fmt.Sprintf("\t%s\n", e.Error())
}
}
return str
}

// CheckTiKVLocationLabels will check if tikv missing label or have wrong label
func CheckTiKVLocationLabels(pdLocLabels []string, kvs []TiKVSpec) error {
lerr := &TiKVLabelError{
TiKVInstances: make(map[string][]error),
}
lbs := set.NewStringSet(pdLocLabels...)
lucklove marked this conversation as resolved.
Show resolved Hide resolved
hosts := make(map[string]int)

for _, kv := range kvs {
hosts[kv.Host] = hosts[kv.Host] + 1
}
for _, kv := range kvs {
id := fmt.Sprintf("%s:%d", kv.Host, kv.GetMainPort())
ls := kv.Labels()
if len(ls) == 0 && hosts[kv.Host] > 1 {
lerr.TiKVInstances[id] = append(
lerr.TiKVInstances[id],
errors.New("location label missing"),
lucklove marked this conversation as resolved.
Show resolved Hide resolved
)
continue
}
for lname := range ls {
if !lbs.Exist(lname) {
lerr.TiKVInstances[id] = append(
lerr.TiKVInstances[id],
fmt.Errorf("label name '%s' is not specified in pd config (replication.location-labels: %v)", lname, pdLocLabels),
)
}
}
}

if len(lerr.TiKVInstances) == 0 {
return nil
}
return lerr
}

// platformConflictsDetect checks for conflicts in topology for different OS / Arch
// set to the same host / IP
func (s *Specification) platformConflictsDetect() error {
Expand Down
Loading