Skip to content

Commit

Permalink
tso: support multi-keyspace, fault injection and keyspace-name in pd-…
Browse files Browse the repository at this point in the history
…tso-bench (tikv#6608)

ref tikv#5895

support multi-keyspace, fault injection and keyspace-name in pd-tso-bench

Signed-off-by: Bin Shi <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent ff1e38e commit cf0240c
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 46 deletions.
8 changes: 8 additions & 0 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,14 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
}

oldGroupID := c.GetKeyspaceGroupID()
if oldGroupID != keyspaceGroup.Id {
log.Info("[tso] the keyspace group changed",
zap.Uint32("keyspace-id", keyspaceGroup.Id),
zap.Uint32("new-keyspace-group-id", keyspaceGroup.Id),
zap.Uint32("old-keyspace-group-id", oldGroupID))
}

// Initialize the serving addresses from the returned keyspace group info.
primaryAddr := ""
secondaryAddrs := make([]string, 0)
Expand Down
157 changes: 111 additions & 46 deletions tools/pd-tso-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"os"
Expand All @@ -43,20 +44,25 @@ const (
)

var (
pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address")
clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark")
concurrency = flag.Int("c", 1000, "concurrency")
count = flag.Int("count", 1, "the count number that the test will run")
duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last")
dcLocation = flag.String("dc", "global", "which dc-location this bench will request")
verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end")
interval = flag.Duration("interval", time.Second, "interval to output the statistics")
caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs")
certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format")
keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format")
maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval")
enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy")
wg sync.WaitGroup
pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address")
clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark")
concurrency = flag.Int("c", 1000, "concurrency")
count = flag.Int("count", 1, "the count number that the test will run")
duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last")
dcLocation = flag.String("dc", "global", "which dc-location this bench will request")
verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end")
interval = flag.Duration("interval", time.Second, "interval to output the statistics")
caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs")
certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format")
keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format")
maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval")
enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy")
enableFaultInjection = flag.Bool("enable-fault-injection", false, "whether enable fault injection")
faultInjectionRate = flag.Float64("fault-injection-rate", 0.01, "the failure rate [0.0001, 1]. 0.01 means 1% failure rate")
maxTSOSendIntervalMilliseconds = flag.Int("max-send-interval-ms", 0, "max tso send interval in milliseconds, 60s by default")
keyspaceID = flag.Uint("keyspace-id", 0, "the id of the keyspace to access")
keyspaceName = flag.String("keyspace-name", "", "the name of the keyspace to access")
wg sync.WaitGroup
)

var promServer *httptest.Server
Expand Down Expand Up @@ -97,26 +103,7 @@ func bench(mainCtx context.Context) {
fmt.Printf("Create %d client(s) for benchmark\n", *clientNumber)
pdClients := make([]pd.Client, *clientNumber)
for idx := range pdClients {
var (
pdCli pd.Client
err error
)

opt := pd.WithGRPCDialOptions(
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: keepaliveTime,
Timeout: keepaliveTimeout,
}),
)

pdCli, err = pd.NewClientWithContext(mainCtx, []string{*pdAddrs}, pd.SecurityOption{
CAPath: *caPath,
CertPath: *certPath,
KeyPath: *keyPath,
}, opt)

pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
pdCli, err := createPDClient(mainCtx)
if err != nil {
log.Fatal(fmt.Sprintf("create pd client #%d failed: %v", idx, err))
}
Expand All @@ -134,10 +121,18 @@ func bench(mainCtx context.Context) {

durCh := make(chan time.Duration, 2*(*concurrency)*(*clientNumber))

wg.Add((*concurrency) * (*clientNumber))
for _, pdCli := range pdClients {
for i := 0; i < *concurrency; i++ {
go reqWorker(ctx, pdCli, durCh)
if *enableFaultInjection {
fmt.Printf("Enable fault injection, failure rate: %f\n", *faultInjectionRate)
wg.Add(*clientNumber)
for i := 0; i < *clientNumber; i++ {
go reqWorker(ctx, pdClients, i, durCh)
}
} else {
wg.Add((*concurrency) * (*clientNumber))
for i := 0; i < *clientNumber; i++ {
for j := 0; j < *concurrency; j++ {
go reqWorker(ctx, pdClients, i, durCh)
}
}
}

Expand Down Expand Up @@ -352,22 +347,51 @@ func (s *stats) calculate(count int) float64 {
return float64(count) * 100 / float64(s.count)
}

func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh chan time.Duration) {
defer wg.Done()

reqCtx, cancel := context.WithCancel(ctx)
defer cancel()
var (
err error
maxRetryTime int = 120
sleepIntervalOnFailure time.Duration = 1000 * time.Millisecond
totalSleepBeforeGetTS time.Duration
)
pdCli := pdClients[clientIdx]

for {
if pdCli == nil || (*enableFaultInjection && shouldInjectFault()) {
if pdCli != nil {
pdCli.Close()
}
pdCli, err = createPDClient(ctx)
if err != nil {
log.Error(fmt.Sprintf("re-create pd client #%d failed: %v", clientIdx, err))
select {
case <-reqCtx.Done():
case <-time.After(100 * time.Millisecond):
}
continue
}
pdClients[clientIdx] = pdCli
}

totalSleepBeforeGetTS = 0
start := time.Now()

var (
i int32
err error
maxRetryTime int32 = 50
sleepIntervalOnFailure time.Duration = 100 * time.Millisecond
)
i := 0
for ; i < maxRetryTime; i++ {
if *maxTSOSendIntervalMilliseconds > 0 {
sleepBeforeGetTS := time.Duration(rand.Intn(*maxTSOSendIntervalMilliseconds)) * time.Millisecond
ticker := time.NewTicker(sleepBeforeGetTS)
defer ticker.Stop()
select {
case <-reqCtx.Done():
case <-ticker.C:
totalSleepBeforeGetTS += sleepBeforeGetTS
}
}
_, _, err = pdCli.GetLocalTS(reqCtx, *dcLocation)
if errors.Cause(err) == context.Canceled {
return
Expand All @@ -381,7 +405,7 @@ func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
if err != nil {
log.Fatal(fmt.Sprintf("%v", err))
}
dur := time.Since(start) - time.Duration(i)*sleepIntervalOnFailure
dur := time.Since(start) - time.Duration(i)*sleepIntervalOnFailure - totalSleepBeforeGetTS

select {
case <-reqCtx.Done():
Expand All @@ -390,3 +414,44 @@ func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
}
}
}

func createPDClient(ctx context.Context) (pd.Client, error) {
var (
pdCli pd.Client
err error
)

opts := make([]pd.ClientOption, 0)
opts = append(opts, pd.WithGRPCDialOptions(
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: keepaliveTime,
Timeout: keepaliveTimeout,
}),
))

if len(*keyspaceName) > 0 {
apiCtx := pd.NewAPIContextV2(*keyspaceName)
pdCli, err = pd.NewClientWithAPIContext(ctx, apiCtx, []string{*pdAddrs}, pd.SecurityOption{
CAPath: *caPath,
CertPath: *certPath,
KeyPath: *keyPath,
}, opts...)
} else {
pdCli, err = pd.NewClientWithKeyspace(ctx, uint32(*keyspaceID), []string{*pdAddrs}, pd.SecurityOption{
CAPath: *caPath,
CertPath: *certPath,
KeyPath: *keyPath,
}, opts...)
}
if err != nil {
return nil, err
}

pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
return pdCli, err
}

func shouldInjectFault() bool {
return rand.Intn(10000) < int(*faultInjectionRate*10000)
}

0 comments on commit cf0240c

Please sign in to comment.