diff --git a/server/gc/safepoint.go b/server/gc/safepoint.go new file mode 100644 index 00000000000..3cec08d8951 --- /dev/null +++ b/server/gc/safepoint.go @@ -0,0 +1,64 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "github.com/tikv/pd/pkg/syncutil" + "github.com/tikv/pd/server/storage/endpoint" +) + +// SafePointManager is the manager for safePoint of GC and services +type SafePointManager struct { + *gcSafePointManager + // TODO add ServiceSafepointManager +} + +// NewSafepointManager creates a SafePointManager of GC and services +func NewSafepointManager(store endpoint.GCSafePointStorage) *SafePointManager { + return &SafePointManager{ + newGCSafePointManager(store), + } +} + +type gcSafePointManager struct { + syncutil.Mutex + store endpoint.GCSafePointStorage +} + +func newGCSafePointManager(store endpoint.GCSafePointStorage) *gcSafePointManager { + return &gcSafePointManager{store: store} +} + +// LoadGCSafePoint loads current GC safe point from storage. +func (manager *gcSafePointManager) LoadGCSafePoint() (uint64, error) { + return manager.store.LoadGCSafePoint() +} + +// UpdateGCSafePoint updates the safepoint if it is greater than the previous one +// it returns the old safepoint in the storage. +func (manager *gcSafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafePoint uint64, err error) { + manager.Lock() + defer manager.Unlock() + // TODO: cache the safepoint in the storage. + oldSafePoint, err = manager.store.LoadGCSafePoint() + if err != nil { + return + } + if oldSafePoint >= newSafePoint { + return + } + err = manager.store.SaveGCSafePoint(newSafePoint) + return +} diff --git a/server/gc/safepoint_test.go b/server/gc/safepoint_test.go new file mode 100644 index 00000000000..2af82ba7145 --- /dev/null +++ b/server/gc/safepoint_test.go @@ -0,0 +1,80 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/server/storage/endpoint" + "github.com/tikv/pd/server/storage/kv" +) + +func newGCStorage() endpoint.GCSafePointStorage { + return endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) +} + +func TestGCSafePointUpdateSequentially(t *testing.T) { + gcSafePointManager := newGCSafePointManager(newGCStorage()) + re := require.New(t) + curSafePoint := uint64(0) + // update gc safePoint with asc value. + for id := 10; id < 20; id++ { + safePoint, err := gcSafePointManager.LoadGCSafePoint() + re.NoError(err) + re.Equal(curSafePoint, safePoint) + previousSafePoint := curSafePoint + curSafePoint = uint64(id) + oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(curSafePoint) + re.NoError(err) + re.Equal(previousSafePoint, oldSafePoint) + } + + safePoint, err := gcSafePointManager.LoadGCSafePoint() + re.NoError(err) + re.Equal(curSafePoint, safePoint) + // update with smaller value should be failed. + oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(safePoint - 5) + re.NoError(err) + re.Equal(safePoint, oldSafePoint) + curSafePoint, err = gcSafePointManager.LoadGCSafePoint() + re.NoError(err) + // current safePoint should not change since the update value was smaller + re.Equal(safePoint, curSafePoint) +} + +func TestGCSafePointUpdateCurrently(t *testing.T) { + gcSafePointManager := newGCSafePointManager(newGCStorage()) + maxSafePoint := uint64(1000) + wg := sync.WaitGroup{} + re := require.New(t) + + // update gc safePoint concurrently + for id := 0; id < 20; id++ { + wg.Add(1) + go func(step uint64) { + for safePoint := step; safePoint <= maxSafePoint; safePoint += step { + _, err := gcSafePointManager.UpdateGCSafePoint(safePoint) + re.NoError(err) + } + wg.Done() + }(uint64(id + 1)) + } + wg.Wait() + safePoint, err := gcSafePointManager.LoadGCSafePoint() + re.NoError(err) + re.Equal(maxSafePoint, safePoint) +} diff --git a/server/grpc_service.go b/server/grpc_service.go index d487f059ec5..53b74ba517d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1295,8 +1295,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe return &pdpb.GetGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - var storage endpoint.GCSafePointStorage = s.storage - safePoint, err := storage.LoadGCSafePoint() + safePoint, err := s.gcSafePointManager.LoadGCSafePoint() if err != nil { return nil, err } @@ -1335,19 +1334,13 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update return &pdpb.UpdateGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - var storage endpoint.GCSafePointStorage = s.storage - oldSafePoint, err := storage.LoadGCSafePoint() + newSafePoint := request.GetSafePoint() + oldSafePoint, err := s.gcSafePointManager.UpdateGCSafePoint(newSafePoint) if err != nil { return nil, err } - newSafePoint := request.SafePoint - - // Only save the safe point if it's greater than the previous one if newSafePoint > oldSafePoint { - if err := storage.SaveGCSafePoint(newSafePoint); err != nil { - return nil, err - } log.Info("updated gc safe point", zap.Uint64("safe-point", newSafePoint)) } else if newSafePoint < oldSafePoint { diff --git a/server/server.go b/server/server.go index bd13193532f..c27941f7c85 100644 --- a/server/server.go +++ b/server/server.go @@ -52,6 +52,7 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/encryptionkm" + "github.com/tikv/pd/server/gc" "github.com/tikv/pd/server/id" "github.com/tikv/pd/server/member" syncer "github.com/tikv/pd/server/region_syncer" @@ -130,6 +131,8 @@ type Server struct { encryptionKeyManager *encryptionkm.KeyManager // for storage operation. storage storage.Storage + // safepoint manager + gcSafePointManager *gc.SafePointManager // for basicCluster operation. basicCluster *core.BasicCluster // for tso. @@ -410,6 +413,7 @@ func (s *Server) startServer(ctx context.Context) error { } defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath) s.storage = storage.NewCoreStorage(defaultStorage, regionStorage) + s.gcSafePointManager = gc.NewSafepointManager(s.storage) s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster)