Skip to content

Commit

Permalink
logtailreplay: fix missing Vector.Free (#17685)
Browse files Browse the repository at this point in the history
fix missing Vector.Free in logtailreplay, and minor naming fixes to make codes more readable.

Approved by: @triump2020
  • Loading branch information
reusee authored Jul 26, 2024
1 parent e19fcee commit 7bc7dba
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 48 deletions.
14 changes: 7 additions & 7 deletions pkg/vm/engine/disttae/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF, packer, e.mp)
done()
e.catalog.InsertDatabase(bat)
bat.Clean(m)
Expand All @@ -97,7 +97,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer, e.mp)
done()
e.catalog.InsertTable(bat)
bat.Clean(m)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done = part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer, e.mp)
done()
e.catalog.InsertColumns(bat)
bat.Clean(m)
Expand All @@ -157,7 +157,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer, e.mp)
done()
e.catalog.InsertTable(bat)
bat.Clean(m)
Expand Down Expand Up @@ -191,7 +191,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done = part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer, e.mp)
done()
e.catalog.InsertColumns(bat)
bat.Clean(m)
Expand All @@ -217,7 +217,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done := part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_TABLES_REL_ID_IDX, packer, e.mp)
done()
e.catalog.InsertTable(bat)
bat.Clean(m)
Expand Down Expand Up @@ -251,7 +251,7 @@ func (e *Engine) init(ctx context.Context) error {
return err
}
state, done = part.MutateState()
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer)
state.HandleRowsInsert(ctx, ibat, MO_PRIMARY_OFF+catalog.MO_COLUMNS_ATT_UNIQ_NAME_IDX, packer, e.mp)
done()
e.catalog.InsertColumns(bat)
bat.Clean(m)
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/logtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func consumeEntry(

if state != nil {
t0 := time.Now()
state.HandleLogtailEntry(ctx, engine.fs, e, primarySeqnum, packer)
state.HandleLogtailEntry(ctx, engine.fs, e, primarySeqnum, packer, engine.mp)
v2.LogtailUpdatePartitonConsumeLogtailOneEntryLogtailReplayDurationHistogram.Observe(time.Since(t0).Seconds())
}

Expand Down
155 changes: 124 additions & 31 deletions pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync/atomic"
"unsafe"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand Down Expand Up @@ -358,39 +359,64 @@ func (p *PartitionState) HandleLogtailEntry(
entry *api.Entry,
primarySeqnum int,
packer *types.Packer,
pool *mpool.MPool,
) {

txnTrace.GetService(p.service).ApplyLogtail(entry, 1)

switch entry.EntryType {

case api.Entry_Insert:
if IsBlkTable(entry.TableName) {
p.HandleMetadataInsert(ctx, fs, entry.Bat)
p.HandleMetadataInsert(ctx, fs, entry.Bat, pool)
} else if IsObjTable(entry.TableName) {
p.HandleObjectInsert(ctx, entry.Bat, fs)
p.HandleObjectInsert(ctx, entry.Bat, fs, pool)
} else {
p.HandleRowsInsert(ctx, entry.Bat, primarySeqnum, packer)
p.HandleRowsInsert(ctx, entry.Bat, primarySeqnum, packer, pool)
}

case api.Entry_Delete:
if IsBlkTable(entry.TableName) {
p.HandleMetadataDelete(ctx, entry.TableId, entry.Bat)
} else if IsObjTable(entry.TableName) {
p.HandleObjectDelete(entry.TableId, entry.Bat)
p.HandleObjectDelete(entry.TableId, entry.Bat, pool)
} else {
p.HandleRowsDelete(ctx, entry.Bat, packer)
p.HandleRowsDelete(ctx, entry.Bat, packer, pool)
}

default:
panic("unknown entry type")
}
}

func (p *PartitionState) HandleObjectDelete(
tableID uint64,
bat *api.Batch) {
bat *api.Batch,
pool *mpool.MPool,
) {

statsVec := mustVectorFromProto(bat.Vecs[2])
stateCol := vector.MustFixedCol[bool](mustVectorFromProto(bat.Vecs[3]))
sortedCol := vector.MustFixedCol[bool](mustVectorFromProto(bat.Vecs[4]))
createTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[7]))
deleteTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[8]))
commitTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[11]))
defer statsVec.Free(pool)

vec := mustVectorFromProto(bat.Vecs[3])
defer vec.Free(pool)
stateCol := vector.MustFixedCol[bool](vec)

vec = mustVectorFromProto(bat.Vecs[4])
defer vec.Free(pool)
sortedCol := vector.MustFixedCol[bool](vec)

vec = mustVectorFromProto(bat.Vecs[7])
defer vec.Free(pool)
createTSCol := vector.MustFixedCol[types.TS](vec)

