Skip to content

Commit

Permalink
fix transfer failed (#17873)
Browse files Browse the repository at this point in the history
fix wrong order of pk and rowids

Approved by: @LeftHandCold, @XuPeng-SH
  • Loading branch information
aptend authored Aug 2, 2024
1 parent 1edf649 commit d73f1c2
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,9 @@ var (
SystemColAttr_AccID, SystemColAttr_DBName, SystemColAttr_RelName)

MoColumnsRowidsQueryFormat = fmt.Sprintf(
"select %s from `%s`.`%s` where %s = %%d and %s = %%q and %s = %%q and %s = %%d",
"select %s from `%s`.`%s` where %s = %%d and %s = %%q and %s = %%q and %s = %%d order by %s",
Row_ID, MO_CATALOG, MO_COLUMNS,
SystemColAttr_AccID, SystemColAttr_DBName, SystemColAttr_RelName, SystemColAttr_RelID)
SystemColAttr_AccID, SystemColAttr_DBName, SystemColAttr_RelName, SystemColAttr_RelID, SystemColAttr_Num)

MoColumnsSchema_V1 = []string{
SystemColAttr_UniqName,
Expand Down
3 changes: 3 additions & 0 deletions pkg/container/types/tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ func findTerminator(b []byte) int {
length += 2
bp = bp[idx+2:]
}
if length == -1 {
return len(b)
}

return length
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/objectio/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ type ObjectFS struct {
}

func TmpNewFileservice(ctx context.Context, dir string) fileservice.FileService {
return tmpNewFileservice(ctx, defines.LocalFileServiceName, dir)
}

func TmpNewSharedFileservice(ctx context.Context, dir string) fileservice.FileService {
return tmpNewFileservice(ctx, defines.SharedFileServiceName, dir)
}

func tmpNewFileservice(ctx context.Context, kind string, dir string) fileservice.FileService {
c := fileservice.Config{
Name: defines.LocalFileServiceName,
Name: kind,
Backend: "DISK",
DataDir: dir,
Cache: fileservice.DisabledCacheConfig,
Expand Down
10 changes: 8 additions & 2 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,9 @@ func (txn *Transaction) getCachedTable(

func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error) {
logDebugf(txn.op.Txn(), "Transaction.Commit")
txn.IncrStatementID(ctx, true)
if err := txn.IncrStatementID(ctx, true); err != nil {
return nil, err
}
defer txn.delTransaction()
if txn.readOnly.Load() {
return nil, nil
Expand Down Expand Up @@ -1156,6 +1158,8 @@ func (txn *Transaction) GetSnapshotWriteOffset() int {
return txn.snapshotWriteOffset
}

type UT_ForceTransCheck struct{}

func (txn *Transaction) transferDeletesLocked(ctx context.Context, commit bool) error {
var latestTs timestamp.Timestamp
txn.timestamps = append(txn.timestamps, txn.op.SnapshotTS())
Expand All @@ -1170,7 +1174,9 @@ func (txn *Transaction) transferDeletesLocked(ctx context.Context, commit bool)
}
if commit {
if time.Since(txn.start) < time.Second*5 {
return nil
if ctx.Value(UT_ForceTransCheck{}) == nil {
return nil
}
}
//It's important to push the snapshot ts to the latest ts
if err := txn.op.UpdateSnapshot(
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ func execReadSql(ctx context.Context, op client.TxnOperator, sql string, disable
service := op.GetWorkspace().(*Transaction).proc.GetService()
v, ok := moruntime.ServiceRuntime(service).GetGlobalVariables(moruntime.InternalSQLExecutor)
if !ok {
panic("missing lock service")
panic(fmt.Sprintf("missing sql executor in service %q", service))
}
exec := v.(executor.SQLExecutor)
proc := op.GetWorkspace().(*Transaction).proc
Expand Down
65 changes: 60 additions & 5 deletions pkg/vm/engine/test/disttae_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
ops "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config"
"github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -137,7 +141,7 @@ func TestSystemDB1(t *testing.T) {

schema := catalog2.MockSchema(2, 0)
schema.Name = "test1inDb2"
p.CreateDBAndTables(txnop, "db2", schema)
p.CreateDBAndTable(txnop, "db2", schema)

dbs, err := p.D.Engine.Databases(p.Ctx, txnop)
require.NoError(t, err)
Expand Down Expand Up @@ -246,9 +250,8 @@ func TestLogtailBasic(t *testing.T) {
schema.Comment = "rows:10;blks=1"
// craete 2 db and 2 tables
txnop := p.StartCNTxn()
p.CreateDBAndTables(txnop, "todrop", schema)
_, tHs := p.CreateDBAndTables(txnop, "db", schema)
tH := tHs[0]
p.CreateDBAndTable(txnop, "todrop", schema)
_, tH := p.CreateDBAndTable(txnop, "db", schema)
dbID := tH.GetDBID(p.Ctx)
tableID := tH.GetTableID(p.Ctx)
require.NoError(t, txnop.Commit(p.Ctx))
Expand Down Expand Up @@ -458,7 +461,7 @@ func TestAlterTableBasic(t *testing.T) {
schema.Comment = initComment

txnop := p.StartCNTxn()
p.CreateDBAndTables(txnop, "db", schema)
p.CreateDBAndTable(txnop, "db", schema)
require.NoError(t, txnop.Commit(p.Ctx))

txnop = p.StartCNTxn()
Expand Down Expand Up @@ -528,6 +531,58 @@ func TestAlterTableBasic(t *testing.T) {
close()
}

func TestColumnsTransfer(t *testing.T) {
opts := config.WithLongScanAndCKPOpts(nil)
dir := testutil.MakeDefaultTestPath("partition_state", t)
opts.Fs = objectio.TmpNewSharedFileservice(context.Background(), dir)
p := testutil.InitEnginePack(testutil.TestOptions{TaeEngineOptions: opts}, t)
defer p.Close()
tae := p.T.GetDB()

schema := catalog2.MockSchemaAll(8188, -1)
schema.Name = "test"

schema2 := catalog2.MockSchemaAll(10, -1)
schema2.Name = "todrop"

txnop := p.StartCNTxn()
p.CreateDBAndTables(txnop, "db", schema, schema2)
require.NoError(t, txnop.Commit(p.Ctx))

txnop = p.StartCNTxn()
txnop.GetWorkspace().StartStatement()
p.DeleteTableInDB(txnop, "db", schema2.Name)

txn, _ := tae.StartTxn(nil)
catalogDB, _ := txn.GetDatabaseByID(catalog.MO_CATALOG_ID)
columnsTbl, _ := catalogDB.GetRelationByID(catalog.MO_COLUMNS_ID)

worker := ops.NewOpWorker(context.Background(), "xx")
worker.Start()
defer worker.Stop()

it := columnsTbl.MakeObjectIt()
it.Next()
firstEntry := it.GetObject().GetMeta().(*catalog2.ObjectEntry)
t.Log(firstEntry.ID().ShortStringEx())
task1, err := jobs.NewFlushTableTailTask(
tasks.WaitableCtx, txn,
[]*catalog2.ObjectEntry{firstEntry},
tae.Runtime, txn.GetStartTS())
require.NoError(t, err)
worker.SendOp(task1)
err = task1.WaitDone(context.Background())
require.NoError(t, err)

require.NoError(t, txn.Commit(p.Ctx))

time.Sleep(200 * time.Millisecond)
ctx := context.WithValue(p.Ctx, disttae.UT_ForceTransCheck{}, 42)
require.NoError(t, txnop.GetWorkspace().IncrStatementID(ctx, true))
require.NoError(t, txnop.Commit(p.Ctx))

}

func TestCacheGC(t *testing.T) {
opts := config.WithLongScanAndCKPOpts(nil)
p := testutil.InitEnginePack(testutil.TestOptions{TaeEngineOptions: opts}, t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/vm/engine/test/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (p *EnginePack) CreateDB(txnop client.TxnOperator, dbname string) engine.Da
return db
}

func (p *EnginePack) CreateDBAndTable(txnop client.TxnOperator, dbname string, schema *catalog2.Schema) (engine.Database, engine.Relation) {
db, rels := p.CreateDBAndTables(txnop, dbname, schema)
return db, rels[0]
}

func (p *EnginePack) CreateDBAndTables(txnop client.TxnOperator, dbname string, schema ...*catalog2.Schema) (engine.Database, []engine.Relation) {
db := p.CreateDB(txnop, dbname)
rels := make([]engine.Relation, 0, len(schema))
Expand Down

0 comments on commit d73f1c2

Please sign in to comment.