Skip to content

Commit

Permalink
cluster: check tiflash status by requesting it's api (#1600)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored Oct 27, 2021
1 parent e4b380e commit 956504c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 30 deletions.
40 changes: 13 additions & 27 deletions pkg/cluster/operation/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func systemctlMonitor(ctx context.Context, hosts []string, noAgentHosts set.Stri
return nil
}

func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
e := ctxt.GetInner(ctx).Get(ins.GetHost())
log.Infof("\tRestarting instance %s", ins.ID())

Expand All @@ -319,7 +319,7 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
}

// Check ready.
if err := ins.Ready(ctx, e, timeout); err != nil {
if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
return toFailedActionError(err, "restart", ins.GetHost(), ins.ServiceName(), ins.LogDir())
}

Expand All @@ -328,25 +328,6 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
return nil
}

// RestartComponent restarts the component.
func RestartComponent(ctx context.Context, instances []spec.Instance, timeout uint64) error {
if len(instances) == 0 {
return nil
}

name := instances[0].ComponentName()
log.Infof("Restarting component %s", name)

for _, ins := range instances {
err := restartInstance(ctx, ins, timeout)
if err != nil {
return err
}
}

return nil
}

func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEnable bool) error {
e := ctxt.GetInner(ctx).Get(ins.GetHost())

Expand All @@ -366,7 +347,7 @@ func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEn
return nil
}

func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
func startInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
e := ctxt.GetInner(ctx).Get(ins.GetHost())
log.Infof("\tStarting instance %s", ins.ID())

Expand All @@ -375,7 +356,7 @@ func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error
}

// Check ready.
if err := ins.Ready(ctx, e, timeout); err != nil {
if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
return toFailedActionError(err, "start", ins.GetHost(), ins.ServiceName(), ins.LogDir())
}

Expand Down Expand Up @@ -471,8 +452,13 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
// eg: PD has more strict restrictions on the capacity expansion process,
// that is, there should be only one node in the peer-join stage at most
// ref https://github.com/tikv/pd/blob/d38b36714ccee70480c39e07126e3456b5fb292d/server/join/join.go#L179-L191
if options.Operation == ScaleOutOperation && (name == spec.ComponentPD || name == spec.ComponentDMMaster) {
return serialStartInstances(ctx, instances, options, tlsCfg)
if options.Operation == ScaleOutOperation {
switch name {
case spec.ComponentPD,
spec.ComponentTiFlash,
spec.ComponentDMMaster:
return serialStartInstances(ctx, instances, options, tlsCfg)
}
}

errg, _ := errgroup.WithContext(ctx)
Expand All @@ -496,7 +482,7 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
if err := ins.PrepareStart(nctx, tlsCfg); err != nil {
return err
}
return startInstance(nctx, ins, options.OptTimeout)
return startInstance(nctx, ins, options.OptTimeout, tlsCfg)
})
}

Expand All @@ -508,7 +494,7 @@ func serialStartInstances(ctx context.Context, instances []spec.Instance, option
if err := ins.PrepareStart(ctx, tlsCfg); err != nil {
return err
}
if err := startInstance(ctx, ins, options.OptTimeout); err != nil {
if err := startInstance(ctx, ins, options.OptTimeout, tlsCfg); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/operation/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func upgradeInstance(ctx context.Context, topo spec.Topology, instance spec.Inst
}
}

if err := restartInstance(ctx, instance, options.OptTimeout); err != nil && !options.Force {
if err := restartInstance(ctx, instance, options.OptTimeout, tlsCfg); err != nil && !options.Force {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type RollingUpdateInstance interface {
type Instance interface {
InstanceSpec
ID() string
Ready(context.Context, ctxt.Executor, uint64) error
Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error
InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
Expand Down Expand Up @@ -143,7 +143,7 @@ type BaseInstance struct {
}

// Ready implements Instance interface
func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64) error {
func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, _ *tls.Config) error {
return PortStarted(ctx, e, i.Port, timeout)
}

Expand Down
59 changes: 59 additions & 0 deletions pkg/cluster/spec/tiflash.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
Expand Down Expand Up @@ -250,6 +253,11 @@ func (i *TiFlashInstance) GetServicePort() int {
return i.InstanceSpec.(*TiFlashSpec).FlashServicePort
}

// GetStatusPort returns the status port of TiFlash
func (i *TiFlashInstance) GetStatusPort() int {
return i.InstanceSpec.(*TiFlashSpec).FlashProxyStatusPort
}

// checkIncorrectKey checks TiFlash's key should not be set in config
func (i *TiFlashInstance) checkIncorrectKey(key string) error {
errMsg := "NOTE: TiFlash `%s` should NOT be set in topo's \"%s\" config, its value will be ignored, you should set `data_dir` in each host instead, please check your topology"
Expand Down Expand Up @@ -730,3 +738,54 @@ func (i *TiFlashInstance) PrepareStart(ctx context.Context, tlsCfg *tls.Config)
pdClient := api.NewPDClient(endpoints, 10*time.Second, tlsCfg)
return pdClient.UpdateReplicateConfig(bytes.NewBuffer(enablePlacementRules))
}

// Ready implements Instance interface
func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, tlsCfg *tls.Config) error {
// FIXME: the timeout is applied twice in the whole `Ready()` process, in the worst
// case it might wait double time as other components
if err := PortStarted(ctx, e, i.Port, timeout); err != nil {
return err
}

scheme := "http"
if i.topo.BaseTopo().GlobalOptions.TLSEnabled {
scheme = "https"
}
addr := fmt.Sprintf("%s://%s:%d/tiflash/store-status", scheme, i.Host, i.GetStatusPort())
req, err := http.NewRequest("GET", addr, nil)
if err != nil {
return err
}
req = req.WithContext(ctx)

retryOpt := utils.RetryOption{
Delay: time.Second,
Timeout: time.Second * time.Duration(timeout),
}
var queryErr error
if err := utils.Retry(func() error {
client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg)
res, err := client.Client().Do(req)
if err != nil {
queryErr = err
return err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
queryErr = err
return err
}
if res.StatusCode == http.StatusNotFound || string(body) == "Running" {
return nil
}

err = fmt.Errorf("tiflash store status '%s' not ready", string(body))
queryErr = err
return err
}, retryOpt); err != nil {
return errors.Annotatef(queryErr, "timed out waiting for tiflash %s:%d to be ready after %ds",
i.Host, i.Port, timeout)
}
return nil
}

0 comments on commit 956504c

Please sign in to comment.