From 6554482b2ef75849d45ec7b0e73d35eb5a0726d5 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Fri, 12 May 2023 13:04:21 +0800 Subject: [PATCH] Provide GetMinTS API to solve the compatibility issue brought by multi-timeline tso (#6421) ref tikv/pd#6142 1. Import kvproto change to introduce GetMinTS rpc in the TSO service. 6. Add server side implementation for GetMinTS rpc. 7. Add client side implementation for GetMinTS rpc. 8. Add unit test Signed-off-by: Bin Shi --- client/client.go | 53 +++++--- client/client_test.go | 13 +- client/errs/errno.go | 1 + client/go.mod | 2 +- client/go.sum | 4 +- client/tso_client.go | 127 +++++++++++++++++- client/tso_dispatcher.go | 21 +-- client/tso_service_discovery.go | 8 +- client/tsoutil/tsoutil.go | 46 +++++++ errors.toml | 5 + go.mod | 2 +- go.sum | 4 +- pkg/errs/errno.go | 1 + pkg/mcs/tso/server/grpc_service.go | 51 ++++++- pkg/tso/allocator_manager.go | 2 +- pkg/tso/keyspace_group_manager.go | 46 +++++++ tests/integrations/client/go.mod | 2 +- tests/integrations/client/go.sum | 4 +- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/go.sum | 4 +- .../mcs/tso/keyspace_group_manager_test.go | 16 ++- tests/integrations/tso/client_test.go | 50 +++++++ tests/integrations/tso/go.mod | 2 +- tests/integrations/tso/go.sum | 4 +- tools/pd-tso-bench/go.sum | 4 +- 25 files changed, 402 insertions(+), 72 deletions(-) create mode 100644 client/tsoutil/tsoutil.go diff --git a/client/client.go b/client/client.go index d9cb0358f3f..e1498a49899 100644 --- a/client/client.go +++ b/client/client.go @@ -252,20 +252,20 @@ type serviceModeKeeper struct { // triggering service mode switching concurrently. sync.RWMutex serviceMode pdpb.ServiceMode - tsoClient atomic.Value // *tsoClient + tsoClient *tsoClient tsoSvcDiscovery ServiceDiscovery } -func (smk *serviceModeKeeper) close() { - smk.Lock() - defer smk.Unlock() - switch smk.serviceMode { +func (k *serviceModeKeeper) close() { + k.Lock() + defer k.Unlock() + switch k.serviceMode { case pdpb.ServiceMode_API_SVC_MODE: - smk.tsoSvcDiscovery.Close() + k.tsoSvcDiscovery.Close() fallthrough case pdpb.ServiceMode_PD_SVC_MODE: - if tsoCli := smk.tsoClient.Load(); tsoCli != nil { - tsoCli.(*tsoClient).Close() + if k.tsoClient != nil { + k.tsoClient.Close() } case pdpb.ServiceMode_UNKNOWN_SVC_MODE: } @@ -486,8 +486,8 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { } newTSOCli.Setup() // Replace the old TSO client. - oldTSOClient := c.getTSOClient() - c.tsoClient.Store(newTSOCli) + oldTSOClient := c.tsoClient + c.tsoClient = newTSOCli oldTSOClient.Close() // Replace the old TSO service discovery if needed. oldTSOSvcDiscovery := c.tsoSvcDiscovery @@ -506,11 +506,10 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { zap.String("new-mode", newMode.String())) } -func (c *client) getTSOClient() *tsoClient { - if tsoCli := c.tsoClient.Load(); tsoCli != nil { - return tsoCli.(*tsoClient) - } - return nil +func (c *client) getServiceClientProxy() (*tsoClient, pdpb.ServiceMode) { + c.RLock() + defer c.RUnlock() + return c.tsoClient, c.serviceMode } func (c *client) scheduleUpdateTokenConnection() { @@ -675,7 +674,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur req := tsoReqPool.Get().(*tsoRequest) req.requestCtx = ctx req.clientCtx = c.ctx - tsoClient := c.getTSOClient() + tsoClient, _ := c.getServiceClientProxy() req.start = time.Now() req.dcLocation = dcLocation @@ -704,6 +703,26 @@ func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical in return resp.Wait() } +func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) { + tsoClient, serviceMode := c.getServiceClientProxy() + if tsoClient == nil { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("tso client is nil") + } + + switch serviceMode { + case pdpb.ServiceMode_UNKNOWN_SVC_MODE: + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode") + case pdpb.ServiceMode_PD_SVC_MODE: + // If the service mode is switched to API during GetTS() call, which happens during migration, + // returning the default timeline should be fine. + return c.GetTS(ctx) + case pdpb.ServiceMode_API_SVC_MODE: + return tsoClient.getMinTS(ctx) + default: + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") + } +} + func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { if res.Region == nil { return nil @@ -1395,7 +1414,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map // For test only. func (c *client) GetTSOAllocators() *sync.Map { - tsoClient := c.getTSOClient() + tsoClient, _ := c.getServiceClientProxy() if tsoClient == nil { return nil } diff --git a/client/client_test.go b/client/client_test.go index 5f6a0b89b42..e82fe861a0e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/client/testutil" "github.com/tikv/pd/client/tlsutil" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/goleak" "google.golang.org/grpc" ) @@ -32,13 +33,13 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m, testutil.LeakOptions...) } -func TestTsLessEqual(t *testing.T) { +func TestTSLessEqual(t *testing.T) { re := require.New(t) - re.True(tsLessEqual(9, 9, 9, 9)) - re.True(tsLessEqual(8, 9, 9, 8)) - re.False(tsLessEqual(9, 8, 8, 9)) - re.False(tsLessEqual(9, 8, 9, 6)) - re.True(tsLessEqual(9, 6, 9, 8)) + re.True(tsoutil.TSLessEqual(9, 9, 9, 9)) + re.True(tsoutil.TSLessEqual(8, 9, 9, 8)) + re.False(tsoutil.TSLessEqual(9, 8, 8, 9)) + re.False(tsoutil.TSLessEqual(9, 8, 9, 6)) + re.True(tsoutil.TSLessEqual(9, 6, 9, 8)) } func TestUpdateURLs(t *testing.T) { diff --git a/client/errs/errno.go b/client/errs/errno.go index e4bb7a21a9b..73bbd41541e 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -43,6 +43,7 @@ var ( ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout")) ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO")) + ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO")) ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader")) ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember")) ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo")) diff --git a/client/go.mod b/client/go.mod index aa19f4f3e6a..1bb3a49045f 100644 --- a/client/go.mod +++ b/client/go.mod @@ -8,7 +8,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.8.2 diff --git a/client/go.sum b/client/go.sum index f06d1ee10da..1cfc3e28631 100644 --- a/client/go.sum +++ b/client/go.sum @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/tso_client.go b/client/tso_client.go index 7585fdc34f6..9aae31bba5a 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -22,8 +22,11 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -31,14 +34,17 @@ import ( // TSOClient is the client used to get timestamps. type TSOClient interface { - // GetTS gets a timestamp from PD. + // GetTS gets a timestamp from PD or TSO microservice. GetTS(ctx context.Context) (int64, int64, error) - // GetTSAsync gets a timestamp from PD, without block the caller. + // GetTSAsync gets a timestamp from PD or TSO microservice, without block the caller. GetTSAsync(ctx context.Context) TSFuture - // GetLocalTS gets a local timestamp from PD. + // GetLocalTS gets a local timestamp from PD or TSO microservice. GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error) - // GetLocalTSAsync gets a local timestamp from PD, without block the caller. + // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture + // GetMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from + // the TSO microservice. + GetMinTS(ctx context.Context) (int64, int64, error) } type tsoRequest struct { @@ -275,3 +281,116 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { } return nil, "" } + +// getMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from the TSO microservice. +func (c *tsoClient) getMinTS(ctx context.Context) (physical, logical int64, err error) { + // Immediately refresh the TSO server/pod list + addrs, err := c.svcDiscovery.DiscoverMicroservice(tsoService) + if err != nil { + return 0, 0, errs.ErrClientGetMinTSO.Wrap(err).GenWithStackByCause() + } + if len(addrs) == 0 { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("no tso servers/pods discovered") + } + + // Get the minimal timestamp from the TSO servers/pods + var mutex sync.Mutex + resps := make([]*tsopb.GetMinTSResponse, 0) + wg := sync.WaitGroup{} + wg.Add(len(addrs)) + for _, addr := range addrs { + go func(addr string) { + defer wg.Done() + resp, err := c.getMinTSFromSingleServer(ctx, addr, c.option.timeout) + if err != nil || resp == nil { + log.Warn("[tso] failed to get min ts from tso server", + zap.String("address", addr), zap.Error(err)) + return + } + mutex.Lock() + defer mutex.Unlock() + resps = append(resps, resp) + }(addr) + } + wg.Wait() + + // Check the results. The returned minimal timestamp is valid if all the conditions are met: + // 1. The number of responses is equal to the number of TSO servers/pods. + // 2. The number of keyspace groups asked is equal to the number of TSO servers/pods. + // 3. The minimal timestamp is not zero. + var ( + minTS *pdpb.Timestamp + keyspaceGroupsAsked uint32 + ) + if len(resps) == 0 { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("none of tso server/pod responded") + } + emptyTS := &pdpb.Timestamp{} + keyspaceGroupsTotal := resps[0].KeyspaceGroupsTotal + for _, resp := range resps { + if resp.KeyspaceGroupsTotal == 0 { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service has no keyspace group") + } + if resp.KeyspaceGroupsTotal != keyspaceGroupsTotal { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs( + "the tso service has inconsistent keyspace group total count") + } + keyspaceGroupsAsked += resp.KeyspaceGroupsServing + if tsoutil.CompareTimestamp(resp.Timestamp, emptyTS) > 0 && + (minTS == nil || tsoutil.CompareTimestamp(resp.Timestamp, minTS) < 0) { + minTS = resp.Timestamp + } + } + + if keyspaceGroupsAsked != keyspaceGroupsTotal { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs( + fmt.Sprintf("can't query all the tso keyspace groups. Asked %d, expected %d", + keyspaceGroupsAsked, keyspaceGroupsTotal)) + } + + if minTS == nil { + return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("the tso service is not ready") + } + + return minTS.Physical, tsoutil.AddLogical(minTS.Logical, 0, minTS.SuffixBits), nil +} + +func (c *tsoClient) getMinTSFromSingleServer( + ctx context.Context, tsoSrvAddr string, timeout time.Duration, +) (*tsopb.GetMinTSResponse, error) { + cc, err := c.svcDiscovery.GetOrCreateGRPCConn(tsoSrvAddr) + if err != nil { + return nil, errs.ErrClientGetMinTSO.FastGenByArgs( + fmt.Sprintf("can't connect to tso server %s", tsoSrvAddr)) + } + + cctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + resp, err := tsopb.NewTSOClient(cc).GetMinTS( + cctx, &tsopb.GetMinTSRequest{ + Header: &tsopb.RequestHeader{ + ClusterId: c.svcDiscovery.GetClusterID(), + KeyspaceId: c.svcDiscovery.GetKeyspaceID(), + KeyspaceGroupId: c.svcDiscovery.GetKeyspaceGroupID(), + }, + DcLocation: globalDCLocation, + }) + if err != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + err, cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + if resp == nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + "no min ts info collected", cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + if resp.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", + resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMinTSO.Wrap(attachErr).GenWithStackByCause() + } + + return resp, nil +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 04d2ea41235..d2d62814619 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -715,19 +716,14 @@ func (c *tsoClient) processRequests( return err } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. - firstLogical := addLogical(logical, -count+1, suffixBits) + firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count) c.finishRequest(requests, physical, firstLogical, suffixBits, nil) return nil } -// Because of the suffix, we need to shift the count before we add it to the logical part. -func addLogical(logical, count int64, suffixBits uint32) int64 { - return logical + count<= len(t.addrs) } func (t *tsoServerDiscovery) resetFailure() { @@ -414,8 +415,9 @@ func (c *tsoServiceDiscovery) updateMember() error { } keyspaceGroup, err := c.findGroupByKeyspaceID(c.keyspaceID, tsoServerAddr, updateMemberTimeout) if err != nil { - c.tsoServerDiscovery.countFailure() - log.Error("[tso] failed to find the keyspace group", errs.ZapError(err)) + if c.tsoServerDiscovery.countFailure() { + log.Error("[tso] failed to find the keyspace group", errs.ZapError(err)) + } return err } c.tsoServerDiscovery.resetFailure() diff --git a/client/tsoutil/tsoutil.go b/client/tsoutil/tsoutil.go new file mode 100644 index 00000000000..ffc449640ac --- /dev/null +++ b/client/tsoutil/tsoutil.go @@ -0,0 +1,46 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsoutil + +import ( + "github.com/pingcap/kvproto/pkg/pdpb" +) + +// AddLogical shifts the count before we add it to the logical part. +func AddLogical(logical, count int64, suffixBits uint32) int64 { + return logical + count< tsoTwo, returns 1. +// If tsoOne = tsoTwo, returns 0. +// If tsoOne < tsoTwo, returns -1. +func CompareTimestamp(tsoOne, tsoTwo *pdpb.Timestamp) int { + if tsoOne.GetPhysical() > tsoTwo.GetPhysical() || (tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() > tsoTwo.GetLogical()) { + return 1 + } + if tsoOne.GetPhysical() == tsoTwo.GetPhysical() && tsoOne.GetLogical() == tsoTwo.GetLogical() { + return 0 + } + return -1 +} diff --git a/errors.toml b/errors.toml index d425288d955..540ed5c3e13 100644 --- a/errors.toml +++ b/errors.toml @@ -746,6 +746,11 @@ error = ''' get local allocator failed, %s ''' +["PD:tso:ErrGetMinTS"] +error = ''' +get min ts failed, %s +''' + ["PD:tso:ErrKeyspaceGroupIDInvalid"] error = ''' the keyspace group id is invalid, %s diff --git a/go.mod b/go.mod index 92bf0fb4f0f..8413fd77b47 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 diff --git a/go.sum b/go.sum index 56e556f7789..1c4b9276aa5 100644 --- a/go.sum +++ b/go.sum @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 37524798046..8e9fb83de09 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -54,6 +54,7 @@ var ( ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsRetryExhausted")) ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized")) ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) + ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS")) ) // member errors diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 8b5765b1875..dd0a96b1cba 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -36,7 +36,8 @@ import ( // gRPC errors var ( - ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched") ) var _ tsopb.TSOServer = (*Service)(nil) @@ -157,6 +158,13 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { func (s *Service) FindGroupByKeyspaceID( ctx context.Context, request *tsopb.FindGroupByKeyspaceIDRequest, ) (*tsopb.FindGroupByKeyspaceIDResponse, error) { + respKeyspaceGroup := request.GetHeader().GetKeyspaceGroupId() + if errorType, err := s.validRequest(request.GetHeader()); err != nil { + return &tsopb.FindGroupByKeyspaceIDResponse{ + Header: s.wrapErrorToHeader(errorType, err.Error(), respKeyspaceGroup), + }, nil + } + keyspaceID := request.GetKeyspaceId() am, keyspaceGroup, keyspaceGroupID, err := s.keyspaceGroupManager.FindGroupByKeyspaceID(keyspaceID) if err != nil { @@ -199,6 +207,47 @@ func (s *Service) FindGroupByKeyspaceID( }, nil } +// GetMinTS gets the minimum timestamp across all keyspace groups served by the TSO server +// who receives and handles the request. +func (s *Service) GetMinTS( + ctx context.Context, request *tsopb.GetMinTSRequest, +) (*tsopb.GetMinTSResponse, error) { + respKeyspaceGroup := request.GetHeader().GetKeyspaceGroupId() + if errorType, err := s.validRequest(request.GetHeader()); err != nil { + return &tsopb.GetMinTSResponse{ + Header: s.wrapErrorToHeader(errorType, err.Error(), respKeyspaceGroup), + }, nil + } + + minTS, kgAskedCount, kgTotalCount, err := s.keyspaceGroupManager.GetMinTS(request.GetDcLocation()) + if err != nil { + return &tsopb.GetMinTSResponse{ + Header: s.wrapErrorToHeader( + tsopb.ErrorType_UNKNOWN, err.Error(), respKeyspaceGroup), + Timestamp: &minTS, + KeyspaceGroupsServing: kgAskedCount, + KeyspaceGroupsTotal: kgTotalCount, + }, nil + } + + return &tsopb.GetMinTSResponse{ + Header: s.header(respKeyspaceGroup), + Timestamp: &minTS, + KeyspaceGroupsServing: kgAskedCount, + KeyspaceGroupsTotal: kgTotalCount, + }, nil +} + +func (s *Service) validRequest(header *tsopb.RequestHeader) (tsopb.ErrorType, error) { + if s.IsClosed() || s.keyspaceGroupManager == nil { + return tsopb.ErrorType_NOT_BOOTSTRAPPED, ErrNotStarted + } + if header == nil || header.GetClusterId() != s.clusterID { + return tsopb.ErrorType_CLUSTER_MISMATCHED, ErrClusterMismatched + } + return tsopb.ErrorType_OK, nil +} + func (s *Service) header(keyspaceGroupBelongTo uint32) *tsopb.ResponseHeader { if s.clusterID == 0 { return s.wrapErrorToHeader( diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 9141e85af19..a780e7da74e 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -1070,7 +1070,7 @@ func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) { // HandleRequest forwards TSO allocation requests to correct TSO Allocators. func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb.Timestamp, error) { - if dcLocation == "" { + if len(dcLocation) == 0 { dcLocation = GlobalDCLocation } allocatorGroup, exist := am.getAllocatorGroup(dcLocation) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 732b3954797..7038ef8e373 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -688,6 +688,52 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { fmt.Sprintf("%d shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) } +// GetMinTS returns the minimum timestamp across all keyspace groups served by this TSO server/pod. +func (kgm *KeyspaceGroupManager) GetMinTS( + dcLocation string, +) (_ pdpb.Timestamp, kgAskedCount, kgTotalCount uint32, err error) { + kgm.RLock() + defer kgm.RUnlock() + + var minTS *pdpb.Timestamp + for i, am := range kgm.ams { + if kgm.kgs[i] != nil { + kgTotalCount++ + } + // If any keyspace group hasn't elected primary, we can't know its current timestamp of + // the group, so as to the min ts across all keyspace groups. Return error in this case. + if am != nil && !am.member.IsLeaderElected() { + return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, errs.ErrGetMinTS.FastGenByArgs("leader is not elected") + } + // Skip the keyspace groups that are not served by this TSO Server/Pod. + if am == nil || !am.IsLeader() { + continue + } + kgAskedCount++ + // Skip the keyspace groups that are split targets, because they always have newer + // time lines than the existing split sources thus won't contribute to the min ts. + if kgm.kgs[i] != nil && kgm.kgs[i].IsSplitTarget() { + continue + } + ts, err := am.HandleRequest(dcLocation, 1) + if err != nil { + return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, err + } + if minTS == nil || tsoutil.CompareTimestamp(&ts, minTS) < 0 { + minTS = &ts + } + } + + if minTS == nil { + // This TSO server/pod is not serving any keyspace group, return an empty timestamp, + // and the client needs to skip the empty timestamps when collecting the min timestamp + // from all TSO servers/pods. + return pdpb.Timestamp{}, kgAskedCount, kgTotalCount, nil + } + + return *minTS, kgAskedCount, kgTotalCount, nil +} + func genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error { return perr.FastGenByArgs( fmt.Sprintf( diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index 98f6b733717..c0c409d24c9 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 697e6e4ceb1..056801db320 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index db801e1ad63..aae3bdeb93a 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -12,7 +12,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.2 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 54afef52efe..68c319e0b94 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 31286c8e190..799fccd42e4 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -98,20 +98,22 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp // a keyspace group before, will be served by the default keyspace group. re := suite.Require() testutil.Eventually(re, func() bool { - for _, server := range suite.tsoCluster.GetServers() { - allServed := true - for _, keyspaceID := range []uint32{0, 1, 2} { + for _, keyspaceID := range []uint32{0, 1, 2} { + served := false + for _, server := range suite.tsoCluster.GetServers() { if server.IsKeyspaceServing(keyspaceID, mcsutils.DefaultKeyspaceGroupID) { tam, err := server.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(tam) - } else { - allServed = false + served = true + break } } - return allServed + if !served { + return false + } } - return false + return true }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) // Any keyspace that was assigned to a keyspace group before, except default keyspace, diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 7b787d3c16f..204534e7ffa 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -137,6 +137,24 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) } + // Make sure all keyspace groups are available. + testutil.Eventually(re, func() bool { + for _, keyspaceID := range suite.keyspaceIDs { + served := false + for _, server := range suite.tsoCluster.GetServers() { + if server.IsKeyspaceServing(keyspaceID, mcsutils.DefaultKeyspaceGroupID) { + served = true + break + } + } + if !served { + return false + } + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + // Create clients and make sure they all have discovered the tso service. suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) re.Equal(len(suite.keyspaceIDs), len(suite.clients)) @@ -225,6 +243,38 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { wg.Wait() } +// TestGetMinTS tests the correctness of GetMinTS. +func (suite *tsoClientTestSuite) TestGetMinTS() { + var wg sync.WaitGroup + wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range suite.clients { + go func(client pd.Client) { + defer wg.Done() + var lastMinTS uint64 + for j := 0; j < tsoRequestRound; j++ { + physical, logical, err := client.GetMinTS(suite.ctx) + suite.NoError(err) + minTS := tsoutil.ComposeTS(physical, logical) + suite.Less(lastMinTS, minTS) + lastMinTS = minTS + + // Now we check whether the returned ts is the minimum one + // among all keyspace groups, i.e., the returned ts is + // less than the new timestamps of all keyspace groups. + for _, client := range suite.clients { + physical, logical, err := client.GetTS(suite.ctx) + suite.NoError(err) + ts := tsoutil.ComposeTS(physical, logical) + suite.Less(minTS, ts) + } + } + }(client) + } + } + wg.Wait() +} + // More details can be found in this issue: https://github.com/tikv/pd/issues/4884 func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() { re := suite.Require() diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 401d05bb151..910be73dea0 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be + github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 8b573f3ed9e..cc3d7e6bfd2 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -383,8 +383,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index ab85155db70..f4e31d3679a 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -851,8 +851,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E= -github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4= +github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=