From 99ea97621e08bdec11e7db5de5ef9a52b668da46 Mon Sep 17 00:00:00 2001
From: Allen Zhong <zhongbenli@pingcap.com>
Date: Wed, 20 Oct 2021 18:53:48 +0800
Subject: [PATCH 1/2] implement running status check for tiflash

---
 pkg/cluster/operation/action.go |  9 +++--
 pkg/cluster/spec/tiflash.go     | 59 +++++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/pkg/cluster/operation/action.go b/pkg/cluster/operation/action.go
index 2ddd3f6a80..20e8be3892 100644
--- a/pkg/cluster/operation/action.go
+++ b/pkg/cluster/operation/action.go
@@ -471,8 +471,13 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
 	// eg: PD has more strict restrictions on the capacity expansion process,
 	// that is, there should be only one node in the peer-join stage at most
 	// ref https://github.com/tikv/pd/blob/d38b36714ccee70480c39e07126e3456b5fb292d/server/join/join.go#L179-L191
-	if options.Operation == ScaleOutOperation && (name == spec.ComponentPD || name == spec.ComponentDMMaster) {
-		return serialStartInstances(ctx, instances, options, tlsCfg)
+	if options.Operation == ScaleOutOperation {
+		switch name {
+		case spec.ComponentPD,
+			spec.ComponentTiFlash,
+			spec.ComponentDMMaster:
+			return serialStartInstances(ctx, instances, options, tlsCfg)
+		}
 	}
 
 	errg, _ := errgroup.WithContext(ctx)
diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go
index 8e406cc283..1d7bac172b 100644
--- a/pkg/cluster/spec/tiflash.go
+++ b/pkg/cluster/spec/tiflash.go
@@ -19,12 +19,15 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
+	"io"
+	"net/http"
 	"os"
 	"path/filepath"
 	"sort"
 	"strings"
 	"time"
 
+	"github.com/pingcap/errors"
 	perrs "github.com/pingcap/errors"
 	"github.com/pingcap/tiup/pkg/cluster/api"
 	"github.com/pingcap/tiup/pkg/cluster/ctxt"
@@ -250,6 +253,11 @@ func (i *TiFlashInstance) GetServicePort() int {
 	return i.InstanceSpec.(*TiFlashSpec).FlashServicePort
 }
 
+// GetStatusPort returns the status port of TiFlash
+func (i *TiFlashInstance) GetStatusPort() int {
+	return i.InstanceSpec.(*TiFlashSpec).FlashProxyStatusPort
+}
+
 // checkIncorrectKey checks TiFlash's key should not be set in config
 func (i *TiFlashInstance) checkIncorrectKey(key string) error {
 	errMsg := "NOTE: TiFlash `%s` should NOT be set in topo's \"%s\" config, its value will be ignored, you should set `data_dir` in each host instead, please check your topology"
@@ -730,3 +738,54 @@ func (i *TiFlashInstance) PrepareStart(ctx context.Context, tlsCfg *tls.Config)
 	pdClient := api.NewPDClient(endpoints, 10*time.Second, tlsCfg)
 	return pdClient.UpdateReplicateConfig(bytes.NewBuffer(enablePlacementRules))
 }
+
+// Ready implements Instance interface
+func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64) error {
+	if err := PortStarted(ctx, e, i.Port, timeout); err != nil {
+		return err
+	}
+
+	scheme := "http"
+	if i.topo.BaseTopo().GlobalOptions.TLSEnabled {
+		scheme = "https"
+		// TODO: implement tls config for tiflash instances
+		// (we don't support tiflash instance in tls enabled cluster yet)
+	}
+	addr := fmt.Sprintf("%s://%s:%d/tiflash/store-status", scheme, i.Host, i.GetStatusPort())
+	req, err := http.NewRequest("GET", addr, nil)
+	if err != nil {
+		return err
+	}
+	req = req.WithContext(ctx)
+
+	retryOpt := utils.RetryOption{
+		Delay:   time.Second,
+		Timeout: time.Second * time.Duration(timeout),
+	}
+	var queryErr error
+	if err := utils.Retry(func() error {
+		client := utils.NewHTTPClient(statusQueryTimeout, nil)
+		res, err := client.Client().Do(req)
+		if err != nil {
+			queryErr = err
+			return err
+		}
+		defer res.Body.Close()
+		body, err := io.ReadAll(res.Body)
+		if err != nil {
+			queryErr = err
+			return err
+		}
+		if res.StatusCode == http.StatusNotFound || string(body) == "Running" {
+			return nil
+		}
+
+		err = fmt.Errorf("tiflash store status '%s' not ready", string(body))
+		queryErr = err
+		return err
+	}, retryOpt); err != nil {
+		return errors.Annotatef(queryErr, "timed out waiting for tiflash %s:%d to be ready after %ds",
+			i.Host, i.Port, timeout)
+	}
+	return nil
+}

From 5761491760848b734a9f51820aaf7749687492c6 Mon Sep 17 00:00:00 2001
From: Allen Zhong <zhongbenli@pingcap.com>
Date: Tue, 26 Oct 2021 17:24:25 +0800
Subject: [PATCH 2/2] implement tls support

---
 pkg/cluster/operation/action.go  | 31 ++++++-------------------------
 pkg/cluster/operation/upgrade.go |  2 +-
 pkg/cluster/spec/instance.go     |  4 ++--
 pkg/cluster/spec/tiflash.go      |  8 ++++----
 4 files changed, 13 insertions(+), 32 deletions(-)

