restore: fix the bug that gc life time ttl does not take effect (pingcap#448)

* fix gc ttl loop

* resolve comment and add tests
glorv authored Nov 9, 2020
1 parent 176d053 commit 25a3711
Showing 2 changed files with 166 additions and 3 deletions.
12 changes: 11 additions & 1 deletion lightning/restore/checksum.go
@@ -6,6 +6,7 @@ import (
"database/sql"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/br/pkg/checksum"
@@ -27,7 +28,10 @@ import (

const (
preUpdateServiceSafePointFactor = 3
serviceSafePointTTL = 10 * 60 // 10 min in seconds
)

var (
serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds
)

// RemoteChecksum represents a checksum result got from tidb.
@@ -316,9 +320,15 @@ type gcTTLManager struct {
tableGCSafeTS []*tableChecksumTS
currentTs uint64
serviceID string
// 0 means not started, any other value means started
started uint32
}

func (m *gcTTLManager) addOneJob(ctx context.Context, table string, ts uint64) error {
// start gc ttl loop if not started yet.
if atomic.CompareAndSwapUint32(&m.started, 0, 1) {
m.start(ctx)
}
m.lock.Lock()
defer m.lock.Unlock()
var curTs uint64
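The hunk above only adds the lazy start: the first call to addOneJob flips m.started with a compare-and-swap and kicks off the keep-alive loop via m.start(ctx). The loop itself is not part of this diff; the following is a minimal sketch, under the assumption that start spawns a goroutine which refreshes the PD service safe point every serviceSafePointTTL / preUpdateServiceSafePointFactor seconds. The method name keepAliveLoop and the error handling are illustrative, not taken from the commit.

func (m *gcTTLManager) keepAliveLoop(ctx context.Context) {
	// Sketch only: assumed shape of the loop that m.start(ctx) spawns.
	ttl := atomic.LoadInt64(&serviceSafePointTTL)
	// Refresh well before the TTL lapses, i.e. every ttl/preUpdateServiceSafePointFactor seconds.
	ticker := time.NewTicker(time.Duration(ttl/preUpdateServiceSafePointFactor) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m.lock.Lock()
			ts := m.currentTs
			m.lock.Unlock()
			if ts == 0 {
				// No checksum job is pending, nothing needs GC protection.
				continue
			}
			// The real implementation presumably logs failures; the sketch drops the error.
			_, _ = m.pdClient.UpdateServiceGCSafePoint(ctx, m.serviceID, ttl, ts)
		}
	}
}

Refreshing at a fraction of the TTL is what lets the test below shrink serviceSafePointTTL to 3 seconds and then expect roughly one PD update per second.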
157 changes: 155 additions & 2 deletions lightning/restore/checksum_test.go
@@ -4,9 +4,20 @@ import (
"context"
"database/sql"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/store/tikv/oracle"
tmock "github.com/pingcap/tidb/util/mock"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tipb/go-tipb"

pd "github.com/tikv/pd/client"

"github.com/DATA-DOG/go-sqlmock"
@@ -144,6 +155,33 @@ func (s *checksumSuite) TestIncreaseGCLifeTimeFail(c *C) {
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {
// set up mock tikv checksum manager
pdClient := &testPDClient{}
resp := tipb.ChecksumResponse{Checksum: 123, TotalKvs: 10, TotalBytes: 1000}
kvClient := &mockChecksumKVClient{checksum: resp, respDur: time.Second * 5}
checksumExec := &tikvChecksumManager{manager: gcTTLManager{pdClient: pdClient}, client: kvClient}

// mock a table info
p := parser.New()
se := tmock.NewContext()
node, err := p.ParseOneStmt("CREATE TABLE `t1` (`c1` varchar(5) NOT NULL)", "utf8mb4", "utf8mb4_bin")
c.Assert(err, IsNil)
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 999)
c.Assert(err, IsNil)

startTs := oracle.ComposeTS(time.Now().Unix()*1000, 0)
ctx := context.WithValue(context.Background(), &checksumManagerKey, checksumExec)
_, err = DoChecksum(ctx, nil, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
c.Assert(err, IsNil)

// after checksum, the safe point should not be larger than start ts
ts := pdClient.currentSafePoint()
// 1ms for the schedule deviation
c.Assert(ts <= startTs+1, IsTrue)
c.Assert(atomic.LoadUint32(&checksumExec.manager.started) > 0, IsTrue)
}
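The test injects the checksum manager through context.WithValue keyed by &checksumManagerKey. How DoChecksum retrieves it is not shown in this diff; the lookup is assumed to look roughly like the sketch below, where the ChecksumManager interface name and the helper name are assumptions rather than code from this commit.

// Sketch only: assumed context lookup used by DoChecksum; not part of this commit.
func checksumManagerFromContext(ctx context.Context) ChecksumManager {
	if m, ok := ctx.Value(&checksumManagerKey).(ChecksumManager); ok {
		return m
	}
	return nil
}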

func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
@@ -165,15 +203,83 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
c.Assert(mock.ExpectationsWereMet(), IsNil)
}

type safePointTTL struct {
safePoint uint64
// ttl is the last timestamp at which this safe point is still valid
ttl int64
}

type testPDClient struct {
sync.Mutex
pd.Client
count int32
gcSafePoint []safePointTTL
}

func (c *testPDClient) currentSafePoint() uint64 {
ts := time.Now().Unix()
c.Lock()
defer c.Unlock()
for _, s := range c.gcSafePoint {
if s.ttl > ts {
return s.safePoint
}
}
return 0
}

func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return 0, nil
atomic.AddInt32(&c.count, 1)
c.Lock()
idx := sort.Search(len(c.gcSafePoint), func(i int) bool {
return c.gcSafePoint[i].safePoint >= safePoint
})
sp := c.gcSafePoint
ttlEnd := time.Now().Unix() + ttl
spTTL := safePointTTL{safePoint: safePoint, ttl: ttlEnd}
switch {
case idx >= len(sp):
c.gcSafePoint = append(c.gcSafePoint, spTTL)
case sp[idx].safePoint == safePoint:
if ttlEnd > sp[idx].ttl {
sp[idx].ttl = ttlEnd
}
default:
c.gcSafePoint = append(append(sp[:idx], spTTL), sp[idx:]...)
}
c.Unlock()
return c.currentSafePoint(), nil
}

func (s *checksumSuite) TestGcTTLManagerSingle(c *C) {
pdClient := &testPDClient{}
manager := gcTTLManager{pdClient: pdClient}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldTTL := serviceSafePointTTL
// set serviceSafePointTTL to 3 seconds, so lightning will update it every 1 second.
serviceSafePointTTL = 3
defer func() {
serviceSafePointTTL = oldTTL
}()

err := manager.addOneJob(ctx, "test", uint64(time.Now().Unix()))
c.Assert(err, IsNil)

time.Sleep(6 * time.Second)

// after 6 seconds, the safe point must have been updated at least 5 times
val := atomic.LoadInt32(&pdClient.count)
c.Assert(val >= 5, IsTrue)

// after removing the job, no jobs remain, so the gc ttl no longer needs to be updated
manager.removeOneJob("test")
val = atomic.LoadInt32(&pdClient.count)
time.Sleep(3 * time.Second)
c.Assert(atomic.LoadInt32(&pdClient.count), Equals, val)
}

func (s *checksumSuite) TestGcTTLManager(c *C) {
func (s *checksumSuite) TestGcTTLManagerMulti(c *C) {
manager := gcTTLManager{pdClient: &testPDClient{}}
ctx := context.Background()

@@ -198,3 +304,50 @@ func (s *checksumSuite) TestGcTTLManager(c *C) {
manager.removeOneJob("test5")
c.Assert(manager.currentTs, Equals, uint64(0))
}

type mockResponse struct {
finished bool
data []byte
}

func (r *mockResponse) Next(ctx context.Context) (resultSubset kv.ResultSubset, err error) {
if r.finished {
return nil, nil
}
r.finished = true
return &mockResultSubset{data: r.data}, nil
}
func (r *mockResponse) Close() error {
return nil
}

type mockResultSubset struct {
data []byte
}

func (r *mockResultSubset) GetData() []byte {
return r.data
}

func (r *mockResultSubset) GetStartKey() kv.Key {
return []byte{}
}
func (r *mockResultSubset) MemSize() int64 {
return 0
}
func (r *mockResultSubset) RespTime() time.Duration {
return time.Millisecond
}

type mockChecksumKVClient struct {
kv.Client
checksum tipb.ChecksumResponse
respDur time.Duration
}

// Send returns a mocked checksum response after sleeping for respDur.
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables) kv.Response {
data, _ := c.checksum.Marshal()
time.Sleep(c.respDur)
return &mockResponse{data: data}
}
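For orientation, this is roughly how the checksum executor is expected to drive the mock: Send yields a kv.Response, Next returns one mockResultSubset and then nil, and the payload unmarshals back into a tipb.ChecksumResponse. The helper below is only a sketch of that flow; its name and the merge logic are illustrative, not code from this commit.

// drainMockChecksum shows how a caller could drain the mock response and
// merge partial results into a single tipb.ChecksumResponse (sketch only).
func drainMockChecksum(ctx context.Context, resp kv.Response) (*tipb.ChecksumResponse, error) {
	final := &tipb.ChecksumResponse{}
	for {
		subset, err := resp.Next(ctx)
		if err != nil {
			return nil, err
		}
		if subset == nil {
			// mockResponse signals completion by returning a nil subset.
			return final, nil
		}
		partial := &tipb.ChecksumResponse{}
		if err := partial.Unmarshal(subset.GetData()); err != nil {
			return nil, err
		}
		final.Checksum ^= partial.Checksum
		final.TotalKvs += partial.TotalKvs
		final.TotalBytes += partial.TotalBytes
	}
}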