vec = mustVectorFromProto(bat.Vecs[8])
defer vec.Free(pool)
deleteTSCol := vector.MustFixedCol[types.TS](vec)

vec = mustVectorFromProto(bat.Vecs[11])
defer vec.Free(pool)
commitTSCol := vector.MustFixedCol[types.TS](vec)

for idx := 0; idx < len(stateCol); idx++ {
var objEntry ObjectEntry
Expand All @@ -410,16 +436,41 @@ func (p *PartitionState) HandleObjectDelete(
}
}

func (p *PartitionState) HandleObjectInsert(ctx context.Context, bat *api.Batch, fs fileservice.FileService) {
func (p *PartitionState) HandleObjectInsert(
ctx context.Context,
bat *api.Batch,
fs fileservice.FileService,
pool *mpool.MPool,
) {

var numDeleted, blockDeleted, scanCnt int64

statsVec := mustVectorFromProto(bat.Vecs[2])
stateCol := vector.MustFixedCol[bool](mustVectorFromProto(bat.Vecs[3]))
sortedCol := vector.MustFixedCol[bool](mustVectorFromProto(bat.Vecs[4]))
createTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[7]))
deleteTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[8]))
startTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[9]))
commitTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[11]))
defer statsVec.Free(pool)

vec := mustVectorFromProto(bat.Vecs[3])
defer vec.Free(pool)
stateCol := vector.MustFixedCol[bool](vec)

vec = mustVectorFromProto(bat.Vecs[4])
defer vec.Free(pool)
sortedCol := vector.MustFixedCol[bool](vec)

vec = mustVectorFromProto(bat.Vecs[7])
defer vec.Free(pool)
createTSCol := vector.MustFixedCol[types.TS](vec)

vec = mustVectorFromProto(bat.Vecs[8])
defer vec.Free(pool)
deleteTSCol := vector.MustFixedCol[types.TS](vec)

vec = mustVectorFromProto(bat.Vecs[9])
defer vec.Free(pool)
startTSCol := vector.MustFixedCol[types.TS](vec)

vec = mustVectorFromProto(bat.Vecs[11])
defer vec.Free(pool)
commitTSCol := vector.MustFixedCol[types.TS](vec)

