cluster: check tiflash status by requesting its API #1600

Merged (2 commits) on Oct 27, 2021

40 changes: 13 additions & 27 deletions pkg/cluster/operation/action.go
@@ -310,7 +310,7 @@ func systemctlMonitor(ctx context.Context, hosts []string, noAgentHosts set.Stri
return nil
}

-func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
+func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
e := ctxt.GetInner(ctx).Get(ins.GetHost())
log.Infof("\tRestarting instance %s", ins.ID())

@@ -319,7 +319,7 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
}

// Check ready.
-if err := ins.Ready(ctx, e, timeout); err != nil {
+if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
return toFailedActionError(err, "restart", ins.GetHost(), ins.ServiceName(), ins.LogDir())
}

@@ -328,25 +328,6 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
return nil
}

-// RestartComponent restarts the component.
-func RestartComponent(ctx context.Context, instances []spec.Instance, timeout uint64) error {
-if len(instances) == 0 {
-return nil
-}
-
-name := instances[0].ComponentName()
-log.Infof("Restarting component %s", name)
-
-for _, ins := range instances {
-err := restartInstance(ctx, ins, timeout)
-if err != nil {
-return err
-}
-}
-
-return nil
-}
-
func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEnable bool) error {
e := ctxt.GetInner(ctx).Get(ins.GetHost())

@@ -366,7 +347,7 @@ func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEn
return nil
}

-func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
+func startInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
e := ctxt.GetInner(ctx).Get(ins.GetHost())
log.Infof("\tStarting instance %s", ins.ID())

@@ -375,7 +356,7 @@ func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error
}

// Check ready.
-if err := ins.Ready(ctx, e, timeout); err != nil {
+if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
return toFailedActionError(err, "start", ins.GetHost(), ins.ServiceName(), ins.LogDir())
}

@@ -471,8 +452,13 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
// eg: PD has more strict restrictions on the capacity expansion process,
// that is, there should be only one node in the peer-join stage at most
// ref https://github.com/tikv/pd/blob/d38b36714ccee70480c39e07126e3456b5fb292d/server/join/join.go#L179-L191
-if options.Operation == ScaleOutOperation && (name == spec.ComponentPD || name == spec.ComponentDMMaster) {
-return serialStartInstances(ctx, instances, options, tlsCfg)
+if options.Operation == ScaleOutOperation {
+switch name {
+case spec.ComponentPD,
+spec.ComponentTiFlash,
+spec.ComponentDMMaster:
+return serialStartInstances(ctx, instances, options, tlsCfg)
+}
}

errg, _ := errgroup.WithContext(ctx)
@@ -496,7 +482,7 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
if err := ins.PrepareStart(nctx, tlsCfg); err != nil {
return err
}
-return startInstance(nctx, ins, options.OptTimeout)
+return startInstance(nctx, ins, options.OptTimeout, tlsCfg)
})
}

@@ -508,7 +494,7 @@ func serialStartInstances(ctx context.Context, instances []spec.Instance, option
if err := ins.PrepareStart(ctx, tlsCfg); err != nil {
return err
}
-if err := startInstance(ctx, ins, options.OptTimeout); err != nil {
+if err := startInstance(ctx, ins, options.OptTimeout, tlsCfg); err != nil {
return err
}
}
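As context for the scale-out branch above: PD, TiFlash, and DM master instances are now started one at a time, while every other component keeps the parallel errgroup path. Below is a minimal, standalone sketch of that dispatch pattern — `Instance`, `startOne`, `startComponent`, and the component-name strings are simplified stand-ins, not TiUP's real types:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// Instance is a simplified stand-in for spec.Instance.
type Instance struct{ Name, Component string }

// startOne stands in for startInstance: start the process, then wait until it is ready.
func startOne(ctx context.Context, ins Instance) error {
	fmt.Printf("starting %s (%s)\n", ins.Name, ins.Component)
	time.Sleep(100 * time.Millisecond) // pretend to wait for readiness
	return nil
}

// startComponent starts instances serially for components that must join the
// cluster one node at a time during scale-out, and in parallel otherwise.
func startComponent(ctx context.Context, component string, instances []Instance, scaleOut bool) error {
	serial := map[string]bool{"pd": true, "tiflash": true, "dm-master": true}
	if scaleOut && serial[component] {
		for _, ins := range instances {
			if err := startOne(ctx, ins); err != nil {
				return err
			}
		}
		return nil
	}
	errg, nctx := errgroup.WithContext(ctx)
	for _, ins := range instances {
		ins := ins // capture loop variable for the goroutine
		errg.Go(func() error { return startOne(nctx, ins) })
	}
	return errg.Wait()
}

func main() {
	insts := []Instance{{"pd-0", "pd"}, {"pd-1", "pd"}}
	if err := startComponent(context.Background(), "pd", insts, true); err != nil {
		fmt.Println("start failed:", err)
	}
}
```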
2 changes: 1 addition & 1 deletion pkg/cluster/operation/upgrade.go
@@ -147,7 +147,7 @@ func upgradeInstance(ctx context.Context, topo spec.Topology, instance spec.Inst
}
}

-if err := restartInstance(ctx, instance, options.OptTimeout); err != nil && !options.Force {
+if err := restartInstance(ctx, instance, options.OptTimeout, tlsCfg); err != nil && !options.Force {
return err
}

4 changes: 2 additions & 2 deletions pkg/cluster/spec/instance.go
@@ -81,7 +81,7 @@ type RollingUpdateInstance interface {
type Instance interface {
InstanceSpec
ID() string
-Ready(context.Context, ctxt.Executor, uint64) error
+Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error
InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
@@ -143,7 +143,7 @@ type BaseInstance struct {
}

// Ready implements Instance interface
-func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64) error {
+func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, _ *tls.Config) error {
return PortStarted(ctx, e, i.Port, timeout)
}

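The interface change above only widens `Ready()`'s signature: `BaseInstance` keeps its port-only check and ignores the new `*tls.Config` (hence the `_` parameter), and individual components may override it with a deeper probe. A small sketch of that embed-and-override pattern, using made-up `Checker`, `Base`, and `HTTPComponent` types rather than TiUP's own:

```go
package main

import (
	"context"
	"crypto/tls"
	"fmt"
)

// Checker mirrors the shape of the extended Ready contract.
type Checker interface {
	Ready(ctx context.Context, timeout uint64, tlsCfg *tls.Config) error
}

// Base ignores the TLS config, like BaseInstance.Ready above.
type Base struct{ Port int }

func (b *Base) Ready(ctx context.Context, timeout uint64, _ *tls.Config) error {
	fmt.Printf("waiting for port %d (up to %ds)\n", b.Port, timeout)
	return nil // a real implementation would poll the port here
}

// HTTPComponent embeds Base but overrides Ready with a TLS-aware HTTP probe,
// which is the pattern the TiFlash instance uses in the next file's diff.
type HTTPComponent struct{ Base }

func (c *HTTPComponent) Ready(ctx context.Context, timeout uint64, tlsCfg *tls.Config) error {
	scheme := "http"
	if tlsCfg != nil {
		scheme = "https"
	}
	fmt.Printf("probing %s status endpoint on port %d\n", scheme, c.Port)
	return nil // a real implementation would poll the status API here
}

func main() {
	var c Checker = &HTTPComponent{Base: Base{Port: 20292}}
	_ = c.Ready(context.Background(), 120, nil)
}
```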
59 changes: 59 additions & 0 deletions pkg/cluster/spec/tiflash.go
@@ -19,12 +19,15 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
@@ -250,6 +253,11 @@ func (i *TiFlashInstance) GetServicePort() int {
return i.InstanceSpec.(*TiFlashSpec).FlashServicePort
}

// GetStatusPort returns the status port of TiFlash
func (i *TiFlashInstance) GetStatusPort() int {
return i.InstanceSpec.(*TiFlashSpec).FlashProxyStatusPort
}

// checkIncorrectKey checks TiFlash's key should not be set in config
func (i *TiFlashInstance) checkIncorrectKey(key string) error {
errMsg := "NOTE: TiFlash `%s` should NOT be set in topo's \"%s\" config, its value will be ignored, you should set `data_dir` in each host instead, please check your topology"
@@ -730,3 +738,54 @@ func (i *TiFlashInstance) PrepareStart(ctx context.Context, tlsCfg *tls.Config)
pdClient := api.NewPDClient(endpoints, 10*time.Second, tlsCfg)
return pdClient.UpdateReplicateConfig(bytes.NewBuffer(enablePlacementRules))
}

// Ready implements Instance interface
func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, tlsCfg *tls.Config) error {
// FIXME: the timeout is applied twice in the whole `Ready()` process, in the worst
// case it might wait double time as other components
if err := PortStarted(ctx, e, i.Port, timeout); err != nil {
return err
}

scheme := "http"
if i.topo.BaseTopo().GlobalOptions.TLSEnabled {
scheme = "https"
}
addr := fmt.Sprintf("%s://%s:%d/tiflash/store-status", scheme, i.Host, i.GetStatusPort())
req, err := http.NewRequest("GET", addr, nil)
if err != nil {
return err
}
req = req.WithContext(ctx)

retryOpt := utils.RetryOption{
Delay: time.Second,
Timeout: time.Second * time.Duration(timeout),
}
var queryErr error
if err := utils.Retry(func() error {
client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg)
res, err := client.Client().Do(req)
if err != nil {
queryErr = err
return err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
queryErr = err
return err
}
if res.StatusCode == http.StatusNotFound || string(body) == "Running" {
return nil
}

err = fmt.Errorf("tiflash store status '%s' not ready", string(body))
queryErr = err
return err
}, retryOpt); err != nil {
return errors.Annotatef(queryErr, "timed out waiting for tiflash %s:%d to be ready after %ds",
i.Host, i.Port, timeout)
}
return nil
}