Skip to content

Commit

Permalink
server/grpc_service: make update gc_safepoint concurrently safe (#5070)
Browse files Browse the repository at this point in the history
close #5018

Signed-off-by: shirly <[email protected]>

Co-authored-by: buffer <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored Jun 7, 2022
1 parent 36db3c7 commit 12a9513
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 10 deletions.
64 changes: 64 additions & 0 deletions server/gc/safepoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gc

import (
"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/server/storage/endpoint"
)

// SafePointManager is the manager for safePoint of GC and services
type SafePointManager struct {
*gcSafePointManager
// TODO add ServiceSafepointManager
}

// NewSafepointManager creates a SafePointManager of GC and services
func NewSafepointManager(store endpoint.GCSafePointStorage) *SafePointManager {
return &SafePointManager{
newGCSafePointManager(store),
}
}

type gcSafePointManager struct {
syncutil.Mutex
store endpoint.GCSafePointStorage
}

func newGCSafePointManager(store endpoint.GCSafePointStorage) *gcSafePointManager {
return &gcSafePointManager{store: store}
}

// LoadGCSafePoint loads current GC safe point from storage.
func (manager *gcSafePointManager) LoadGCSafePoint() (uint64, error) {
return manager.store.LoadGCSafePoint()
}

// UpdateGCSafePoint updates the safepoint if it is greater than the previous one
// it returns the old safepoint in the storage.
func (manager *gcSafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafePoint uint64, err error) {
manager.Lock()
defer manager.Unlock()
// TODO: cache the safepoint in the storage.
oldSafePoint, err = manager.store.LoadGCSafePoint()
if err != nil {
return
}
if oldSafePoint >= newSafePoint {
return
}
err = manager.store.SaveGCSafePoint(newSafePoint)
return
}
80 changes: 80 additions & 0 deletions server/gc/safepoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gc

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/server/storage/endpoint"
"github.com/tikv/pd/server/storage/kv"
)

func newGCStorage() endpoint.GCSafePointStorage {
return endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
}

func TestGCSafePointUpdateSequentially(t *testing.T) {
gcSafePointManager := newGCSafePointManager(newGCStorage())
re := require.New(t)
curSafePoint := uint64(0)
// update gc safePoint with asc value.
for id := 10; id < 20; id++ {
safePoint, err := gcSafePointManager.LoadGCSafePoint()
re.NoError(err)
re.Equal(curSafePoint, safePoint)
previousSafePoint := curSafePoint
curSafePoint = uint64(id)
oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(curSafePoint)
re.NoError(err)
re.Equal(previousSafePoint, oldSafePoint)
}

safePoint, err := gcSafePointManager.LoadGCSafePoint()
re.NoError(err)
re.Equal(curSafePoint, safePoint)
// update with smaller value should be failed.
oldSafePoint, err := gcSafePointManager.UpdateGCSafePoint(safePoint - 5)
re.NoError(err)
re.Equal(safePoint, oldSafePoint)
curSafePoint, err = gcSafePointManager.LoadGCSafePoint()
re.NoError(err)
// current safePoint should not change since the update value was smaller
re.Equal(safePoint, curSafePoint)
}

func TestGCSafePointUpdateCurrently(t *testing.T) {
gcSafePointManager := newGCSafePointManager(newGCStorage())
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)

// update gc safePoint concurrently
for id := 0; id < 20; id++ {
wg.Add(1)
go func(step uint64) {
for safePoint := step; safePoint <= maxSafePoint; safePoint += step {
_, err := gcSafePointManager.UpdateGCSafePoint(safePoint)
re.NoError(err)
}
wg.Done()
}(uint64(id + 1))
}
wg.Wait()
safePoint, err := gcSafePointManager.LoadGCSafePoint()
re.NoError(err)
re.Equal(maxSafePoint, safePoint)
}
13 changes: 3 additions & 10 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,8 +1295,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe
return &pdpb.GetGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil
}

var storage endpoint.GCSafePointStorage = s.storage
safePoint, err := storage.LoadGCSafePoint()
safePoint, err := s.gcSafePointManager.LoadGCSafePoint()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1335,19 +1334,13 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update
return &pdpb.UpdateGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil
}

var storage endpoint.GCSafePointStorage = s.storage
oldSafePoint, err := storage.LoadGCSafePoint()
newSafePoint := request.GetSafePoint()
oldSafePoint, err := s.gcSafePointManager.UpdateGCSafePoint(newSafePoint)
if err != nil {
return nil, err
}

newSafePoint := request.SafePoint

// Only save the safe point if it's greater than the previous one
if newSafePoint > oldSafePoint {
if err := storage.SaveGCSafePoint(newSafePoint); err != nil {
return nil, err
}
log.Info("updated gc safe point",
zap.Uint64("safe-point", newSafePoint))
} else if newSafePoint < oldSafePoint {
Expand Down
4 changes: 4 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/encryptionkm"
"github.com/tikv/pd/server/gc"
"github.com/tikv/pd/server/id"
"github.com/tikv/pd/server/member"
syncer "github.com/tikv/pd/server/region_syncer"
Expand Down Expand Up @@ -130,6 +131,8 @@ type Server struct {
encryptionKeyManager *encryptionkm.KeyManager
// for storage operation.
storage storage.Storage
// safepoint manager
gcSafePointManager *gc.SafePointManager
// for basicCluster operation.
basicCluster *core.BasicCluster
// for tso.
Expand Down Expand Up @@ -410,6 +413,7 @@ func (s *Server) startServer(ctx context.Context) error {
}
defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath)
s.storage = storage.NewCoreStorage(defaultStorage, regionStorage)
s.gcSafePointManager = gc.NewSafepointManager(s.storage)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster)
Expand Down

0 comments on commit 12a9513

Please sign in to comment.