for idx := 0; idx < len(stateCol); idx++ {
p.shared.Lock()
Expand Down Expand Up @@ -581,14 +632,21 @@ func (p *PartitionState) HandleRowsInsert(
input *api.Batch,
primarySeqnum int,
packer *types.Packer,
pool *mpool.MPool,
) (
primaryKeys [][]byte,
) {
ctx, task := trace.NewTask(ctx, "PartitionState.HandleRowsInsert")
defer task.End()

rowIDVector := vector.MustFixedCol[types.Rowid](mustVectorFromProto(input.Vecs[0]))
timeVector := vector.MustFixedCol[types.TS](mustVectorFromProto(input.Vecs[1]))
vec := mustVectorFromProto(input.Vecs[0])
defer vec.Free(pool)
rowIDVector := vector.MustFixedCol[types.Rowid](vec)

vec = mustVectorFromProto(input.Vecs[1])
defer vec.Free(pool)
timeVector := vector.MustFixedCol[types.TS](vec)

batch, err := batch.ProtoBatchToBatch(input)
if err != nil {
panic(err)
Expand Down Expand Up @@ -647,12 +705,19 @@ func (p *PartitionState) HandleRowsDelete(
ctx context.Context,
input *api.Batch,
packer *types.Packer,
pool *mpool.MPool,
) {
ctx, task := trace.NewTask(ctx, "PartitionState.HandleRowsDelete")
defer task.End()

rowIDVector := vector.MustFixedCol[types.Rowid](mustVectorFromProto(input.Vecs[0]))
timeVector := vector.MustFixedCol[types.TS](mustVectorFromProto(input.Vecs[1]))
vec := mustVectorFromProto(input.Vecs[0])
defer vec.Free(pool)
rowIDVector := vector.MustFixedCol[types.Rowid](vec)

vec = mustVectorFromProto(input.Vecs[1])
defer vec.Free(pool)
timeVector := vector.MustFixedCol[types.TS](vec)

batch, err := batch.ProtoBatchToBatch(input)
if err != nil {
panic(err)
Expand Down Expand Up @@ -720,19 +785,46 @@ func (p *PartitionState) HandleRowsDelete(
func (p *PartitionState) HandleMetadataInsert(
ctx context.Context,
fs fileservice.FileService,
input *api.Batch) {
input *api.Batch,
pool *mpool.MPool,
) {

ctx, task := trace.NewTask(ctx, "PartitionState.HandleMetadataInsert")
defer task.End()

createTimeVector := vector.MustFixedCol[types.TS](mustVectorFromProto(input.Vecs[1]))
blockIDVector := vector.MustFixedCol[types.Blockid](mustVectorFromProto(input.Vecs[2]))
entryStateVector := vector.MustFixedCol[bool](mustVectorFromProto(input.Vecs[3]))
sortedStateVector := vector.MustFixedCol[bool](mustVectorFromProto(input.Vecs[4]))
vec := mustVectorFromProto(input.Vecs[1])
defer vec.Free(pool)
createTimeVector := vector.MustFixedCol[types.TS](vec)

vec = mustVectorFromProto(input.Vecs[2])
defer vec.Free(pool)
blockIDVector := vector.MustFixedCol[types.Blockid](vec)

vec = mustVectorFromProto(input.Vecs[3])
defer vec.Free(pool)
entryStateVector := vector.MustFixedCol[bool](vec)

vec = mustVectorFromProto(input.Vecs[4])
defer vec.Free(pool)
sortedStateVector := vector.MustFixedCol[bool](vec)

metaLocationVector := mustVectorFromProto(input.Vecs[5])
defer metaLocationVector.Free(pool)

deltaLocationVector := mustVectorFromProto(input.Vecs[6])
commitTimeVector := vector.MustFixedCol[types.TS](mustVectorFromProto(input.Vecs[7]))
//segmentIDVector := vector.MustFixedCol[types.Uuid](mustVectorFromProto(input.Vecs[8]))
memTruncTSVector := vector.MustFixedCol[types.TS](mustVectorFromProto(input.Vecs[9]))
defer deltaLocationVector.Free(pool)

vec = mustVectorFromProto(input.Vecs[7])
defer vec.Free(pool)
commitTimeVector := vector.MustFixedCol[types.TS](vec)

//vec = mustVectorFromProto(input.Vecs[8])
//defer vec.Free(pool)
//segmentIDVector := vector.MustFixedCol[types.Uuid](vec)

vec = mustVectorFromProto(input.Vecs[9])
defer vec.Free(pool)
memTruncTSVector := vector.MustFixedCol[types.TS](vec)

var numInserted, numDeleted int64
for i, blockID := range blockIDVector {
Expand Down Expand Up @@ -975,7 +1067,8 @@ func (p *PartitionState) objectDeleteHelper(
func (p *PartitionState) HandleMetadataDelete(
ctx context.Context,
tableID uint64,
input *api.Batch) {
input *api.Batch,
) {
ctx, task := trace.NewTask(ctx, "PartitionState.HandleMetadataDelete")
defer task.End()

Expand Down
18 changes: 9 additions & 9 deletions pkg/vm/engine/disttae/logtailreplay/rows_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestPartitionStateRowsIter(t *testing.T) {
mustVectorToProto(tsVec),
mustVectorToProto(vec1),
},
}, 0, packer)
}, 0, packer, pool)
}

// rows iter
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestPartitionStateRowsIter(t *testing.T) {
mustVectorToProto(tsVec),
mustVectorToProto(vec1),
},
}, 0, packer)
}, 0, packer, pool)
}

// rows iter
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestPartitionStateRowsIter(t *testing.T) {
mustVectorToProto(rowIDVec),
mustVectorToProto(tsVec),
},
}, packer)
}, packer, pool)
}

for i := 0; i < num; i++ {
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestPartitionStateRowsIter(t *testing.T) {
mustVectorToProto(rowIDVec),
mustVectorToProto(tsVec),
},
}, packer)
}, packer, pool)
}

for i := 0; i < num; i++ {
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestInsertAndDeleteAtTheSameTimestamp(t *testing.T) {
mustVectorToProto(tsVec),
mustVectorToProto(vec1),
},
}, 0, packer)
}, 0, packer, pool)
}

{
Expand All @@ -313,7 +313,7 @@ func TestInsertAndDeleteAtTheSameTimestamp(t *testing.T) {
mustVectorToProto(rowIDVec),
mustVectorToProto(tsVec),
},
}, packer)
}, packer, pool)
}

{
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestDeleteBeforeInsertAtTheSameTime(t *testing.T) {
mustVectorToProto(rowIDVec),
mustVectorToProto(tsVec),
},
}, packer)
}, packer, pool)
}

{
Expand All @@ -405,7 +405,7 @@ func TestDeleteBeforeInsertAtTheSameTime(t *testing.T) {
mustVectorToProto(tsVec),
mustVectorToProto(vec1),
},
}, 0, packer)
}, 0, packer, pool)
}

{
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestPrimaryKeyModifiedWithDeleteOnly(t *testing.T) {
mustVectorToProto(tsVec),
mustVectorToProto(primaryKeyVec), // with primary key
},
}, packer)
}, packer, pool)
}

// should be detectable
Expand Down

0 comments on commit 7bc7dba

Please sign in to comment.