Fix bug for snapshot-read(1.2-dev) #16033

Merged: 5 commits, May 13, 2024
Changes from all commits
86 changes: 48 additions & 38 deletions pkg/vm/engine/disttae/db.go
@@ -393,48 +393,33 @@ func (e *Engine) getOrCreateSnapCatalogCache(

func (e *Engine) getOrCreateSnapPart(
ctx context.Context,
databaseId uint64,
dbName string,
tableId uint64,
tblName string,
primaySeqnum int,
tbl *txnTable,
ts types.TS) (*logtailreplay.Partition, error) {

//First, check whether the latest partition is available.
e.Lock()
partition, ok := e.partitions[[2]uint64{databaseId, tableId}]
e.Unlock()
if ok && partition.CanServe(ts) {
return partition, nil
}

//Then, check whether the snapshot partitions is available.
//check whether the snapshot partitions are available for reuse.
e.mu.Lock()
snaps, ok := e.mu.snapParts[[2]uint64{databaseId, tableId}]
tblSnaps, ok := e.mu.snapParts[[2]uint64{tbl.db.databaseId, tbl.tableId}]
if !ok {
e.mu.snapParts[[2]uint64{databaseId, tableId}] = &struct {
e.mu.snapParts[[2]uint64{tbl.db.databaseId, tbl.tableId}] = &struct {
sync.Mutex
snaps []*logtailreplay.Partition
}{}
snaps = e.mu.snapParts[[2]uint64{databaseId, tableId}]
tblSnaps = e.mu.snapParts[[2]uint64{tbl.db.databaseId, tbl.tableId}]
}
e.mu.Unlock()

snaps.Lock()
defer snaps.Unlock()
for _, snap := range snaps.snaps {
tblSnaps.Lock()
defer tblSnaps.Unlock()
for _, snap := range tblSnaps.snaps {
if snap.CanServe(ts) {
return snap, nil
}
}

//create a new snapshot partition and apply checkpoints into it.
snap := logtailreplay.NewPartition()
//TODO::if tableId is mo_tables, or mo_columns, or mo_database,
// we should init the partition, ref to engine.init
ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.fs, ts, tableId, nil)
if ckps == nil {
return nil, moerr.NewInternalErrorNoCtx("No checkpoints for snapshot read")
}
ckps, err := checkpoint.ListSnapshotCheckpoint(ctx, e.fs, ts, tbl.tableId, nil)
if err != nil {
return nil, err
}
@@ -448,10 +433,10 @@ func (e *Engine) getOrCreateSnapPart(
entries, closeCBs, err := logtail.LoadCheckpointEntries(
ctx,
locations,
tableId,
tblName,
databaseId,
dbName,
tbl.tableId,
tbl.tableName,
tbl.db.databaseId,
tbl.db.databaseName,
e.mp,
e.fs)
if err != nil {
@@ -463,26 +448,46 @@
}
}()
for _, entry := range entries {
if err = consumeEntry(ctx, primaySeqnum, e, nil, state, entry); err != nil {
if err = consumeEntry(
ctx,
tbl.primarySeqnum,
e,
nil,
state,
entry); err != nil {
return err
}
}
return nil
})
if snap.CanServe(ts) {
tblSnaps.snaps = append(tblSnaps.snaps, snap)
return snap, nil
}

start, end := snap.GetDuration()
if ts.Greater(&end) || ts.Less(&start) {
//if there are no checkpoints or ts > snap.end, use the latest partition.
if snap.IsEmpty() || ts.Greater(&end) {
err := tbl.updateLogtail(ctx)
if err != nil {
return nil, err
}
return e.getOrCreateLatestPart(tbl.db.databaseId, tbl.tableId), nil
}
if ts.Less(&start) {
return nil, moerr.NewInternalErrorNoCtx(
"Invalid checkpoints for snapshot read,snapshot:%s, start:%s, end:%s",
"No valid checkpoints for snapshot read,maybe snapshot is too old, "+
"snapshot:%s, start:%s, end:%s",
ts.ToTimestamp().DebugString(),
start.ToTimestamp().DebugString(),
end.ToTimestamp().DebugString())
}
snaps.snaps = append(snaps.snaps, snap)

return snap, nil
panic("impossible path")
}

func (e *Engine) getOrCreateLatestPart(databaseId, tableId uint64) *logtailreplay.Partition {
func (e *Engine) getOrCreateLatestPart(
databaseId,
tableId uint64) *logtailreplay.Partition {
e.Lock()
defer e.Unlock()
partition, ok := e.partitions[[2]uint64{databaseId, tableId}]
@@ -493,7 +498,9 @@ func (e *Engine) getOrCreateLatestPart(databaseId, tableId uint64) *logtailreplay.Partition {
return partition
}

func (e *Engine) lazyLoadLatestCkp(ctx context.Context, tbl *txnTable) (*logtailreplay.Partition, error) {
func (e *Engine) lazyLoadLatestCkp(
ctx context.Context,
tbl *txnTable) (*logtailreplay.Partition, error) {
part := e.getOrCreateLatestPart(tbl.db.databaseId, tbl.tableId)
cache := e.getLatestCatalogCache()

@@ -531,6 +538,9 @@ func (e *Engine) lazyLoadLatestCkp(ctx context.Context, tbl *txnTable) (*logtailreplay.Partition, error) {
return part, nil
}

func (e *Engine) UpdateOfPush(ctx context.Context, databaseId, tableId uint64, ts timestamp.Timestamp) error {
func (e *Engine) UpdateOfPush(
ctx context.Context,
databaseId,
tableId uint64, ts timestamp.Timestamp) error {
return e.pClient.TryToSubscribeTable(ctx, databaseId, tableId)
}
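
The db.go change collapses getOrCreateSnapPart's six loose identifiers into a single *txnTable handle and makes the fallback order explicit: reuse a cached snapshot partition, else build one from checkpoints, else fall back to the latest partition (after a logtail update) when the snapshot is newer than any checkpoint, else fail when it is older. Below is a minimal, self-contained sketch of that decision flow; TS, partition, and the build callback are simplified stand-ins, not the engine's real API.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// TS and partition are simplified stand-ins for types.TS and
// logtailreplay.Partition; a partition serves reads in [start, end].
type TS int64

const maxTS = TS(math.MaxInt64)

type partition struct{ start, end TS }

func (p *partition) CanServe(ts TS) bool { return ts >= p.start && ts <= p.end }

// IsEmpty uses the PR's sentinel: a partition that consumed no
// checkpoints still has start == MaxTs.
func (p *partition) IsEmpty() bool { return p.start == maxTS }

// getSnapPart sketches the post-PR flow of getOrCreateSnapPart:
// reuse a cached snapshot partition; otherwise build one from
// checkpoints; if it is empty or ts is newer than its end, fall back
// to the latest partition (the real code updates the logtail first);
// if ts is older than its start, the snapshot is too old.
func getSnapPart(cache []*partition, ts TS, build func() *partition, latest *partition) (*partition, error) {
	for _, snap := range cache {
		if snap.CanServe(ts) {
			return snap, nil
		}
	}
	snap := build()
	if snap.CanServe(ts) {
		return snap, nil
	}
	if snap.IsEmpty() || ts > snap.end {
		return latest, nil
	}
	// Only ts < snap.start remains: no checkpoint covers the snapshot.
	return nil, errors.New("no valid checkpoints for snapshot read, maybe the snapshot is too old")
}

func main() {
	latest := &partition{start: 100, end: 200}
	build := func() *partition { return &partition{start: 50, end: 90} }

	p, err := getSnapPart(nil, 250, build, latest) // newer than all checkpoints: latest partition
	fmt.Println(p, err)

	_, err = getSnapPart(nil, 10, build, latest) // older than all checkpoints: error
	fmt.Println(err)
}
```

Passing the whole table handle keeps databaseId, tableId, the table and database names, and primarySeqnum consistent by construction, which is what the signature change buys.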
3 changes: 0 additions & 3 deletions pkg/vm/engine/disttae/engine.go
@@ -273,9 +273,6 @@ func (e *Engine) Database(ctx context.Context, name string,
if !txn.op.IsSnapOp() {
catalog = e.getLatestCatalogCache()
} else {
if name == "test" {
logutil.Infof("xxxx Database-getOrCreateSnapCatalogCache: txn:%s", txn.op.Txn().DebugString())
}
catalog, err = e.getOrCreateSnapCatalogCache(
ctx,
types.TimestampToTS(txn.op.SnapshotTS()))
19 changes: 13 additions & 6 deletions pkg/vm/engine/disttae/logtailreplay/partition.go
@@ -20,7 +20,7 @@ import (
"sync"
"sync/atomic"

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"

"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -107,12 +107,18 @@ func (p *Partition) Unlock() {
p.lock <- struct{}{}
}

func (p *Partition) checkValid() bool {
func (p *Partition) IsValid() bool {
p.mu.Lock()
defer p.mu.Unlock()
return p.mu.start.LessEq(&p.mu.end)
}

func (p *Partition) IsEmpty() bool {
p.mu.Lock()
defer p.mu.Unlock()
return p.mu.start == types.MaxTs()
}

func (p *Partition) UpdateStart(ts types.TS) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -145,6 +151,9 @@ func (p *Partition) ConsumeSnapCkps(
) (
err error,
) {
if len(ckps) == 0 {
return nil
}
//Notice that checkpoints must contain zero or one global checkpoint
//followed by zero or more contiguous incremental checkpoints.
state := p.state.Load()
@@ -173,10 +182,9 @@
end = start
}
p.UpdateDuration(start, end)
if !p.checkValid() {
panic("invalid checkpoint")
if !p.IsValid() {
return moerr.NewInternalErrorNoCtx("invalid checkpoints duration")
}

return nil
}

@@ -207,7 +215,6 @@ func (p *Partition) ConsumeCheckpoints(

curState = p.state.Load()
if len(curState.checkpoints) == 0 {
logutil.Infof("xxxx impossible path")
p.UpdateDuration(types.TS{}, types.MaxTs())
return nil
}
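
The partition.go hunks replace the checkValid panic with a returned error, tolerate an empty checkpoint list, and add IsEmpty, which leans on the sentinel that a freshly created partition has start == MaxTs. A small sketch of those semantics under stand-in types (the real TS and Partition live in pkg/container/types and logtailreplay):

```go
package main

import (
	"fmt"
	"math"
)

// TS stands in for types.TS; a Partition tracks the [start, end]
// range covered by the checkpoints it has consumed.
type TS int64

const maxTS = TS(math.MaxInt64)

type partition struct{ start, end TS }

// newPartition mirrors the sentinel initialization: start == MaxTs
// marks a partition that has consumed no checkpoints yet.
func newPartition() *partition { return &partition{start: maxTS} }

func (p *partition) IsEmpty() bool { return p.start == maxTS }
func (p *partition) IsValid() bool { return p.start <= p.end }

// consume sketches ConsumeSnapCkps after the PR: an empty checkpoint
// list is now a no-op, and an inverted duration is reported as an
// error instead of a panic. Each ckp is a [from, to] pair.
func (p *partition) consume(ckps [][2]TS) error {
	if len(ckps) == 0 {
		return nil
	}
	p.start, p.end = ckps[0][0], ckps[len(ckps)-1][1]
	if !p.IsValid() {
		return fmt.Errorf("invalid checkpoints duration [%d, %d]", p.start, p.end)
	}
	return nil
}

func main() {
	p := newPartition()
	fmt.Println(p.IsEmpty())                            // true: nothing consumed yet
	fmt.Println(p.consume(nil))                         // <nil>: empty list tolerated
	fmt.Println(p.consume([][2]TS{{10, 20}, {20, 30}})) // <nil>: duration becomes [10, 30]
	fmt.Println(p.IsEmpty())                            // false
}
```

Returning an error lets getOrCreateSnapPart surface a bad checkpoint range to the caller instead of crashing the process.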
1 change: 0 additions & 1 deletion pkg/vm/engine/disttae/txn.go
@@ -980,7 +980,6 @@ func (txn *Transaction) forEachTableWrites(databaseId uint64, tableId uint64, of
// getCachedTable returns the cached table in this transaction if it exists, nil otherwise.
// Before it gets the cached table, it checks whether the table has been deleted by another
// transaction by going through the deleted tables slice, and advances its cachedIndex.
// TODO::get snapshot table from cache for snapshot read
func (txn *Transaction) getCachedTable(
ctx context.Context,
k tableKey,
33 changes: 8 additions & 25 deletions pkg/vm/engine/disttae/txn_table.go
@@ -104,33 +104,19 @@ func (tbl *txnTable) stats(ctx context.Context) (*pb.StatsInfo, error) {
logutil.Errorf("failed to unmarshal partition table: %v", err)
return nil, err
}
var cataChe *cache.CatalogCache
if !tbl.db.op.IsSnapOp() {
cataChe = e.getLatestCatalogCache()
} else {
cataChe, err = e.getOrCreateSnapCatalogCache(
ctx,
types.TimestampToTS(tbl.db.op.SnapshotTS()))
for _, partitionTableName := range partitionInfo.PartitionTableNames {
partitionTable, err := tbl.db.Relation(ctx, partitionTableName, nil)
if err != nil {
return nil, err
}
}
for _, partitionTableName := range partitionInfo.PartitionTableNames {
partitionTable := cataChe.GetTableByName(
tbl.db.databaseId, partitionTableName)
partitionsTableDef = append(partitionsTableDef, partitionTable.TableDef)

partitionsTableDef = append(partitionsTableDef, partitionTable.(*txnTable).tableDef)
var ps *logtailreplay.PartitionState
if !tbl.db.op.IsSnapOp() {
ps = e.getOrCreateLatestPart(tbl.db.databaseId, partitionTable.Id).Snapshot()
ps = e.getOrCreateLatestPart(tbl.db.databaseId, partitionTable.(*txnTable).tableId).Snapshot()
} else {
p, err := e.getOrCreateSnapPart(
ctx,
tbl.db.databaseId,
partitionTable.TableDef.GetDbName(),
partitionTable.Id,
partitionTable.TableDef.GetName(),
partitionTable.PrimarySeqnum,
partitionTable.(*txnTable),
types.TimestampToTS(tbl.db.op.SnapshotTS()),
)
if err != nil {
@@ -2253,12 +2239,9 @@ func (tbl *txnTable) getPartitionState(

// for snapshot txnOp
if tbl._partState.Load() == nil {
p, err := tbl.getTxn().engine.getOrCreateSnapPart(ctx,
tbl.db.databaseId,
tbl.db.databaseName,
tbl.tableId,
tbl.tableName,
tbl.primarySeqnum,
p, err := tbl.getTxn().engine.getOrCreateSnapPart(
ctx,
tbl,
types.TimestampToTS(tbl.db.op.Txn().SnapshotTS))
if err != nil {
return nil, err
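
In stats(), partition tables are now resolved through tbl.db.Relation instead of being looked up by hand in a latest-or-snapshot catalog cache, so the snapshot decision is made in one place; the result is then type-asserted to *txnTable to reach fields like tableId. A hedged sketch of that lookup-and-assert pattern, with illustrative stand-in types rather than the engine's real Relation API:

```go
package main

import (
	"context"
	"fmt"
)

// Relation and table are illustrative stand-ins for engine.Relation
// and txnTable; the real interface is much larger.
type Relation interface {
	Name() string
}

type table struct {
	name          string
	primarySeqnum int
}

func (t *table) Name() string { return t.name }

type database struct{ tables map[string]*table }

// Relation resolves a table by name the way txnDatabase.Relation
// does, honoring whatever snapshot the transaction carries, so
// callers no longer pick a catalog cache themselves.
func (db *database) Relation(_ context.Context, name string) (Relation, error) {
	t, ok := db.tables[name]
	if !ok {
		return nil, fmt.Errorf("table %q not found", name)
	}
	return t, nil
}

func main() {
	db := &database{tables: map[string]*table{"users_p0": {name: "users_p0", primarySeqnum: 1}}}
	for _, name := range []string{"users_p0"} { // stands in for PartitionTableNames
		rel, err := db.Relation(context.Background(), name)
		if err != nil {
			panic(err)
		}
		// The diff does the same downcast: partitionTable.(*txnTable).tableId.
		t := rel.(*table)
		fmt.Println(t.Name(), t.primarySeqnum)
	}
}
```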
1 change: 0 additions & 1 deletion pkg/vm/engine/disttae/types.go
@@ -192,7 +192,6 @@ type Transaction struct {
rowId [6]uint32
segId types.Uuid
// used to cache snapshot tables opened by the current txn.
//TODO::cache snapshot tables for snapshot read.
tableCache struct {
cachedIndex int
tableMap *sync.Map
2 changes: 0 additions & 2 deletions test/distributed/cases/snapshot/snapshot.sql
@@ -1,5 +1,4 @@
-- create snapshot success
-- @bvt:issue#14784
create snapshot snapshot_01 for cluster;
create account default_1 ADMIN_NAME admin IDENTIFIED BY '111111';
create snapshot snapshot_02 for account default_1;
@@ -65,4 +64,3 @@ drop snapshot if exists snapshot_09;
drop snapshot if exists snapshot_10;
drop account default_1;
drop account default_2;
-- @bvt:issue
2 changes: 0 additions & 2 deletions test/distributed/cases/snapshot/snapshot_read.sql
@@ -1,4 +1,3 @@
-- @bvt:issue#14784
create database if not exists snapshot_read;
use snapshot_read;
create table test_snapshot_read (a int);
@@ -182,4 +181,3 @@ drop snapshot sp_01;
drop database if exists snapshot_read;
-- @session
drop account if exists test_account;
-- @bvt:issue
2 changes: 0 additions & 2 deletions test/distributed/cases/snapshot/snapshot_read_1.sql
@@ -1,4 +1,3 @@
-- @bvt:issue#14784
create snapshot snapshot_01 for account sys;
create database if not exists snapshot_read;
use snapshot_read;
@@ -54,4 +53,3 @@ drop database snapshot_read;
select count(*) from snapshot_read.test_snapshot_read{snapshot = 'snapshot_01'};
drop database if exists snapshot_read;
drop snapshot snapshot_01;
-- @bvt:issue
2 changes: 0 additions & 2 deletions test/distributed/cases/snapshot/snapshot_read_2.sql
@@ -1,4 +1,3 @@
-- @bvt:issue#14784
create database if not exists snapshot_read;
use snapshot_read;
create table test_snapshot_read (a int);
@@ -50,4 +49,3 @@ select count(*) from t3;
drop database snapshot_read;
drop snapshot snapshot_01;
drop snapshot snapshot_02;
-- @bvt:issue
2 changes: 0 additions & 2 deletions test/distributed/cases/snapshot/snapshot_read_3.sql
@@ -1,4 +1,3 @@
-- @bvt:issue#14784
create database if not exists snapshot_read;
use snapshot_read;
create table test_snapshot_read (a int);
@@ -49,4 +48,3 @@ show create table snapshot_read.test_snapshot_read {snapshot = 'snapshot_01'};
show create table snapshot_read.test_snapshot_read_view {snapshot = 'snapshot_01'};
show databases like 'snapshot_read' {snapshot = 'snapshot_01'};
drop snapshot snapshot_01;
-- @bvt:issue
@@ -1,4 +1,3 @@
-- @bvt:issue#14784
-- account level
create database if not exists snapshot_read;
use snapshot_read;
@@ -602,4 +601,3 @@ select count(*) from snapshot_read.students;
drop account test_account;
drop account test_account_01;
drop snapshot snapshot_01;
-- @bvt:issue
@@ -1,4 +1,3 @@
-- @bvt:issue#14784
-- database level update/delete/insert
create database if not exists snapshot_read;
use snapshot_read;
@@ -344,4 +343,3 @@ select count(*) from snapshot_read.users;
drop snapshot sp_01;
drop account test_account;
drop account test_account_2;
-- @bvt:issue
@@ -1,4 +1,3 @@
-- @bvt:issue#14784
-- table level normal
create database if not exists snapshot_read;
use snapshot_read;
@@ -343,4 +342,3 @@ select count(*) from snapshot_read.users;
drop snapshot sp_01;
drop account test_account;
drop account test_account_2;
-- @bvt:issue