diff --git a/pkg/cluster/operation/action.go b/pkg/cluster/operation/action.go
index 20e8be3892..5f4a6579a8 100644
--- a/pkg/cluster/operation/action.go
+++ b/pkg/cluster/operation/action.go
@@ -310,7 +310,7 @@ func systemctlMonitor(ctx context.Context, hosts []string, noAgentHosts set.Stri
 	return nil
 }
 
-func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
+func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
 	e := ctxt.GetInner(ctx).Get(ins.GetHost())
 	log.Infof("\tRestarting instance %s", ins.ID())
 
@@ -319,7 +319,7 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
 	}
 
 	// Check ready.
-	if err := ins.Ready(ctx, e, timeout); err != nil {
+	if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
 		return toFailedActionError(err, "restart", ins.GetHost(), ins.ServiceName(), ins.LogDir())
 	}
 
@@ -328,25 +328,6 @@ func restartInstance(ctx context.Context, ins spec.Instance, timeout uint64) err
 	return nil
 }
 
-// RestartComponent restarts the component.
-func RestartComponent(ctx context.Context, instances []spec.Instance, timeout uint64) error {
-	if len(instances) == 0 {
-		return nil
-	}
-
-	name := instances[0].ComponentName()
-	log.Infof("Restarting component %s", name)
-
-	for _, ins := range instances {
-		err := restartInstance(ctx, ins, timeout)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEnable bool) error {
 	e := ctxt.GetInner(ctx).Get(ins.GetHost())
 
@@ -366,7 +347,7 @@ func enableInstance(ctx context.Context, ins spec.Instance, timeout uint64, isEn
 	return nil
 }
 
-func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error {
+func startInstance(ctx context.Context, ins spec.Instance, timeout uint64, tlsCfg *tls.Config) error {
 	e := ctxt.GetInner(ctx).Get(ins.GetHost())
 	log.Infof("\tStarting instance %s", ins.ID())
 
@@ -375,7 +356,7 @@ func startInstance(ctx context.Context, ins spec.Instance, timeout uint64) error
 	}
 
 	// Check ready.
-	if err := ins.Ready(ctx, e, timeout); err != nil {
+	if err := ins.Ready(ctx, e, timeout, tlsCfg); err != nil {
 		return toFailedActionError(err, "start", ins.GetHost(), ins.ServiceName(), ins.LogDir())
 	}
 
@@ -501,7 +482,7 @@ func StartComponent(ctx context.Context, instances []spec.Instance, noAgentHosts
 			if err := ins.PrepareStart(nctx, tlsCfg); err != nil {
 				return err
 			}
-			return startInstance(nctx, ins, options.OptTimeout)
+			return startInstance(nctx, ins, options.OptTimeout, tlsCfg)
 		})
 	}
 
@@ -513,7 +494,7 @@ func serialStartInstances(ctx context.Context, instances []spec.Instance, option
 		if err := ins.PrepareStart(ctx, tlsCfg); err != nil {
 			return err
 		}
-		if err := startInstance(ctx, ins, options.OptTimeout); err != nil {
+		if err := startInstance(ctx, ins, options.OptTimeout, tlsCfg); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index 1cd89bfc9f..905568f883 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -147,7 +147,7 @@ func upgradeInstance(ctx context.Context, topo spec.Topology, instance spec.Inst
 		}
 	}
 
-	if err := restartInstance(ctx, instance, options.OptTimeout); err != nil && !options.Force {
+	if err := restartInstance(ctx, instance, options.OptTimeout, tlsCfg); err != nil && !options.Force {
 		return err
 	}
 
diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go
index 5e77b41914..80e740c94e 100644
--- a/pkg/cluster/spec/instance.go
+++ b/pkg/cluster/spec/instance.go
@@ -81,7 +81,7 @@ type RollingUpdateInstance interface {
 type Instance interface {
 	InstanceSpec
 	ID() string
-	Ready(context.Context, ctxt.Executor, uint64) error
+	Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error
 	InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
 	ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
 	PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
@@ -143,7 +143,7 @@ type BaseInstance struct {
 }
 
 // Ready implements Instance interface
-func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64) error {
+func (i *BaseInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, _ *tls.Config) error {
 	return PortStarted(ctx, e, i.Port, timeout)
 }
 
diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go
index 1d7bac172b..087d22617b 100644
--- a/pkg/cluster/spec/tiflash.go
+++ b/pkg/cluster/spec/tiflash.go
@@ -740,7 +740,9 @@ func (i *TiFlashInstance) PrepareStart(ctx context.Context, tlsCfg *tls.Config)
 }
 
 // Ready implements Instance interface
-func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64) error {
+func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout uint64, tlsCfg *tls.Config) error {
+	// FIXME: the timeout is applied twice in the whole `Ready()` process, in the worst
+	// case it might wait double time as other components
 	if err := PortStarted(ctx, e, i.Port, timeout); err != nil {
 		return err
 	}
@@ -748,8 +750,6 @@ func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout ui
 	scheme := "http"
 	if i.topo.BaseTopo().GlobalOptions.TLSEnabled {
 		scheme = "https"
-		// TODO: implement tls config for tiflash instances
-		// (we don't support tiflash instance in tls enabled cluster yet)
 	}
 	addr := fmt.Sprintf("%s://%s:%d/tiflash/store-status", scheme, i.Host, i.GetStatusPort())
 	req, err := http.NewRequest("GET", addr, nil)
@@ -764,7 +764,7 @@ func (i *TiFlashInstance) Ready(ctx context.Context, e ctxt.Executor, timeout ui
 	}
 	var queryErr error
 	if err := utils.Retry(func() error {
-		client := utils.NewHTTPClient(statusQueryTimeout, nil)
+		client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg)
 		res, err := client.Client().Do(req)
 		if err != nil {
 			queryErr = err