From 45712177e7d1cb8f389885ae7e69fb39207c6c57 Mon Sep 17 00:00:00 2001 From: Allen Zhong Date: Mon, 28 Dec 2020 17:32:45 +0800 Subject: [PATCH] cluster: refine the process of getting latest store info from pd (#1016) * cluster/pdapi: refine the process of getting latest store * cluster: minor optimization of output when evicting store leaders * cluster/api: refactor to use a predefined error type when store not found --- pkg/cluster/api/error.go | 41 +++++++ pkg/cluster/api/error_test.go | 47 ++++++++ pkg/cluster/api/pdapi.go | 204 +++++++++++++++++----------------- pkg/cluster/spec/tikv.go | 59 +++------- 4 files changed, 203 insertions(+), 148 deletions(-) create mode 100644 pkg/cluster/api/error.go create mode 100644 pkg/cluster/api/error_test.go diff --git a/pkg/cluster/api/error.go b/pkg/cluster/api/error.go new file mode 100644 index 0000000000..669858ea75 --- /dev/null +++ b/pkg/cluster/api/error.go @@ -0,0 +1,41 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import "fmt" + +var ( + // ErrNoStore is an empty NoStoreErr object, useful for type checking + ErrNoStore = &NoStoreErr{} +) + +// NoStoreErr is the error that no store matching address can be found in PD +type NoStoreErr struct { + addr string +} + +// Error implement the error interface +func (e *NoStoreErr) Error() string { + return fmt.Sprintf("no store matching address \"%s\" found", e.addr) +} + +// Is implements the error interface +func (e *NoStoreErr) Is(target error) bool { + t, ok := target.(*NoStoreErr) + if !ok { + return false + } + + return e.addr == t.addr || t.addr == "" +} diff --git a/pkg/cluster/api/error_test.go b/pkg/cluster/api/error_test.go new file mode 100644 index 0000000000..e23317330f --- /dev/null +++ b/pkg/cluster/api/error_test.go @@ -0,0 +1,47 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "errors" + "testing" + + "github.com/pingcap/check" +) + +func TestNoStoreErrIs(t *testing.T) { + var c *check.C + err0 := &NoStoreErr{ + addr: "1.2.3.4", + } + // identical errors are equal + c.Assert(errors.Is(err0, err0), check.IsTrue) + c.Assert(errors.Is(ErrNoStore, ErrNoStore), check.IsTrue) + c.Assert(errors.Is(ErrNoStore, &NoStoreErr{}), check.IsTrue) + c.Assert(errors.Is(&NoStoreErr{}, ErrNoStore), check.IsTrue) + // not equal for different error types + c.Assert(errors.Is(err0, errors.New("")), check.IsFalse) + // default Value matches any error + c.Assert(errors.Is(err0, ErrNoStore), check.IsTrue) + // error with values are not matching default ones + c.Assert(errors.Is(ErrNoStore, err0), check.IsFalse) + + err1 := &NoStoreErr{ + addr: "2.3.4.5", + } + c.Assert(errors.Is(err1, ErrNoStore), check.IsTrue) + // errors with different values are not equal + c.Assert(errors.Is(err0, err1), check.IsFalse) + c.Assert(errors.Is(err1, err0), check.IsFalse) +} diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index eaf5fb43fe..92ffb7d18d 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -17,15 +17,15 @@ import ( "bytes" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "net/http" "net/url" - "sort" "time" "github.com/jeremywohl/flatten" - "github.com/pingcap/errors" + perrs "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/tiup/pkg/logger/log" @@ -87,7 +87,7 @@ func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byt u, err = url.Parse(endpoint) if err != nil { - return bytes, errors.AddStack(err) + return bytes, perrs.AddStack(err) } endpoint = u.String() @@ -99,7 +99,7 @@ func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byt return bytes, nil } if len(endpoints) > 1 && err != nil { - err = errors.Errorf("no endpoint available, the last err was: %s", err) + err = perrs.Errorf("no endpoint available, the last err was: %s", err) } return bytes, err } @@ -153,13 +153,57 @@ func (pc *PDClient) GetStores() (*pdserverapi.StoresInfo, error) { return nil, err } - sort.Slice(storesInfo.Stores, func(i int, j int) bool { - return storesInfo.Stores[i].Store.Id > storesInfo.Stores[j].Store.Id - }) + // Desc sorting the store list, we assume the store with largest ID is the + // latest one. + // Not necessary when we implement the workaround pd-3303 in GetCurrentStore() + //sort.Slice(storesInfo.Stores, func(i int, j int) bool { + // return storesInfo.Stores[i].Store.Id > storesInfo.Stores[j].Store.Id + //}) return &storesInfo, nil } +// GetCurrentStore gets the current store info of a given host +func (pc *PDClient) GetCurrentStore(addr string) (*pdserverapi.StoreInfo, error) { + stores, err := pc.GetStores() + if err != nil { + return nil, err + } + + // Find the store with largest ID + var latestStore *pdserverapi.StoreInfo + for _, store := range stores.Stores { + if store.Store.Address == addr { + // Workaround of pd-3303: + // If the PD leader has been switched multiple times, the store IDs + // may be not monitonically assigned. To workaround this, we iterate + // over the whole store list to see if any of the store's state is + // not marked as "tombstone", then use that as the result. + // See: https://github.com/tikv/pd/issues/3303 + // + // It's logically not necessary to find the store with largest ID + // number anymore in this process, but we're keeping the behavior + // as the reasonable approach would still be using the state from + // latest store, and this is only a workaround. + if store.Store.State != metapb.StoreState_Tombstone { + return store, nil + } + + if latestStore == nil { + latestStore = store + continue + } + if store.Store.Id > latestStore.Store.Id { + latestStore = store + } + } + } + if latestStore != nil { + return latestStore, nil + } + return nil, &NoStoreErr{addr: addr} +} + // WaitLeader wait until there's a leader or timeout. func (pc *PDClient) WaitLeader(retryOpt *utils.RetryOption) error { if retryOpt == nil { @@ -177,7 +221,7 @@ func (pc *PDClient) WaitLeader(retryOpt *utils.RetryOption) error { // return error by default, to make the retry work log.Debugf("Still waitting for the PD leader to be elected") - return errors.New("still waitting for the PD leader to be elected") + return perrs.New("still waitting for the PD leader to be elected") }, *retryOpt); err != nil { return fmt.Errorf("error getting PD leader, %v", err) } @@ -249,12 +293,12 @@ func (pc *PDClient) GetDashboardAddress() (string, error) { cfg, err := flatten.Flatten(pdConfig, "", flatten.DotStyle) if err != nil { - return "", errors.AddStack(err) + return "", perrs.AddStack(err) } addr, ok := cfg["pd-server.dashboard-address"].(string) if !ok { - return "", errors.New("cannot found dashboard address") + return "", perrs.New("cannot found dashboard address") } return addr, nil } @@ -308,7 +352,7 @@ func (pc *PDClient) EvictPDLeader(retryOpt *utils.RetryOption) error { // return error by default, to make the retry work log.Debugf("Still waitting for the PD leader to transfer") - return errors.New("still waitting for the PD leader to transfer") + return perrs.New("still waitting for the PD leader to transfer") }, *retryOpt); err != nil { return fmt.Errorf("error evicting PD leader, %v", err) } @@ -330,23 +374,12 @@ type pdSchedulerRequest struct { // The host parameter should be in format of IP:Port, that matches store's address func (pc *PDClient) EvictStoreLeader(host string, retryOpt *utils.RetryOption, countLeader func(string) (int, error)) error { // get info of current stores - stores, err := pc.GetStores() + latestStore, err := pc.GetCurrentStore(host) if err != nil { - return err - } - - // get store info of host - var latestStore *pdserverapi.StoreInfo - for _, storeInfo := range stores.Stores { - if storeInfo.Store.Address != host { - continue + if errors.Is(err, ErrNoStore) { + return nil } - latestStore = storeInfo - break - } - - if latestStore == nil { - return nil + return err } // XXX: the status address in store will be something like 0.0.0.0:20180 @@ -359,7 +392,7 @@ func (pc *PDClient) EvictStoreLeader(host string, retryOpt *utils.RetryOption, c return nil } - log.Infof("Evicting %d leaders from store %s...", leaderCount, latestStore.Store.Address) + log.Infof("\tEvicting %d leaders from store %s...", leaderCount, latestStore.Store.Address) // set scheduler for stores scheduler, err := json.Marshal(pdSchedulerRequest{ @@ -387,31 +420,28 @@ func (pc *PDClient) EvictStoreLeader(host string, retryOpt *utils.RetryOption, c } } if err := utils.Retry(func() error { - currStores, err := pc.GetStores() + currStore, err := pc.GetCurrentStore(host) if err != nil { + if errors.Is(err, ErrNoStore) { + return nil + } return err } // check if all leaders are evicted - for _, currStoreInfo := range currStores.Stores { - if currStoreInfo.Store.Address != host { - continue - } - if leaderCount, err = countLeader(latestStore.Store.Address); err != nil { - return err - } - if leaderCount == 0 { - return nil - } - log.Debugf( - "Still waitting for %d store leaders to transfer...", - leaderCount, - ) - break + if leaderCount, err = countLeader(currStore.Store.Address); err != nil { + return err + } + if leaderCount == 0 { + return nil } + log.Infof( + "\t Still waitting for %d store leaders to transfer...", + leaderCount, + ) // return error by default, to make the retry work - return errors.New("still waiting for the store leaders to transfer") + return perrs.New("still waiting for the store leaders to transfer") }, *retryOpt); err != nil { return fmt.Errorf("error evicting store leader from %s, %v", host, err) } @@ -422,26 +452,11 @@ func (pc *PDClient) EvictStoreLeader(host string, retryOpt *utils.RetryOption, c // leaders to be transffered to it again. func (pc *PDClient) RemoveStoreEvict(host string) error { // get info of current stores - stores, err := pc.GetStores() + latestStore, err := pc.GetCurrentStore(host) if err != nil { return err } - // get store info of host - var latestStore *pdserverapi.StoreInfo - for _, storeInfo := range stores.Stores { - if storeInfo.Store.Address != host { - continue - } - latestStore = storeInfo - break - } - - if latestStore == nil { - // no store matches, just skip - return nil - } - // remove scheduler for the store cmd := fmt.Sprintf( "%s/%s", @@ -478,7 +493,7 @@ func (pc *PDClient) DelPD(name string, retryOpt *utils.RetryOption) error { return err } if len(members.Members) == 1 { - return errors.New("at least 1 PD node must be online, can not delete") + return perrs.New("at least 1 PD node must be online, can not delete") } // try to delete the node @@ -517,7 +532,7 @@ func (pc *PDClient) DelPD(name string, retryOpt *utils.RetryOption) error { // check if the deleted member still present for _, member := range currMembers.Members { if member.Name == name { - return errors.New("still waitting for the PD node to be deleted") + return perrs.New("still waitting for the PD node to be deleted") } } @@ -530,23 +545,16 @@ func (pc *PDClient) DelPD(name string, retryOpt *utils.RetryOption) error { func (pc *PDClient) isSameState(host string, state metapb.StoreState) (bool, error) { // get info of current stores - stores, err := pc.GetStores() + storeInfo, err := pc.GetCurrentStore(host) if err != nil { return false, err } - for _, storeInfo := range stores.Stores { - if storeInfo.Store.Address != host { - continue - } - - if storeInfo.Store.State == state { - return true, nil - } - return false, nil + if storeInfo.Store.State == state { + return true, nil } - return false, errors.New("node not exists") + return false, nil } // IsTombStone check if the node is Tombstone. @@ -561,31 +569,20 @@ func (pc *PDClient) IsUp(host string) (bool, error) { return pc.isSameState(host, metapb.StoreState_Up) } -// ErrStoreNotExists represents the store not exists. -var ErrStoreNotExists = errors.New("store not exists") - // DelStore deletes stores from a (TiKV) host // The host parameter should be in format of IP:Port, that matches store's address func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error { // get info of current stores - stores, err := pc.GetStores() + storeInfo, err := pc.GetCurrentStore(host) if err != nil { + if errors.Is(err, ErrNoStore) { + return nil + } return err } // get store ID of host - var storeID uint64 - for _, storeInfo := range stores.Stores { - if storeInfo.Store.Address != host { - continue - } - storeID = storeInfo.Store.Id - break - } - - if storeID == 0 { - return errors.Annotatef(ErrStoreNotExists, "id: %s", host) - } + storeID := storeInfo.Store.Id cmd := fmt.Sprintf("%s/%d", pdStoreURI, storeID) endpoints := pc.getEndpoints(cmd) @@ -614,23 +611,24 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error { } } if err := utils.Retry(func() error { - currStores, err := pc.GetStores() + currStore, err := pc.GetCurrentStore(host) if err != nil { + // the store does not exist anymore, just ignore and skip + if errors.Is(err, ErrNoStore) { + return nil + } return err } - // check if the deleted member still present - for _, store := range currStores.Stores { - if store.Store.Id == storeID { - // deleting a store may take long time to transfer data, so we - // return success once it get to "Offline" status and not waiting - // for the whole process to complete. - // When finished, the store's state will be "Tombstone". - if store.Store.StateName != metapb.StoreState_name[0] { - return nil - } - return errors.New("still waiting for the store to be deleted") + if currStore.Store.Id == storeID { + // deleting a store may take long time to transfer data, so we + // return success once it get to "Offline" status and not waiting + // for the whole process to complete. + // When finished, the store's state will be "Tombstone". + if currStore.Store.State != metapb.StoreState_Up { + return nil } + return perrs.New("still waiting for the store to be deleted") } return nil @@ -670,7 +668,7 @@ func (pc *PDClient) GetLocationLabels() ([]string, error) { rc := pdconfig.ReplicationConfig{} if err := json.Unmarshal(config, &rc); err != nil { - return nil, errors.Annotatef(err, "unmarshal replication config: %s", string(config)) + return nil, perrs.Annotatef(err, "unmarshal replication config: %s", string(config)) } return rc.LocationLabels, nil @@ -685,7 +683,7 @@ func (pc *PDClient) GetTiKVLabels() (map[string]map[string]string, error) { locationLabels := map[string]map[string]string{} for _, s := range r.Stores { - if s.Store.StateName != "Up" { + if s.Store.State != metapb.StoreState_Up { continue } lbs := s.Store.GetLabels() diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index ca06acd079..3bdb2759d3 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -15,6 +15,7 @@ package spec import ( "crypto/tls" + "errors" "fmt" "io/ioutil" "net/http" @@ -23,8 +24,7 @@ import ( "strings" "time" - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/metapb" + perrs "github.com/pingcap/errors" "github.com/pingcap/tiup/pkg/cluster/api" "github.com/pingcap/tiup/pkg/cluster/executor" "github.com/pingcap/tiup/pkg/cluster/template/scripts" @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tiup/pkg/utils" dto "github.com/prometheus/client_model/go" "github.com/prometheus/prom2json" - pdserverapi "github.com/tikv/pd/server/api" ) const ( @@ -66,45 +65,15 @@ func checkStoreStatus(storeAddr string, tlsCfg *tls.Config, pdList ...string) st return "N/A" } pdapi := api.NewPDClient(pdList, statusQueryTimeout, tlsCfg) - stores, err := pdapi.GetStores() + store, err := pdapi.GetCurrentStore(storeAddr) if err != nil { + if errors.Is(err, api.ErrNoStore) { + return "N/A" + } return "Down" } - // only get status of the latest store, it is the store with largest ID number - // older stores might be legacy ones that already offlined - var latestStore *pdserverapi.StoreInfo - - for _, store := range stores.Stores { - if storeAddr == store.Store.Address { - if latestStore == nil { - latestStore = store - continue - } - - // If the PD leader has been switched multiple times, the store IDs - // may be not monitonically assigned. To workaround this, we iterate - // over the whole store list to see if any of the store's state is - // not marked as "tombstone", then use that as the result. - // See: https://github.com/tikv/pd/issues/3303 - // - // It's logically not necessary to find the store with largest ID - // number anymore in this process, but we're keeping the behavior - // as the reasonable approach would still be using the state from - // latest store, and this is only a workaround. - if store.Store.State != metapb.StoreState_Tombstone { - return store.Store.StateName - } - - if store.Store.Id > latestStore.Store.Id { - latestStore = store - } - } - } - if latestStore != nil { - return latestStore.Store.StateName - } - return "N/A" + return store.Store.StateName } // Status queries current status of the instance @@ -153,11 +122,11 @@ func (s TiKVSpec) Labels() (map[string]string, error) { for k, v := range m { key, ok := k.(string) if !ok { - return nil, errors.Errorf("TiKV label name %v is not a string, check the instance: %s:%d", k, s.Host, s.GetMainPort()) + return nil, perrs.Errorf("TiKV label name %v is not a string, check the instance: %s:%d", k, s.Host, s.GetMainPort()) } value, ok := v.(string) if !ok { - return nil, errors.Errorf("TiKV label value %v is not a string, check the instance: %s:%d", v, s.Host, s.GetMainPort()) + return nil, perrs.Errorf("TiKV label value %v is not a string, check the instance: %s:%d", v, s.Host, s.GetMainPort()) } lbs[key] = value @@ -352,7 +321,7 @@ func (i *TiKVInstance) PreRestart(topo Topology, apiTimeoutSeconds int, tlsCfg * if utils.IsTimeoutOrMaxRetry(err) { log.Warnf("Ignore evicting store leader from %s, %v", i.ID(), err) } else { - return errors.Annotatef(err, "failed to evict store leader %s", i.GetHost()) + return perrs.Annotatef(err, "failed to evict store leader %s", i.GetHost()) } } return nil @@ -373,7 +342,7 @@ func (i *TiKVInstance) PostRestart(topo Topology, tlsCfg *tls.Config) error { // remove store leader evict scheduler after restart if err := pdClient.RemoveStoreEvict(addr(i)); err != nil { - return errors.Annotatef(err, "failed to remove evict store scheduler for %s", i.GetHost()) + return perrs.Annotatef(err, "failed to remove evict store scheduler for %s", i.GetHost()) } return nil @@ -435,7 +404,7 @@ func genLeaderCounter(topo *Specification, tlsCfg *tls.Config) func(string) (int } } - return 0, errors.Errorf("metric %s{type=\"%s\"} not found", metricNameRegionCount, labelNameLeaderCount) + return 0, perrs.Errorf("metric %s{type=\"%s\"} not found", metricNameRegionCount, labelNameLeaderCount) } } @@ -460,13 +429,13 @@ func checkHTTPS(url string, tlsCfg *tls.Config) error { req, err := http.NewRequest("GET", url, nil) if err != nil { - return errors.Annotatef(err, "creating GET request for URL %q failed", url) + return perrs.Annotatef(err, "creating GET request for URL %q failed", url) } client := http.Client{Transport: transport} resp, err := client.Do(req) if err != nil { - return errors.Annotatef(err, "executing GET request for URL %q failed", url) + return perrs.Annotatef(err, "executing GET request for URL %q failed", url) } resp.Body.Close() return nil