Skip to content

Commit

Permalink
update 3
Browse files Browse the repository at this point in the history
  • Loading branch information
XuPeng-SH committed Dec 22, 2024
1 parent e50d075 commit fa02805
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 26 deletions.
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/db/checkpoint/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TestRunner interface {
ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error
ForceGlobalCheckpointSynchronously(ctx context.Context, end types.TS, versionInterval time.Duration) error
ForceCheckpointForBackup(end types.TS) (string, error)
ForceIncrementalCheckpoint2(end types.TS) error
ForceIncrementalCheckpoint(end types.TS) error
MaxLSNInRange(end types.TS) uint64

GCNeeded() bool
Expand Down Expand Up @@ -93,7 +93,7 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err
case <-timeout:
return moerr.NewInternalError(r.ctx, "timeout")
default:
err = r.ForceIncrementalCheckpoint2(end)
err = r.ForceIncrementalCheckpoint(end)
if err != nil {
if dbutils.IsRetrieableCheckpoint(err) {
retryTime++
Expand Down Expand Up @@ -136,7 +136,7 @@ func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end typ
return nil
}

func (r *runner) ForceIncrementalCheckpoint2(ts types.TS) (err error) {
func (r *runner) ForceIncrementalCheckpoint(ts types.TS) (err error) {
var intent Intent
if intent, err = r.TryScheduleCheckpoint(ts, true); err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (db *DB) ForceCheckpoint(
err = moerr.NewInternalError(ctx, "force checkpoint timeout")
return
default:
err = db.BGCheckpointRunner.ForceIncrementalCheckpoint2(ts)
err = db.BGCheckpointRunner.ForceIncrementalCheckpoint(ts)
if dbutils.IsRetrieableCheckpoint(err) {
db.BGCheckpointRunner.CleanPenddingCheckpoint()
interval := flushDuration.Milliseconds() / 400
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/test/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestCheckpointCatalog2(t *testing.T) {
}
wg.Wait()
ts := types.BuildTS(time.Now().UTC().UnixNano(), 0)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(ts)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(ts)
assert.NoError(t, err)
lsn := tae.BGCheckpointRunner.MaxLSNInRange(ts)
entry, err := tae.Wal.RangeCheckpoint(1, lsn)
Expand Down
10 changes: 5 additions & 5 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2710,7 +2710,7 @@ func TestSegDelLogtail(t *testing.T) {
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false)
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false)
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)

check := func() {
Expand Down Expand Up @@ -3687,7 +3687,7 @@ func TestDropCreated3(t *testing.T) {
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false)
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false)
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.Nil(t, err)

tae.Restart(ctx)
Expand Down Expand Up @@ -3744,7 +3744,7 @@ func TestDropCreated4(t *testing.T) {
assert.Nil(t, err)
assert.Nil(t, txn.Commit(context.Background()))

err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.Nil(t, err)

tae.Restart(ctx)
Expand Down Expand Up @@ -8033,7 +8033,7 @@ func TestForceCheckpoint(t *testing.T) {

err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10)
assert.Error(t, err)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
}

Expand Down Expand Up @@ -8129,7 +8129,7 @@ func TestMarshalPartioned(t *testing.T) {
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false)
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false)
testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false)
err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
lsn := tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now())
entry, err := tae.Wal.RangeCheckpoint(1, lsn)
Expand Down
24 changes: 12 additions & 12 deletions pkg/vm/engine/tae/db/test/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestReplayCatalog1(t *testing.T) {
testutil.CompactBlocks(t, 0, tae, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false)
testutil.CompactBlocks(t, 0, tae, pkgcatalog.MO_CATALOG, catalog.SystemTableSchema, false)
testutil.CompactBlocks(t, 0, tae, pkgcatalog.MO_CATALOG, catalog.SystemColumnSchema, false)
err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err := tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
}
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestReplayCatalog2(t *testing.T) {
assert.Nil(t, err)
assert.Nil(t, txn.Commit(context.Background()))
t.Log(tae.Catalog.SimplePPString(common.PPL1))
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
tae.Close()

Expand Down Expand Up @@ -461,7 +461,7 @@ func TestReplay2(t *testing.T) {

err = tae2.ForceFlush(tae2.TxnMgr.Now(), context.Background(), time.Second*10)
assert.NoError(t, err)
err = tae2.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae2.TxnMgr.Now())
err = tae2.BGCheckpointRunner.ForceIncrementalCheckpoint(tae2.TxnMgr.Now())
assert.NoError(t, err)

txn, rel = testutil.GetRelation(t, 0, tae2, "db", schema.Name)
Expand Down Expand Up @@ -562,7 +562,7 @@ func TestReplay3(t *testing.T) {
assert.NoError(t, txn.Commit(context.Background()))

txn, _ = tae.GetRelation()
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))
}
Expand Down Expand Up @@ -816,7 +816,7 @@ func TestReplay5(t *testing.T) {
testutil.CompactBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false)
err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10)
assert.NoError(t, err)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
lsn := tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now())
entry, err := tae.Wal.RangeCheckpoint(1, lsn)
Expand All @@ -843,7 +843,7 @@ func TestReplay5(t *testing.T) {
testutil.CompactBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false)
err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10)
assert.NoError(t, err)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
lsn = tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now())
entry, err = tae.Wal.RangeCheckpoint(1, lsn)
Expand Down Expand Up @@ -880,7 +880,7 @@ func TestReplay5(t *testing.T) {

err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10)
assert.NoError(t, err)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
lsn = tae.BGCheckpointRunner.MaxLSNInRange(tae.TxnMgr.Now())
entry, err = tae.Wal.RangeCheckpoint(1, lsn)
Expand Down Expand Up @@ -941,7 +941,7 @@ func TestReplay6(t *testing.T) {
testutil.MergeBlocks(t, 0, tae, testutil.DefaultTestDB, schema, false)
err = tae.BGFlusher.ForceFlushWithInterval(tae.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10)
assert.NoError(t, err)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)

