Skip to content

Commit

Permalink
Merge branch 'master' into disable-scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Sep 5, 2023
2 parents c2a2ddd + 1d8f89b commit 52c21bf
Show file tree
Hide file tree
Showing 27 changed files with 373 additions and 640 deletions.
8 changes: 0 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,6 @@ func WithInitMetricsOption(initMetrics bool) ClientOption {
}
}

// WithAllowTSOFallback configures the client with `allowTSOFallback` option.
// NOTICE: This should only be used for testing.
func WithAllowTSOFallback() ClientOption {
return func(c *client) {
c.option.allowTSOFallback = true
}
}

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand Down
1 change: 0 additions & 1 deletion client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type option struct {
enableForwarding bool
metricsLabels prometheus.Labels
initMetrics bool
allowTSOFallback bool

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand Down
17 changes: 1 addition & 16 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,22 +796,7 @@ func (c *tsoClient) compareAndSwapTS(
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
if !c.option.allowTSOFallback {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
log.Error("[tso] timestamp fallback",
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
Expand Down
18 changes: 3 additions & 15 deletions pkg/audit/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"testing"
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/requestutil"
"github.com/tikv/pd/pkg/utils/testutil"
)

func TestLabelMatcher(t *testing.T) {
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestLocalLogBackendUsingFile(t *testing.T) {
t.Parallel()
re := require.New(t)
backend := NewLocalLogBackend(true)
fname := initLog()
fname := testutil.InitTempFileLogger("info")
defer os.RemoveAll(fname)
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody"))
re.False(backend.ProcessHTTPRequest(req))
Expand Down Expand Up @@ -125,7 +125,7 @@ func BenchmarkLocalLogAuditUsingTerminal(b *testing.B) {
func BenchmarkLocalLogAuditUsingFile(b *testing.B) {
b.StopTimer()
backend := NewLocalLogBackend(true)
fname := initLog()
fname := testutil.InitTempFileLogger("info")
defer os.RemoveAll(fname)
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody"))
b.StartTimer()
Expand All @@ -135,15 +135,3 @@ func BenchmarkLocalLogAuditUsingFile(b *testing.B) {
backend.ProcessHTTPRequest(req)
}
}

func initLog() string {
cfg := &log.Config{}
f, _ := os.CreateTemp("/tmp", "pd_tests")
fname := f.Name()
f.Close()
cfg.File.Filename = fname
cfg.Level = "info"
lg, p, _ := log.InitLogger(cfg)
log.ReplaceGlobals(lg, p)
return fname
}
132 changes: 25 additions & 107 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ package election

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
Expand All @@ -35,27 +33,15 @@ const defaultLeaseTimeout = 1

func TestLeadership(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()

// Campaign the same leadership
leadership1 := NewLeadership(client, "/test_leader", "test_leader_1")
leadership2 := NewLeadership(client, "/test_leader", "test_leader_2")

// leadership1 starts first and get the leadership
err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
err := leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
re.NoError(err)
// leadership2 starts then and can not get the leadership
err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2")
Expand Down Expand Up @@ -168,60 +154,24 @@ func TestExitWatch(t *testing.T) {
// Case6: transfer leader without client reconnection.
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() {
cfg1 := server.Config()
cfg2 := etcdutil.NewTestSingleConfig(t)
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd2, err := embed.StartEtcd(cfg2)
etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client)
client2, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().LCUrls)
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
<-etcd2.Server.ReadyNotify()
ep := cfg2.LCUrls[0].String()
client1, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

// close the original leader
server.Server.HardStop()
client1.Delete(context.Background(), leaderKey)
// delete the leader key with the new client
client2.Delete(context.Background(), leaderKey)
return func() {
etcd2.Close()
client2.Close()
}
})
// Case7: loss the quorum when the watch loop is running
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() {
tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests")
defer os.RemoveAll(tempStdoutFile.Name())
logCfg := &log.Config{}
logCfg.File.Filename = tempStdoutFile.Name()
logCfg.Level = "info"
lg, p, _ := log.InitLogger(logCfg)
log.ReplaceGlobals(lg, p)

cfg1 := server.Config()
cfg2 := etcdutil.NewTestSingleConfig(t)
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd2, err := embed.StartEtcd(cfg2)
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
<-etcd2.Server.ReadyNotify()

cfg3 := etcdutil.NewTestSingleConfig(t)
cfg3.InitialCluster = cfg2.InitialCluster + fmt.Sprintf(",%s=%s", cfg3.Name, &cfg3.LPUrls[0])
cfg3.ClusterState = embed.ClusterStateFlagExisting
peerURL = cfg3.LPUrls[0].String()
addResp, err = etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd3, err := embed.StartEtcd(cfg3)
re.NoError(err)
re.Equal(uint64(etcd3.Server.ID()), addResp.Member.ID)
<-etcd3.Server.ReadyNotify()
etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client)
cfg2 := etcd2.Config()
etcd3 := etcdutil.MustAddEtcdMember(t, &cfg2, client)

resp2, err := client.MemberList(context.Background())
re.NoError(err)
Expand All @@ -237,24 +187,11 @@ func TestExitWatch(t *testing.T) {

func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client) func()) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client1, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
re.NoError(err)

<-etcd.Server.ReadyNotify()
defer client2.Close()

leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
Expand All @@ -268,7 +205,7 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe
done <- struct{}{}
}()

cleanFunc := injectFunc(etcd, client2)
cleanFunc := injectFunc(servers[0], client2)
defer cleanFunc()

testutil.Eventually(re, func() bool {
Expand All @@ -283,33 +220,14 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe

func TestRequestProgress(t *testing.T) {
checkWatcherRequestProgress := func(injectWatchChanBlock bool) {
tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests")
defer os.RemoveAll(tempStdoutFile.Name())
logCfg := &log.Config{}
logCfg.File.Filename = tempStdoutFile.Name()
logCfg.Level = "debug"
lg, p, _ := log.InitLogger(logCfg)
log.ReplaceGlobals(lg, p)

re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
fname := testutil.InitTempFileLogger("debug")
defer os.RemoveAll(fname)
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
re.NoError(err)

ep := cfg.LCUrls[0].String()
client1, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()
defer client2.Close()

leaderKey := "/test_leader"
leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
Expand All @@ -328,14 +246,14 @@ func TestRequestProgress(t *testing.T) {
if injectWatchChanBlock {
failpoint.Enable("github.com/tikv/pd/pkg/election/watchChanBlock", "return(true)")
testutil.Eventually(re, func() bool {
b, _ := os.ReadFile(tempStdoutFile.Name())
b, _ := os.ReadFile(fname)
l := string(b)
return strings.Contains(l, "watch channel is blocked for a long time")
})
failpoint.Disable("github.com/tikv/pd/pkg/election/watchChanBlock")
} else {
testutil.Eventually(re, func() bool {
b, _ := os.ReadFile(tempStdoutFile.Name())
b, _ := os.ReadFile(fname)
l := string(b)
return strings.Contains(l, "watcher receives progress notify in watch loop")
})
Expand Down
33 changes: 4 additions & 29 deletions pkg/election/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,12 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

func TestLease(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()

// Create the lease.
lease1 := &lease{
Expand Down Expand Up @@ -104,20 +91,8 @@ func TestLease(t *testing.T) {

func TestLeaseKeepAlive(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()

// Create the lease.
lease := &lease{
Expand Down
Loading

0 comments on commit 52c21bf

Please sign in to comment.