Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#5310
Browse files Browse the repository at this point in the history
close tikv#5161, ref tikv#5161

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Jun 30, 2023
1 parent eeaf839 commit e90eb02
Show file tree
Hide file tree
Showing 9 changed files with 557 additions and 0 deletions.
456 changes: 456 additions & 0 deletions client/base_client.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis
return globalConfigWatcherCh, err
}

<<<<<<< HEAD
func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
protoClient := c.getClient()
if protoClient == nil {
Expand Down Expand Up @@ -1305,17 +1306,24 @@ func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) err
return nil
}

=======
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
func (c *client) respForErr(observer prometheus.Observer, start time.Time, err error, header *pdpb.ResponseHeader) error {
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
<<<<<<< HEAD
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
=======
c.ScheduleCheckLeader()
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
}
return nil
}
<<<<<<< HEAD

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
Expand All @@ -1326,3 +1334,5 @@ func (c *client) GetTSOAllocators() *sync.Map {
}
return tsoClient.GetTSOAllocators()
}
=======
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
1 change: 1 addition & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package tso
import (
"context"
"fmt"
"github.com/pingcap/errors"
"math"
"path"
"strconv"
Expand Down
6 changes: 6 additions & 0 deletions pkg/tso/global_allocator.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/pingcap/errors"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -383,6 +384,11 @@ func (gta *GlobalTSOAllocator) SyncMaxTS(
errs.ZapError(errors.New(syncMaxTSResp.rpcRes.GetHeader().GetError().String())))
return
}
if syncMaxTSResp.rpcRes.GetHeader().GetError() != nil {
log.Error("sync max ts rpc failed, got an error", zap.String("local-allocator-leader-url", leaderConn.Target()),
errs.ZapError(errors.Errorf("%s", syncMaxTSResp.rpcRes.GetHeader().GetError().String())))
return
}
}(ctx, leaderConn, respCh)
}
wg.Wait()
Expand Down
6 changes: 6 additions & 0 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,18 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}
<<<<<<< HEAD
dclocationDistribution := make(map[string][]uint64)
if !svr.IsAPIServiceMode() {
dclocationDistribution, err = svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
if err != nil {
return nil, errors.WithStack(err)
}
=======
dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
if err != nil {
return nil, errors.WithStack(err)
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
}
for _, m := range members.GetMembers() {
var e error
Expand Down
8 changes: 8 additions & 0 deletions server/grpc_service.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (s *GrpcServer) unaryMiddleware(ctx context.Context, req request, fn forwar
return nil, nil
}

<<<<<<< HEAD
// GetClusterInfo implements gRPC PDServer.
func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoRequest) (*pdpb.GetClusterInfoResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
Expand All @@ -117,6 +118,13 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR
Header: s.header(),
ServiceModes: svcModes,
}, nil
=======
func (s *GrpcServer) wrapErrorToHeader(errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader {
return s.errorHeader(&pdpb.Error{
Type: errorType,
Message: message,
})
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
}

// GetMembers implements gRPC PDServer.
Expand Down
45 changes: 45 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,51 @@ func (suite *clientTestSuite) bootstrapServer(header *pdpb.RequestHeader, client
resp, err := client.Bootstrap(context.Background(), req)
suite.NoError(err)
suite.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
<<<<<<< HEAD:tests/integrations/client/client_test.go
=======
}

func (suite *clientTestSuite) TestNormalTSO() {
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
physical, logical, err := suite.client.GetTS(context.Background())
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
}

func (suite *clientTestSuite) TestGetTSAsync() {
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
tsFutures := make([]pd.TSFuture, tsoRequestRound)
for i := range tsFutures {
tsFutures[i] = suite.client.GetTSAsync(context.Background())
}
var lastTS uint64 = math.MaxUint64
for i := len(tsFutures) - 1; i >= 0; i-- {
physical, logical, err := tsFutures[i].Wait()
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Greater(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310)):tests/client/client_test.go
}

func (suite *clientTestSuite) TestGetRegion() {
Expand Down
22 changes: 22 additions & 0 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func initClusterID(ctx context.Context, cli pdpb.PDClient) {
if res.GetHeader().GetError() != nil {
log.Fatal("failed to get members", zap.String("err", res.GetHeader().GetError().String()))
}
if res.GetHeader().GetError() != nil {
log.Fatal(res.GetHeader().GetError())
}
clusterID = res.GetHeader().GetClusterId()
log.Info("init cluster ID successfully", zap.Uint64("cluster-id", clusterID))
}
Expand Down Expand Up @@ -113,16 +116,26 @@ func bootstrap(ctx context.Context, cli pdpb.PDClient) {
Store: store,
Region: region,
}
<<<<<<< HEAD
cctx, cancel = context.WithCancel(ctx)
resp, err := cli.Bootstrap(cctx, req)
cancel()
=======
resp, err := cli.Bootstrap(context.TODO(), req)
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
if err != nil {
log.Fatal("failed to bootstrap the cluster", zap.Error(err))
}
if resp.GetHeader().GetError() != nil {
<<<<<<< HEAD
log.Fatal("failed to bootstrap the cluster", zap.String("err", resp.GetHeader().GetError().String()))
}
log.Info("bootstrapped")
=======
log.Fatalf("bootstrap failed: %s", resp.GetHeader().GetError().String())
}
log.Println("bootstrapped")
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
}

func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient, stores *Stores) {
Expand All @@ -132,13 +145,18 @@ func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient, store
Address: fmt.Sprintf("localhost:%d", i),
Version: "6.4.0-alpha",
}
<<<<<<< HEAD
cctx, cancel := context.WithCancel(ctx)
resp, err := cli.PutStore(cctx, &pdpb.PutStoreRequest{Header: header(), Store: store})
cancel()
=======
resp, err := cli.PutStore(context.TODO(), &pdpb.PutStoreRequest{Header: header(), Store: store})
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
if err != nil {
log.Fatal("failed to put store", zap.Uint64("store-id", i), zap.Error(err))
}
if resp.GetHeader().GetError() != nil {
<<<<<<< HEAD
log.Fatal("failed to put store", zap.Uint64("store-id", i), zap.String("err", resp.GetHeader().GetError().String()))
}
go func(ctx context.Context, storeID uint64) {
Expand All @@ -153,6 +171,10 @@ func putStores(ctx context.Context, cfg *config.Config, cli pdpb.PDClient, store
}
}
}(ctx, i)
=======
log.Fatalf("put store failed: %s", resp.GetHeader().GetError().String())
}
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
}
}

Expand Down
3 changes: 3 additions & 0 deletions tools/pd-simulator/simulator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,12 @@ func (c *client) Bootstrap(ctx context.Context, store *metapb.Store, region *met
if err != nil {
return err
}
<<<<<<< HEAD
newStore := typeutil.DeepClone(store, core.StoreFactory)
newRegion := typeutil.DeepClone(region, core.RegionFactory)

=======
>>>>>>> bde0a1b42 (*: put gRPC unknown error into the header. (#5310))
res, err := c.pdClient().Bootstrap(ctx, &pdpb.BootstrapRequest{
Header: c.requestHeader(),
Store: newStore,
Expand Down

0 comments on commit e90eb02

Please sign in to comment.