Skip to content

Commit

Permalink
update 14
Browse files Browse the repository at this point in the history
  • Loading branch information
XuPeng-SH committed Dec 23, 2024
1 parent e7e8343 commit 3a0c867
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 104 deletions.
19 changes: 10 additions & 9 deletions pkg/vm/engine/tae/db/checkpoint/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,26 @@ func (job *checkpointJob) RunICKP(ctx context.Context) (err error) {
if lsn > job.runner.options.reservedWALEntryCount {
lsnToTruncate = lsn - job.runner.options.reservedWALEntryCount
}
entry.SetLSN(lsn, lsnToTruncate)

if file, err = job.runner.saveCheckpoint(
entry.start, entry.end,
); err != nil {
errPhase = "save-ckp"
if prepared := job.runner.store.PrepareCommitICKPIntent(entry); !prepared {
errPhase = "prepare"
rollback()
err = moerr.NewInternalErrorNoCtxf("cannot prepare ickp")
return
}

entry.SetLSN(lsn, lsnToTruncate)
if !job.runner.store.CommitICKPIntent(entry, false) {
errPhase = "commit"
if file, err = job.runner.saveCheckpoint(
entry.start, entry.end,
); err != nil {
errPhase = "save-ckp"
job.runner.store.RollbackICKPIntent(entry)
rollback()
err = moerr.NewInternalErrorNoCtxf("cannot commit ickp")
return
}

job.runner.store.CommitICKPIntent(entry)
v2.TaskCkpEntryPendingDurationHistogram.Observe(entry.Age().Seconds())
defer entry.Done()

files = append(files, file)

Expand Down
63 changes: 40 additions & 23 deletions pkg/vm/engine/tae/db/checkpoint/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,10 @@ func Test_RunnerStore1(t *testing.T) {
assert.True(t, taken.IsRunning())
assert.True(t, taken.end.EQ(&t3))

committed := store.CommitICKPIntent(taken, true)
assert.True(t, committed)
prepared := store.PrepareCommitICKPIntent(taken)
assert.True(t, prepared)

store.CommitICKPIntent(taken)

intent, updated = store.UpdateICKPIntent(&t3, true, true)
assert.False(t, updated)
Expand Down Expand Up @@ -496,13 +498,14 @@ func Test_RunnerStore3(t *testing.T) {

rollback()
intent6 := store.incrementalIntent.Load()
assert.True(t, intent6.IsPendding())
assert.True(t, intent6.end.EQ(&t2))
assert.True(t, intent6.start.IsEmpty())
assert.Equal(t, intent6.bornTime, intent5.bornTime)
assert.Equal(t, intent6.refreshCnt, intent5.refreshCnt)
assert.Equal(t, intent6.policyChecked, intent5.policyChecked)
assert.Equal(t, intent6.flushChecked, intent5.flushChecked)
assert.Nil(t, intent6)
// assert.True(t, intent6.IsPendding())
// assert.True(t, intent6.end.EQ(&t2))
// assert.True(t, intent6.start.IsEmpty())
// assert.Equal(t, intent6.bornTime, intent5.bornTime)
// assert.Equal(t, intent6.refreshCnt, intent5.refreshCnt)
// assert.Equal(t, intent6.policyChecked, intent5.policyChecked)
// assert.Equal(t, intent6.flushChecked, intent5.flushChecked)

ii2, updated = store.UpdateICKPIntent(&t3, true, true)
assert.True(t, updated)
Expand All @@ -527,8 +530,9 @@ func Test_RunnerStore3(t *testing.T) {
maxEntry := store.MaxIncrementalCheckpoint()
assert.Nil(t, maxEntry)

committed := store.CommitICKPIntent(taken, true)
assert.True(t, committed)
prepared := store.PrepareCommitICKPIntent(taken)
assert.True(t, prepared)
store.CommitICKPIntent(taken)
assert.True(t, taken.IsFinished())

wg.Wait()
Expand Down Expand Up @@ -563,21 +567,36 @@ func Test_RunnerStore3(t *testing.T) {
assert.True(t, taken2.IsRunning())
intent11 := store.incrementalIntent.Load()
assert.Equal(t, intent11, taken2)
t.Logf("taken2: %s", taken2.String())
entries := store.incrementals.Items()
for i, entry := range entries {
t.Logf("entry[%d]: %s", i, entry.String())
}

// cannot commit a different intent with the incremental intent
t5 := types.NextGlobalTsForTest()
taken2_1 := InheritCheckpointEntry(taken2, WithEndEntryOption(t5))
committed = store.CommitICKPIntent(taken2_1, true)
assert.False(t, committed)
prepared = store.PrepareCommitICKPIntent(taken2_1)
assert.False(t, prepared)

taken2.start = taken2.start.Next()
committed = store.CommitICKPIntent(taken2, true)
assert.False(t, committed)
prepared = store.PrepareCommitICKPIntent(taken2)
assert.True(t, prepared)

taken2.start = taken2.start.Prev()
committed = store.CommitICKPIntent(taken2, true)
assert.True(t, committed)
store.CommitICKPIntent(taken2)
assert.True(t, taken2.IsFinished())
entries = store.incrementals.Items()
assert.Equal(t, 2, len(entries))
for _, entry := range entries {
assert.True(t, entry.IsFinished())
}
assert.Equalf(
t,
taken2.end.Next(),
store.GetCheckpointed(),
"%s:%s",
taken2.end.ToString(),
store.GetCheckpointed().ToString(),
)

timer := time.After(time.Second * 10)
select {
Expand Down Expand Up @@ -617,10 +636,8 @@ func Test_RunnerStore4(t *testing.T) {

rollback()
intent4 := store.incrementalIntent.Load()
assert.True(t, intent4.IsPendding())
assert.True(t, intent4.end.EQ(&t2))
assert.True(t, intent4.start.IsEmpty())
assert.True(t, intent4.AllChecked())
assert.Nil(t, intent4)
<-taken.Wait()
}

func Test_RunnerStore5(t *testing.T) {
Expand Down
104 changes: 44 additions & 60 deletions pkg/vm/engine/tae/db/checkpoint/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,9 @@ func (s *runnerStore) TakeICKPIntent() (taken *CheckpointEntry, rollback func())
)
if s.incrementalIntent.CompareAndSwap(old, taken) {
rollback = func() {
// rollback the intent
putBack := InheritCheckpointEntry(
taken,
WithStateEntryOption(ST_Pending),
)
s.incrementalIntent.Store(putBack)
// clear the intent and notify the intent is done
s.incrementalIntent.Store(nil)
taken.Done()
}
break
}
Expand All @@ -248,72 +245,59 @@ func (s *runnerStore) TakeICKPIntent() (taken *CheckpointEntry, rollback func())
return
}

// intent must be in Running state
func (s *runnerStore) CommitICKPIntent(intent *CheckpointEntry, done bool) (committed bool) {
defer func() {
if done && committed {
intent.Done()
}
}()
old := s.incrementalIntent.Load()
func (s *runnerStore) PrepareCommitICKPIntent(
intent *CheckpointEntry,
) (ok bool) {
expect := s.incrementalIntent.Load()
// should not happen
if old != intent {
if intent != expect || !intent.IsRunning() {
logutil.Error(
"CommitICKPIntent-Error",
zap.String("intent", intent.String()),
zap.String("expected", old.String()),
"ICKP-PrepareCommit",
zap.Any("expected", expect),
zap.Any("actual", intent),
)
return
}
s.Lock()
defer s.Unlock()
maxICKP, _ := s.incrementals.Max()
maxGCKP, _ := s.globals.Max()
var (
maxICKPEndNext types.TS
maxGCKPEnd types.TS
)
if maxICKP != nil {
maxICKPEndNext = maxICKP.end.Next()
}
if maxGCKP != nil {
maxGCKPEnd = maxGCKP.end
}
if maxICKP == nil && maxGCKP == nil {
if !intent.start.IsEmpty() {
logutil.Error(
"CommitICKPIntent-Error",
zap.String("intent", intent.String()),
zap.String("max-i", "nil"),
zap.String("max-g", "nil"),
)
// PXU TODO: err = xxx
return
}
} else if (maxICKP == nil && !maxGCKPEnd.EQ(&intent.start)) ||
(maxICKP != nil && !maxICKPEndNext.EQ(&intent.start)) {
maxi := "nil"
maxg := "nil"
if maxICKP != nil {
maxi = maxICKP.String()
}
if maxGCKP != nil {
maxg = maxGCKP.String()
}
logutil.Error(
s.incrementals.Set(intent)
ok = true
return
}

func (s *runnerStore) RollbackICKPIntent(
intent *CheckpointEntry,
) {
expect := s.incrementalIntent.Load()
// should not happen
if intent != expect || !intent.IsRunning() {
logutil.Fatal(
"ICKP-Rollback",
zap.Any("expected", expect),
zap.Any("actual", intent),
)
}
s.Lock()
s.Unlock()
s.incrementals.Delete(intent)
}

// intent must be in Running state
func (s *runnerStore) CommitICKPIntent(
intent *CheckpointEntry,
) {
defer intent.Done()
old := s.incrementalIntent.Load()
// should not happen
if old != intent {
logutil.Fatal(
"CommitICKPIntent-Error",
zap.String("intent", intent.String()),
zap.String("max-i", maxi),
zap.String("max-g", maxg),
zap.String("expected", old.String()),
)
// PXU TODO: err = xxx
return
}
s.incrementalIntent.Store(nil)
intent.SetState(ST_Finished)
s.incrementals.Set(intent)
committed = true
return
s.incrementalIntent.Store(nil)
}

func (s *runnerStore) AddMetaFile(name string) {
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/db/checkpoint/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (r *runner) ForceIncrementalCheckpoint(ts types.TS) (err error) {
return
case <-intent.Wait():
checkpointed := r.store.GetCheckpointed()
// if checkpointed < ts, something wrong may be happend and the previous intent was rollbacked
if checkpointed.LT(&ts) {
err = ErrPendingCheckpoint
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/vm/engine/tae/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ func (db *DB) ForceCheckpoint(
timeout := time.After(wait)
for {
select {
case <-ctx.Done():
err = context.Cause(ctx)
return
case <-timeout:
err = moerr.NewInternalError(ctx, "force checkpoint timeout")
return
Expand Down
16 changes: 4 additions & 12 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7784,22 +7784,19 @@ func Test_CheckpointChaos1(t *testing.T) {
assert.NoError(t, err)

now := tae.TxnMgr.Now()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err = tae.DB.ForceCheckpoint(ctx, now, time.Minute)
assert.Error(t, err)

intent := tae.BGCheckpointRunner.GetICKPIntentOnlyForTest()
t.Logf("intent: %v", intent)
assert.NotNil(t, intent)
assert.True(t, intent.AllChecked())
assert.True(t, intent.IsRunning())
assert.Nil(t, intent)

entries := tae.BGCheckpointRunner.GetAllIncrementalCheckpoints()
assert.Equal(t, 0, len(entries))
for i, entry := range entries {
t.Logf("checkpoint %d: %s", i, entry.String())
}

rmFn()
ctx = context.Background()
err = tae.DB.ForceCheckpoint(ctx, now, time.Minute)
assert.NoError(t, err)

Expand All @@ -7808,14 +7805,9 @@ func Test_CheckpointChaos1(t *testing.T) {
assert.True(t, entries[0].IsFinished())

intent = tae.BGCheckpointRunner.GetICKPIntentOnlyForTest()
t.Logf("intent: %v", intent)
assert.Nil(t, intent)
}

func Test_CheckpointChaos2(t *testing.T) {
// TODO
}

func TestGCCatalog1(t *testing.T) {
defer testutils.AfterTest(t)()
ctx := context.Background()
Expand Down

0 comments on commit 3a0c867

Please sign in to comment.