Skip to content

Commit

Permalink
Enable blockWriter write pressure in stability test (#399)
Browse files Browse the repository at this point in the history
* Support running stability test out of cluster

Signed-off-by: Aylei <[email protected]>

* Gofmt

Signed-off-by: Aylei <[email protected]>

* Address comments

Signed-off-by: Aylei <[email protected]>

* Address review comments

Signed-off-by: Aylei <[email protected]>

* Add local stability test doc

Signed-off-by: Aylei <[email protected]>

* Refine documents

Signed-off-by: Aylei <[email protected]>

* Temp

* Enable block writer write pressure in stability test

Signed-off-by: Aylei <[email protected]>

* Remove error merged file

Signed-off-by: Aylei <[email protected]>

* Reduce default concurrency

Signed-off-by: Aylei <[email protected]>

* Address review comments

Signed-off-by: Aylei <[email protected]>
  • Loading branch information
aylei authored Apr 17, 2019
1 parent 5a7db94 commit abf4565
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 48 deletions.
35 changes: 20 additions & 15 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ import (
)

const (
defaultTableNum int = 64
defaultConcurrency = 512
defaultBatchSize = 100
defaultRawSize = 100

period = 5 * time.Minute
)

Expand Down Expand Up @@ -87,7 +82,8 @@ type OperatorActions interface {
CheckTidbClusterStatus(info *TidbClusterConfig) error
CheckTidbClusterStatusOrDie(info *TidbClusterConfig)
BeginInsertDataTo(info *TidbClusterConfig) error
StopInsertDataTo(info *TidbClusterConfig) error
BeginInsertDataToOrDie(info *TidbClusterConfig)
StopInsertDataTo(info *TidbClusterConfig)
ScaleTidbCluster(info *TidbClusterConfig) error
ScaleTidbClusterOrDie(info *TidbClusterConfig)
CheckScaleInSafely(info *TidbClusterConfig) error
Expand Down Expand Up @@ -156,6 +152,8 @@ type TidbClusterConfig struct {
UserName string
InitSecretName string
BackupSecretName string

BlockWriteConfig blockwriter.Config
}

func (tc *TidbClusterConfig) BackupHelmSetString(m map[string]string) string {
Expand Down Expand Up @@ -321,12 +319,8 @@ func (oa *operatorActions) DeployTidbCluster(info *TidbClusterConfig) error {
}

// init blockWriter case
info.blockWriter = blockwriter.NewBlockWriterCase(blockwriter.Config{
TableNum: defaultTableNum,
Concurrency: defaultConcurrency,
BatchSize: defaultBatchSize,
RawSize: defaultRawSize,
})
info.blockWriter = blockwriter.NewBlockWriterCase(info.BlockWriteConfig)
info.blockWriter.ClusterName = info.ClusterName

return nil
}
Expand Down Expand Up @@ -482,17 +476,28 @@ func (oa *operatorActions) CheckTidbClusterStatusOrDie(info *TidbClusterConfig)

// BeginInsertDataTo starts the block writer workload against the given
// cluster. It opens a pool of TiDB connections sized to the block writer's
// configured concurrency and hands it to the writer.
// Returns an error if the cluster's block writer has not been initialized
// (DeployTidbCluster is responsible for creating it) or if the DB cannot
// be opened.
func (oa *operatorActions) BeginInsertDataTo(info *TidbClusterConfig) error {
	dsn := getDSN(info.Namespace, info.ClusterName, "test", info.Password)
	// Guard against calling before DeployTidbCluster initialized the writer.
	if info.blockWriter == nil {
		return fmt.Errorf("block writer not initialized for cluster: %s", info.ClusterName)
	}
	glog.Infof("[%s] [%s] open TiDB connections, concurrency: %d",
		info.blockWriter, info.ClusterName, info.blockWriter.GetConcurrency())
	db, err := util.OpenDB(dsn, info.blockWriter.GetConcurrency())
	if err != nil {
		return err
	}

	return info.blockWriter.Start(db)
}

func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) error {
func (oa *operatorActions) BeginInsertDataToOrDie(info *TidbClusterConfig) {
err := oa.BeginInsertDataTo(info)
if err != nil {
panic(err)
}
}

func (oa *operatorActions) StopInsertDataTo(info *TidbClusterConfig) {
info.blockWriter.Stop()
return nil
}

func chartPath(name string, tag string) string {
Expand Down
21 changes: 10 additions & 11 deletions tests/backup/backupcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ func NewBackupCase(operator tests.OperatorActions, srcCluster *tests.TidbCluster
}

func (bc *BackupCase) Run() error {
//err := bc.operator.StopInsertDataTo(bc.srcCluster)
//if err != nil {
// glog.Errorf("cluster:[%s] stop insert data failed,error: %v", bc.srcCluster.ClusterName, err)
// return err
//}

// pause write pressure during backup
bc.operator.StopInsertDataTo(bc.srcCluster)
defer func() {
go func() {
if err := bc.operator.BeginInsertDataTo(bc.srcCluster); err != nil {
glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err)
}
}()
}()

err := bc.operator.DeployAdHocBackup(bc.srcCluster)
if err != nil {
Expand Down Expand Up @@ -119,12 +124,6 @@ func (bc *BackupCase) Run() error {
return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount)
}

//err = bc.operator.BeginInsertDataTo(bc.srcCluster)
//if err != nil {
// glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err)
// return err
//}

return nil
}

Expand Down
18 changes: 10 additions & 8 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ func main() {
"tidb.resources.requests.memory": "1Gi",
"monitor.persistent": "true",
},
Args: map[string]string{},
Monitor: true,
Args: map[string]string{},
Monitor: true,
BlockWriteConfig: conf.BlockWriter,
}
cluster2 := &tests.TidbClusterConfig{
Namespace: clusterName2,
Expand Down Expand Up @@ -123,8 +124,9 @@ func main() {
// TODO assert that the monitor's PVC exists and clean it when bootstrapping
"monitor.persistent": "true",
},
Args: map[string]string{},
Monitor: true,
Args: map[string]string{},
Monitor: true,
BlockWriteConfig: conf.BlockWriter,
}

// cluster backup and restore
Expand Down Expand Up @@ -154,10 +156,10 @@ func main() {
oa.CheckTidbClusterStatusOrDie(cluster1)
oa.CheckTidbClusterStatusOrDie(cluster2)

//go func() {
// oa.BeginInsertDataTo(cluster1)
// oa.BeginInsertDataTo(cluster2)
//}()
go oa.BeginInsertDataToOrDie(cluster1)
defer oa.StopInsertDataTo(cluster1)
go oa.BeginInsertDataToOrDie(cluster2)
defer oa.StopInsertDataTo(cluster2)

// TODO add DDL
//var workloads []workload.Workload
Expand Down
20 changes: 19 additions & 1 deletion tests/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@ package tests
import (
"flag"
"fmt"
"github.com/pingcap/tidb-operator/tests/pkg/blockwriter"
"io/ioutil"
"strings"

"github.com/golang/glog"
"gopkg.in/yaml.v2"
)

const (
defaultTableNum int = 64
defaultConcurrency = 128
defaultBatchSize = 100
defaultRawSize = 100
)

// Config defines the config of operator tests
type Config struct {
configFile string
Expand All @@ -23,6 +31,9 @@ type Config struct {
ETCDs []Nodes `yaml:"etcds" json:"etcds"`
APIServers []Nodes `yaml:"apiservers" json:"apiservers"`

// Block writer
BlockWriter blockwriter.Config `yaml:"block_writer,omitempty"`

// For local test
OperatorRepoDir string `yaml:"operator_repo_dir" json:"operator_repo_dir"`
}
Expand All @@ -35,7 +46,14 @@ type Nodes struct {

// NewConfig creates a new config.
func NewConfig() *Config {
cfg := &Config{}
cfg := &Config{
BlockWriter: blockwriter.Config{
TableNum: defaultTableNum,
Concurrency: defaultConcurrency,
BatchSize: defaultBatchSize,
RawSize: defaultRawSize,
},
}
flag.StringVar(&cfg.configFile, "config", "", "Config file")
flag.StringVar(&cfg.LogDir, "log-dir", "/logDir", "log directory")
flag.IntVar(&cfg.FaultTriggerPort, "fault-trigger-port", 23332, "the http port of fault trigger service")
Expand Down
32 changes: 19 additions & 13 deletions tests/pkg/blockwriter/blockwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,24 @@ const (

// BlockWriterCase is for concurrent writing blocks.
type BlockWriterCase struct {
cfg Config
bws []*blockWriter

isRunning uint32
isInit uint32
stopChan chan struct{}

cfg Config
ClusterName string

sync.RWMutex
}

// Config defines the config of BlockWriterCase
type Config struct {
TableNum int
Concurrency int
BatchSize int
RawSize int
TableNum int `yaml:"table_num" json:"table_num"`
Concurrency int `yaml:"concurrency" json:"concurrency"`
BatchSize int `yaml:"batch_size" json:"batch_size"`
RawSize int `yaml:"raw_size" json:"raw_size"`
}

type blockWriter struct {
Expand All @@ -73,6 +75,10 @@ func NewBlockWriterCase(cfg Config) *BlockWriterCase {
return c
}

// GetConcurrency returns the configured number of concurrent block writers.
func (c *BlockWriterCase) GetConcurrency() int {
return c.cfg.Concurrency
}

func (c *BlockWriterCase) initBlocks() {
c.bws = make([]*blockWriter, c.cfg.Concurrency)
for i := 0; i < c.cfg.Concurrency; i++ {
Expand All @@ -90,7 +96,7 @@ func (c *BlockWriterCase) newBlockWriter() *blockWriter {

func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) {
defer func() {
glog.Infof("[%s] [action: generate Query] stopped", c)
glog.Infof("[%s] [%s] [action: generate Query] stopped", c, c.ClusterName)
wg.Done()
}()

Expand Down Expand Up @@ -121,7 +127,7 @@ func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []st
if len(queryChan) < queryChanSize {
queryChan <- querys
} else {
glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c)
glog.Infof("[%s] [%s] [action: generate Query] query channel is full, sleep 10 seconds", c, c.ClusterName)
util.Sleep(ctx, 10*time.Second)
}
}
Expand All @@ -131,7 +137,7 @@ func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []st
func (bw *blockWriter) batchExecute(db *sql.DB, query string) error {
_, err := db.Exec(query)
if err != nil {
glog.V(4).Infof("[block_writer] exec sql [%s] failed, err: %v", query, err)
glog.V(4).Infof("[%s] exec sql [%s] failed, err: %v", query, err)
return err
}

Expand Down Expand Up @@ -169,10 +175,10 @@ func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []str

// Initialize inits case
func (c *BlockWriterCase) initialize(db *sql.DB) error {
glog.Infof("[%s] start to init...", c)
glog.Infof("[%s] [%s] start to init...", c, c.ClusterName)
defer func() {
atomic.StoreUint32(&c.isInit, 1)
glog.Infof("[%s] init end...", c)
glog.Infof("[%s] [%s] init end...", c, c.ClusterName)
}()

for i := 0; i < c.cfg.TableNum; i++ {
Expand Down Expand Up @@ -210,14 +216,14 @@ func (c *BlockWriterCase) initialize(db *sql.DB) error {
// Start starts to run cases
func (c *BlockWriterCase) Start(db *sql.DB) error {
if !atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) {
err := fmt.Errorf("[%s] is running, you can't start it again", c)
err := fmt.Errorf("[%s] [%s] is running, you can't start it again", c, c.ClusterName)
glog.Error(err)
return nil
}

defer func() {
c.RLock()
glog.Infof("[%s] stopped", c)
glog.Infof("[%s] [%s] stopped", c, c.ClusterName)
atomic.SwapUint32(&c.isRunning, 0)
}()

Expand All @@ -227,7 +233,7 @@ func (c *BlockWriterCase) Start(db *sql.DB) error {
}
}

glog.Infof("[%s] start to execute case...", c)
glog.Infof("[%s] [%s] start to execute case...", c, c.ClusterName)

var wg sync.WaitGroup

Expand Down

0 comments on commit abf4565

Please sign in to comment.