_ = tae.Close()
Expand Down Expand Up @@ -1121,7 +1121,7 @@ func TestReplay8(t *testing.T) {
_ = txn.Rollback(context.Background())

tae.CompactBlocks(false)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
tae.Restart(ctx)

Expand Down Expand Up @@ -1322,7 +1322,7 @@ func TestReplaySnapshots(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))

err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)

txn, err = tae.StartTxn(nil)
Expand All @@ -1346,7 +1346,7 @@ func TestReplaySnapshots(t *testing.T) {
assert.False(t, baseNode.IsEmpty())
assert.NoError(t, txn.Commit(context.Background()))

err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)
t.Log(tae.Catalog.SimplePPString(3))

Expand Down Expand Up @@ -1382,7 +1382,7 @@ func TestReplayDatabaseEntry(t *testing.T) {
assert.Equal(t, createSqlStr, dbEntry.GetCreateSql())

testutil.CompactBlocks(t, 0, tae.DB, pkgcatalog.MO_CATALOG, catalog.SystemDBSchema, false)
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint2(tae.TxnMgr.Now())
err = tae.BGCheckpointRunner.ForceIncrementalCheckpoint(tae.TxnMgr.Now())
assert.NoError(t, err)

tae.Restart(ctx)
Expand Down
8 changes: 4 additions & 4 deletions pkg/vm/engine/tae/db/testutil/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,21 @@ func (e *TestEngine) CheckRowsByScan(exp int, applyDelete bool) {
func (e *TestEngine) ForceCheckpoint() {
err := e.BGFlusher.ForceFlushWithInterval(e.TxnMgr.Now(), context.Background(), time.Second*2, time.Millisecond*10)
assert.NoError(e.T, err)
err = e.BGCheckpointRunner.ForceIncrementalCheckpoint2(e.TxnMgr.Now())
err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now())
assert.NoError(e.T, err)
}

func (e *TestEngine) ForceLongCheckpoint() {
err := e.BGFlusher.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second)
assert.NoError(e.T, err)
err = e.BGCheckpointRunner.ForceIncrementalCheckpoint2(e.TxnMgr.Now())
err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now())
assert.NoError(e.T, err)
}

func (e *TestEngine) ForceLongCheckpointTruncate() {
err := e.BGFlusher.ForceFlush(e.TxnMgr.Now(), context.Background(), 20*time.Second)
assert.NoError(e.T, err)
err = e.BGCheckpointRunner.ForceIncrementalCheckpoint2(e.TxnMgr.Now())
err = e.BGCheckpointRunner.ForceIncrementalCheckpoint(e.TxnMgr.Now())
assert.NoError(e.T, err)
}

Expand Down Expand Up @@ -309,7 +309,7 @@ func (e *TestEngine) IncrementalCheckpoint(
flushed := e.DB.BGFlusher.IsAllChangesFlushed(types.TS{}, end, true)
require.True(e.T, flushed)
}
err := e.DB.BGCheckpointRunner.ForceIncrementalCheckpoint2(end)
err := e.DB.BGCheckpointRunner.ForceIncrementalCheckpoint(end)
require.NoError(e.T, err)
if truncate {
lsn := e.DB.BGCheckpointRunner.MaxLSNInRange(end)
Expand Down

0 comments on commit fa02805

Please sign in to comment.