Skip to content

Commit

Permalink
Add replicas checker (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
iosmanthus committed Dec 14, 2021
1 parent ec0f80d commit 185f66e
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 2 deletions.
14 changes: 13 additions & 1 deletion components/recover/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/iosmanthus/learner-recover/common"

"github.com/pingcap/tiup/pkg/cluster/spec"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)

type Config struct {
WaitRulesFit bool
ClusterVersion string
Patch string
ClusterName string
Expand All @@ -24,6 +26,7 @@ type Config struct {
NewTopology struct {
Path string
PDServers []*spec.PDSpec
Monitors []*spec.PrometheusSpec
}
NewPlacementRules string
PDBootstrap []string
Expand All @@ -38,6 +41,7 @@ type Config struct {

func NewConfig(path string) (*Config, error) {
type _Config struct {
WaitRulesFit bool `yaml:"wait-rules-fit"`
ClusterVersion string `yaml:"cluster-version"`
ExtraSSHOpts string `yaml:"extra-ssh-opts"`
Patch string `yaml:"patch"`
Expand Down Expand Up @@ -111,6 +115,11 @@ func NewConfig(path string) (*Config, error) {
if err != nil {
return nil, err
}
if len(newTopo.PDServers) == 0 {
return nil, errors.New("no PD nodes in the new cluster, please check the topology file")
} else if len(newTopo.Monitors) == 0 {
return nil, errors.New("no monitor nodes in the new cluster, please check the topology file")
}

var sshArgs []string
if c.ExtraSSHOpts != "" {
Expand Down Expand Up @@ -146,6 +155,7 @@ func NewConfig(path string) (*Config, error) {

baseDir := "/tmp"
filename := baseDir + "/join-" + common.StringifyLabels(labels) + ".yaml"
log.Infof("Genrerating join topology: %v for %v", filename, common.StringifyLabels(labels))
err = ioutil.WriteFile(filename, data, 0644)
if err != nil {
return nil, err
Expand All @@ -154,6 +164,7 @@ func NewConfig(path string) (*Config, error) {
}

return &Config{
WaitRulesFit: c.WaitRulesFit,
ClusterVersion: c.ClusterVersion,
Patch: c.Patch,
ExtraSSHOpts: sshArgs,
Expand All @@ -165,7 +176,8 @@ func NewConfig(path string) (*Config, error) {
NewTopology: struct {
Path string
PDServers []*spec.PDSpec
}{c.NewTopology, newTopo.PDServers},
Monitors []*spec.PrometheusSpec
}{c.NewTopology, newTopo.PDServers, newTopo.Monitors},
JoinTopology: joinFiles,
RecoverInfoFile: info,
PDBootstrap: c.PDBootstrap,
Expand Down
53 changes: 52 additions & 1 deletion components/recover/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
prom "github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"net/http"
"os/exec"
"strings"
Expand Down Expand Up @@ -40,6 +43,7 @@ type Recover interface {
PatchCluster(ctx context.Context) error
ApplyNewPlacementRule(ctx context.Context) error
CleanZones(ctx context.Context) error
WaitRulesFit(ctx context.Context) error
}

type UnsafeRecover interface {
Expand Down Expand Up @@ -348,6 +352,51 @@ func (r *ClusterRescuer) cleanCluster(ctx context.Context) error {
return err
}

func (r *ClusterRescuer) WaitRulesFit(ctx context.Context) error {
config := r.config
if !config.WaitRulesFit {
return nil
}

monitor := config.NewTopology.Monitors[0]
client, err := prom.NewClient(prom.Config{
Address: fmt.Sprintf("http://%s:%v", monitor.Host, monitor.Port),
})
if err != nil {
return err
}

api := v1.NewAPI(client)
for {
result, _, err := api.Query(ctx, "pd_regions_status{type='miss-peer-region-count'}[1m]", time.Now())
if err != nil {
return err
}
metrics, ok := result.(model.Matrix)
if !ok {
return errors.New("expected query result is Matrix type")
}

if metrics.Len() == 0 || len(metrics[0].Values) < 4 {
log.Info("Waiting for monitor nodes ready")
time.Sleep(time.Second * 1)
continue
}

log.Info("Waiting replicas fit")
var cnt int64
for _, m := range metrics[0].Values {
cnt += int64(m.Value)
}

if cnt == 0 {
break
}
time.Sleep(time.Second * 1)
}
return nil
}

func (r *ClusterRescuer) Execute(ctx context.Context) error {
c := r.config
log.Warnf("Recovering zone: %s", common.StringifyLabels(c.Labels[r.currentZoneIdx]))
Expand Down Expand Up @@ -403,5 +452,7 @@ func (r *ClusterRescuer) Execute(ctx context.Context) error {
if err != nil {
log.Error("fail to clean failed zones! Please clean the zones manually.")
}
return nil

log.Info("Waiting placement rules fits")
return r.WaitRulesFit(ctx)
}
48 changes: 48 additions & 0 deletions components/recover/recover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package recover

import (
"context"
"flag"
"github.com/alecthomas/assert"
"github.com/pingcap/tiup/pkg/cluster/spec"
"strconv"
"strings"
"testing"
)

var promAddr string

func init() {
flag.StringVar(&promAddr, "prom", "", "Address of Prometheus")
}

func TestMetrics(t *testing.T) {
flag.Parse()
if promAddr == "" {
return
}
infos := strings.Split(promAddr, ":")
if len(infos) < 2 {
t.Fatal("Invalid Prometheus address")
}

prom := &spec.PrometheusSpec{}
prom.Host = infos[0]
port, err := strconv.Atoi(infos[1])
assert.Nil(t, err, "Invalid Prometheus port")
prom.Port = port

config := &Config{
WaitRulesFit: true,
NewTopology: struct {
Path string
PDServers []*spec.PDSpec
Monitors []*spec.PrometheusSpec
}{
Monitors: []*spec.PrometheusSpec{prom},
},
}
rescuer := NewClusterRescuer(config)
err = rescuer.WaitRulesFit(context.Background())
assert.Nil(t, err)
}

0 comments on commit 185f66e

Please sign in to comment.