diff --git a/pkg/cnservice/server.go b/pkg/cnservice/server.go index ef1ec435f4062..6deeb2bf509e7 100644 --- a/pkg/cnservice/server.go +++ b/pkg/cnservice/server.go @@ -614,7 +614,7 @@ func (s *service) getTxnSender() (sender rpc.TxnSender, err error) { resp.CNOpResponse = &txn.CNOpResponse{Payload: payload} } case txn.TxnMethod_Commit: - _, err = storage.Commit(ctx, req.Txn) + _, err = storage.Commit(ctx, req.Txn, nil, nil) if err == nil { resp.Txn.Status = txn.TxnStatus_Committed } diff --git a/pkg/tnservice/store_rpc_handler.go b/pkg/tnservice/store_rpc_handler.go index 1105c193f540b..d49d5a2c6dcd4 100644 --- a/pkg/tnservice/store_rpc_handler.go +++ b/pkg/tnservice/store_rpc_handler.go @@ -110,16 +110,7 @@ func (s *store) handleCommit(ctx context.Context, request *txn.TxnRequest, respo return nil } r.waitStarted() - if request.CommitRequest != nil { - for _, req := range request.CommitRequest.Payload { - //response is shared by all requests - prepareResponse(req, response) - err := s.handleWrite(ctx, req, response) - if err != nil { - return err - } - } - } + prepareResponse(request, response) return r.service.Commit(ctx, request, response) } diff --git a/pkg/txn/service/service_cn_handler.go b/pkg/txn/service/service_cn_handler.go index 62aaa5cc3b9c6..a8f5b6731a8a7 100644 --- a/pkg/txn/service/service_cn_handler.go +++ b/pkg/txn/service/service_cn_handler.go @@ -217,7 +217,8 @@ func (s *service) Commit(ctx context.Context, request *txn.TxnRequest, response } txnID := request.Txn.ID - txnCtx := s.getTxnContext(txnID) + txnCtx, _ := s.maybeAddTxn(request.Txn) + if txnCtx == nil { util.LogTxnNotFoundOn(s.logger, request.Txn, s.shard) response.TxnError = txn.WrapError(moerr.NewTNShardNotFound(ctx, "", request.GetTargetTN().ShardID), 0) @@ -261,7 +262,7 @@ func (s *service) Commit(ctx context.Context, request *txn.TxnRequest, response if len(newTxn.TNShards) == 1 { util.LogTxnStart1PCCommit(s.logger, newTxn) - commitTS, err := s.storage.Commit(ctx, newTxn) + commitTS, err := s.storage.Commit(ctx, newTxn, response, request.CommitRequest) v2.TxnTNCommitHandledCounter.Inc() if err != nil { util.LogTxnStart1PCCommitFailed(s.logger, newTxn, err) @@ -469,7 +470,7 @@ func (s *service) startAsyncCommitTask(txnCtx *txnContext) error { util.TxnIDFieldWithID(txnMeta.ID)) } - if _, err := s.storage.Commit(ctx, txnMeta); err != nil { + if _, err := s.storage.Commit(ctx, txnMeta, nil, nil); err != nil { err = moerr.AttachCause(ctx, err) s.logger.Fatal("commit failed after prepared", util.TxnIDFieldWithID(txnMeta.ID), diff --git a/pkg/txn/service/service_dn_handler.go b/pkg/txn/service/service_dn_handler.go index 60bb7786f051a..734c7de3fd38c 100644 --- a/pkg/txn/service/service_dn_handler.go +++ b/pkg/txn/service/service_dn_handler.go @@ -147,7 +147,7 @@ func (s *service) CommitTNShard(ctx context.Context, request *txn.TxnRequest, re } newTxn.CommitTS = request.Txn.CommitTS - if _, err := s.storage.Commit(ctx, newTxn); err != nil { + if _, err := s.storage.Commit(ctx, newTxn, response, request.CommitRequest); err != nil { response.TxnError = txn.WrapError(err, moerr.ErrTAECommit) return nil } diff --git a/pkg/txn/storage/mem/kv_txn_storage.go b/pkg/txn/storage/mem/kv_txn_storage.go index 9f1ee55ce4ec7..0d621644a21b7 100644 --- a/pkg/txn/storage/mem/kv_txn_storage.go +++ b/pkg/txn/storage/mem/kv_txn_storage.go @@ -330,7 +330,12 @@ func (kv *KVTxnStorage) Committing(ctx context.Context, txnMeta txn.TxnMeta) err return nil } -func (kv *KVTxnStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) { +func (kv *KVTxnStorage) Commit( + ctx context.Context, + txnMeta txn.TxnMeta, + response *txn.TxnResponse, + commitRequests *txn.TxnCommitRequest, +) (timestamp.Timestamp, error) { kv.Lock() defer kv.Unlock() diff --git a/pkg/txn/storage/mem/kv_txn_storage_test.go b/pkg/txn/storage/mem/kv_txn_storage_test.go index 1f3d743f374ac..198c61d3f40a8 100644 --- a/pkg/txn/storage/mem/kv_txn_storage_test.go +++ b/pkg/txn/storage/mem/kv_txn_storage_test.go @@ -320,7 +320,7 @@ func committingTestTxn(t *testing.T, s *KVTxnStorage, wTxn *txn.TxnMeta, ts int6 func commitTestTxn(t *testing.T, s *KVTxnStorage, wTxn *txn.TxnMeta, ts int64, errCode uint16) { wTxn.CommitTS = newTimestamp(ts) - _, e := s.Commit(context.TODO(), *wTxn) + _, e := s.Commit(context.TODO(), *wTxn, nil, nil) assert.True(t, moerr.IsMoErrCode(e, errCode)) wTxn.Status = txn.TxnStatus_Committed } diff --git a/pkg/txn/storage/memorystorage/dynamic_storage.go b/pkg/txn/storage/memorystorage/dynamic_storage.go index f7b24832a7d25..646e3f945c6a6 100644 --- a/pkg/txn/storage/memorystorage/dynamic_storage.go +++ b/pkg/txn/storage/memorystorage/dynamic_storage.go @@ -59,12 +59,17 @@ func (d *DynamicStorage) Close(ctx context.Context) error { return storage.Close(ctx) } -func (d *DynamicStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) { +func (d *DynamicStorage) Commit( + ctx context.Context, + txnMeta txn.TxnMeta, + response *txn.TxnResponse, + commitRequests *txn.TxnCommitRequest, +) (timestamp.Timestamp, error) { storage, err := d.get(ctx) if err != nil { return timestamp.Timestamp{}, err } - return storage.Commit(ctx, txnMeta) + return storage.Commit(ctx, txnMeta, response, commitRequests) } func (d *DynamicStorage) Committing(ctx context.Context, txnMeta txn.TxnMeta) error { diff --git a/pkg/txn/storage/memorystorage/storage.go b/pkg/txn/storage/memorystorage/storage.go index be501979aee39..4467686802e5c 100644 --- a/pkg/txn/storage/memorystorage/storage.go +++ b/pkg/txn/storage/memorystorage/storage.go @@ -38,7 +38,12 @@ func New( var _ storage.TxnStorage = new(Storage) -func (s *Storage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) { +func (s *Storage) Commit( + ctx context.Context, + txnMeta txn.TxnMeta, + response *txn.TxnResponse, + commitRequests *txn.TxnCommitRequest, +) (timestamp.Timestamp, error) { return s.handler.HandleCommit(ctx, txnMeta) } diff --git a/pkg/txn/storage/memorystorage/storage_test.go b/pkg/txn/storage/memorystorage/storage_test.go index efce2d23c6579..af0ea9480b190 100644 --- a/pkg/txn/storage/memorystorage/storage_test.go +++ b/pkg/txn/storage/memorystorage/storage_test.go @@ -58,7 +58,7 @@ func testDatabase( }, } defer func() { - _, err := s.Commit(ctx, txnMeta) + _, err := s.Commit(ctx, txnMeta, nil, nil) assert.Nil(t, err) }() diff --git a/pkg/txn/storage/memorystorage/storage_txn_client.go b/pkg/txn/storage/memorystorage/storage_txn_client.go index 553953b6a2947..609e5f7de628e 100644 --- a/pkg/txn/storage/memorystorage/storage_txn_client.go +++ b/pkg/txn/storage/memorystorage/storage_txn_client.go @@ -198,7 +198,7 @@ func (s *StorageTxnOperator) Debug(ctx context.Context, ops []txn.TxnRequest) (* func (s *StorageTxnOperator) Commit(ctx context.Context) error { for _, storage := range s.storages { - if _, err := storage.Commit(ctx, s.meta); err != nil { + if _, err := storage.Commit(ctx, s.meta, nil, nil); err != nil { return err } } diff --git a/pkg/txn/storage/tae/storage.go b/pkg/txn/storage/tae/storage.go index 3bf717ed60ced..bbab55edbb8e8 100644 --- a/pkg/txn/storage/tae/storage.go +++ b/pkg/txn/storage/tae/storage.go @@ -86,8 +86,13 @@ func (s *taeStorage) Close(ctx context.Context) error { } // Commit implements storage.TxnTAEStorage -func (s *taeStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) { - return s.taeHandler.HandleCommit(ctx, txnMeta) +func (s *taeStorage) Commit( + ctx context.Context, + txnMeta txn.TxnMeta, + response *txn.TxnResponse, + commitRequests *txn.TxnCommitRequest, +) (timestamp.Timestamp, error) { + return s.taeHandler.HandleCommit(ctx, txnMeta, response, commitRequests) } // Committing implements storage.TxnTAEStorage diff --git a/pkg/txn/storage/tae/write.go b/pkg/txn/storage/tae/write.go index 393f25fcb0b28..355810301f76d 100644 --- a/pkg/txn/storage/tae/write.go +++ b/pkg/txn/storage/tae/write.go @@ -16,7 +16,6 @@ package taestorage import ( "context" - "github.com/matrixorigin/matrixone/pkg/common/moerr" apipb "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/txn" @@ -28,9 +27,8 @@ func (s *taeStorage) Write( txnMeta txn.TxnMeta, op uint32, payload []byte) (result []byte, err error) { + switch op { - case uint32(apipb.OpCode_OpPreCommit): - return HandleWrite(ctx, txnMeta, payload, s.taeHandler.HandlePreCommitWrite) case uint32(apipb.OpCode_OpCommitMerge): return HandleWrite(ctx, txnMeta, payload, s.taeHandler.HandleCommitMerge) default: diff --git a/pkg/txn/storage/types.go b/pkg/txn/storage/types.go index 2859a26b28d56..6c2ff312a23a7 100644 --- a/pkg/txn/storage/types.go +++ b/pkg/txn/storage/types.go @@ -59,7 +59,11 @@ type TxnStorage interface { // of the transaction is logged to the LogService. Committing(ctx context.Context, txnMeta txn.TxnMeta) error // Commit commit the transaction. TxnStorage needs to do conflict locally. - Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) + Commit( + ctx context.Context, + txnMeta txn.TxnMeta, + response *txn.TxnResponse, + commitRequests *txn.TxnCommitRequest) (timestamp.Timestamp, error) // Rollback rollback the transaction. Rollback(ctx context.Context, txnMeta txn.TxnMeta) error diff --git a/pkg/vm/engine/tae/iface/rpchandle/handler.go b/pkg/vm/engine/tae/iface/rpchandle/handler.go index 46e93e56ef519..78d9f183cf139 100644 --- a/pkg/vm/engine/tae/iface/rpchandle/handler.go +++ b/pkg/vm/engine/tae/iface/rpchandle/handler.go @@ -27,6 +27,8 @@ type Handler interface { HandleCommit( ctx context.Context, meta txn.TxnMeta, + response *txn.TxnResponse, + commitRequests *txn.TxnCommitRequest, ) (timestamp.Timestamp, error) HandleRollback( diff --git a/pkg/vm/engine/tae/rpc/base_test.go b/pkg/vm/engine/tae/rpc/base_test.go index 66915aeccc5ad..6c09fa2d1efb8 100644 --- a/pkg/vm/engine/tae/rpc/base_test.go +++ b/pkg/vm/engine/tae/rpc/base_test.go @@ -70,7 +70,7 @@ func (h *mockHandle) HandleCommit(ctx context.Context, meta *txn.TxnMeta) (times if len(meta.TNShards) > 1 && meta.CommitTS.IsEmpty() { meta.CommitTS = meta.PreparedTS.Next() } - return h.Handle.HandleCommit(ctx, *meta) + return h.Handle.HandleCommit(ctx, *meta, nil, nil) } func (h *mockHandle) HandleCommitting(ctx context.Context, meta *txn.TxnMeta) error { diff --git a/pkg/vm/engine/tae/rpc/handle.go b/pkg/vm/engine/tae/rpc/handle.go index 06469ce34a946..68208f0f682d6 100644 --- a/pkg/vm/engine/tae/rpc/handle.go +++ b/pkg/vm/engine/tae/rpc/handle.go @@ -53,8 +53,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" - "go.uber.org/zap" ) @@ -64,9 +62,10 @@ const ( ) type Handle struct { - db *db.DB + db *db.DB + // only used for UT txnCtxs *common.Map[string, *txnContext] - GCJob *tasks.CancelableJob + //GCJob *tasks.CancelableJob interceptMatchRegexp atomic.Pointer[regexp.Regexp] } @@ -120,18 +119,9 @@ func NewTAEHandle(ctx context.Context, path string, opt *options.Options) *Handl h := &Handle{ db: tae, } + h.txnCtxs = common.NewMap[string, *txnContext](runtime.GOMAXPROCS(0)) h.interceptMatchRegexp.Store(regexp.MustCompile(`.*bmsql_stock.*`)) - h.GCJob = tasks.NewCancelableCronJob( - "clean-txn-cache", - MAX_TXN_COMMIT_LATENCY, - func(ctx context.Context) { - h.GCCache(time.Now()) - }, - true, - 1, - ) - h.GCJob.Start() return h } @@ -164,29 +154,6 @@ func (h *Handle) UpdateInterceptMatchRegexp(name string) { h.interceptMatchRegexp.Store(regexp.MustCompile(fmt.Sprintf(`.*%s.*`, name))) } -// TODO: vast items within h.mu.txnCtxs would incur performance penality. -func (h *Handle) GCCache(now time.Time) error { - - var ( - cnt, deleteCnt int - ) - - h.txnCtxs.DeleteIf(func(k string, v *txnContext) bool { - cnt++ - ok := v.deadline.Before(now) - if ok { - deleteCnt++ - } - return ok - }) - logutil.Info( - "GC-RPC-Cache", - zap.Int("total", cnt), - zap.Int("deleted", deleteCnt), - ) - return nil -} - func (h *Handle) CacheTxnRequest( ctx context.Context, meta txn.TxnMeta, @@ -274,23 +241,114 @@ func (h *Handle) tryLockMergeForBulkDelete(reqs []any, txn txnif.AsyncTxn) (rele return } +type txnCommitRequestsIter struct { + cursor int + curNorReq *api.PrecommitWriteCmd + commitRequests *txn.TxnCommitRequest + + // cache requests only used in ut + cached []any +} + +func (h *Handle) newTxnCommitRequestsIter( + cr *txn.TxnCommitRequest, + meta txn.TxnMeta, +) *txnCommitRequestsIter { + + // in the normal commit processes, the new logic won't cache the write requests anymore. + // however, there exist massive ut code that verified the preCommit-commit 2PC logic, + // which cached the write requests in the preCommit call. + // to keep that, there also leave the commiting code of the cached requests un-changed, but only for ut. + if cr == nil { + // for now, only test will into this logic + key := util.UnsafeBytesToString(meta.GetID()) + txnCtx, ok := h.txnCtxs.Load(key) + if !ok { + // no requests + return nil + } + + defer h.txnCtxs.Delete(key) + + return &txnCommitRequestsIter{ + cursor: 0, + cached: txnCtx.reqs, + commitRequests: nil, + } + + } else { + return &txnCommitRequestsIter{ + cursor: 0, + cached: nil, + commitRequests: cr, + } + } +} + +func (cri *txnCommitRequestsIter) Next() bool { + if cri.commitRequests == nil { + return cri.cursor < len(cri.cached) + } + return cri.cursor < len(cri.commitRequests.Payload) +} + +func (cri *txnCommitRequestsIter) Entry() (entry any, err error) { + + if cri.commitRequests == nil { + entry = cri.cached[cri.cursor] + cri.cursor++ + return + } + + cnReq := cri.commitRequests.Payload[cri.cursor].CNRequest + + if cri.curNorReq == nil { + cri.curNorReq = new(api.PrecommitWriteCmd) + } + + if len(cri.curNorReq.EntryList) == 0 { + if err = cri.curNorReq.UnmarshalBinary(cnReq.Payload); err != nil { + return + } + } + + entry, cri.curNorReq.EntryList, err = pkgcatalog.ParseEntryList(cri.curNorReq.EntryList) + if len(cri.curNorReq.EntryList) == 0 { + cri.cursor++ + } + + return +} + func (h *Handle) handleRequests( ctx context.Context, txn txnif.AsyncTxn, - txnCtx *txnContext, + commitRequests *txn.TxnCommitRequest, + response *txn.TxnResponse, + txnMeta txn.TxnMeta, ) (releaseF []func(), hasDDL bool, err error) { + var ( + entry any + + iter *txnCommitRequestsIter + inMemoryInsertRows int persistedMemoryInsertRows int inMemoryTombstoneRows int persistedTombstoneRows int ) - releaseF, err = h.tryLockMergeForBulkDelete(txnCtx.reqs, txn) - if err != nil { - logutil.Warn("failed to lock merging", zap.Error(err)) + + if iter = h.newTxnCommitRequestsIter(commitRequests, txnMeta); iter == nil { + return } - for _, e := range txnCtx.reqs { - switch req := e.(type) { + + for iter.Next() { + if entry, err = iter.Entry(); err != nil { + return + } + + switch req := entry.(type) { case *pkgcatalog.CreateDatabaseReq: hasDDL = true err = h.HandleCreateDatabase(ctx, txn, req) @@ -306,18 +364,45 @@ func (h *Handle) handleRequests( case *api.AlterTableReq: hasDDL = true err = h.HandleAlterTable(ctx, txn, req) - case *cmd_util.WriteReq: + case []*api.AlterTableReq: + hasDDL = true + for _, r := range req { + if err = h.HandleAlterTable(ctx, txn, r); err != nil { + return + } + } + + case *cmd_util.WriteReq, *api.Entry: + var wr *cmd_util.WriteReq + if ae, ok := req.(*api.Entry); ok { + wr = h.apiEntryToWriteEntry(ctx, txnMeta, ae, true) + } else { + wr = req.(*cmd_util.WriteReq) + } + + if wr.Type == cmd_util.EntryDelete { + var f []func() + if f, err = h.tryLockMergeForBulkDelete([]any{req}, txn); err != nil { + logutil.Warn("failed to lock merging", zap.Error(err)) + return + } + + releaseF = append(releaseF, f...) + } + var r1, r2, r3, r4 int - r1, r2, r3, r4, err = h.HandleWrite(ctx, txn, req) + r1, r2, r3, r4, err = h.HandleWrite(ctx, txn, wr) if err == nil { inMemoryInsertRows += r1 persistedMemoryInsertRows += r2 inMemoryTombstoneRows += r3 persistedTombstoneRows += r4 } + default: err = moerr.NewNotSupportedf(ctx, "unknown txn request type: %T", req) } + //Need to roll back the txn. if err != nil { txn.Rollback(ctx) @@ -342,7 +427,57 @@ func (h *Handle) handleRequests( //#region Impl TxnStorage interface //order by call frequency +func (h *Handle) apiEntryToWriteEntry( + ctx context.Context, + meta txn.TxnMeta, + pe *api.Entry, + prefetch bool, +) *cmd_util.WriteReq { + + moBat, err := batch.ProtoBatchToBatch(pe.GetBat()) + if err != nil { + panic(err) + } + req := &cmd_util.WriteReq{ + Type: cmd_util.EntryType(pe.EntryType), + DatabaseId: pe.GetDatabaseId(), + TableID: pe.GetTableId(), + DatabaseName: pe.GetDatabaseName(), + TableName: pe.GetTableName(), + FileName: pe.GetFileName(), + Batch: moBat, + PkCheck: cmd_util.PKCheckType(pe.GetPkCheckByTn()), + } + + if req.FileName != "" { + col := req.Batch.Vecs[0] + for i := 0; i < req.Batch.RowCount(); i++ { + stats := objectio.ObjectStats(col.GetBytesAt(i)) + if req.Type == cmd_util.EntryInsert { + req.DataObjectStats = append(req.DataObjectStats, stats) + } else { + req.TombstoneStats = append(req.TombstoneStats, stats) + } + } + } + + if prefetch { + if req.Type == cmd_util.EntryDelete { + if err = h.prefetchDeleteRowID(ctx, req, &meta); err != nil { + return nil + } + } else { + if err = h.prefetchMetadata(ctx, req, &meta); err != nil { + return nil + } + } + } + + return req +} + // HandlePreCommitWrite impls TxnStorage:Write +// only ut call this func (h *Handle) HandlePreCommitWrite( ctx context.Context, meta txn.TxnMeta, @@ -369,34 +504,8 @@ func (h *Handle) HandlePreCommitWrite( } case *api.Entry: //Handle DML - pe := e.(*api.Entry) - moBat, err := batch.ProtoBatchToBatch(pe.GetBat()) - if err != nil { - panic(err) - } - req := &cmd_util.WriteReq{ - Type: cmd_util.EntryType(pe.EntryType), - DatabaseId: pe.GetDatabaseId(), - TableID: pe.GetTableId(), - DatabaseName: pe.GetDatabaseName(), - TableName: pe.GetTableName(), - FileName: pe.GetFileName(), - Batch: moBat, - PkCheck: cmd_util.PKCheckType(pe.GetPkCheckByTn()), - } - - if req.FileName != "" { - col := req.Batch.Vecs[0] - for i := 0; i < req.Batch.RowCount(); i++ { - stats := objectio.ObjectStats(col.GetBytesAt(i)) - if req.Type == cmd_util.EntryInsert { - req.DataObjectStats = append(req.DataObjectStats, stats) - } else { - req.TombstoneStats = append(req.TombstoneStats, stats) - } - } - } - if err = h.CacheTxnRequest(ctx, meta, req); err != nil { + wr := h.apiEntryToWriteEntry(ctx, meta, e.(*api.Entry), false) + if err = h.CacheTxnRequest(ctx, meta, wr); err != nil { return err } default: @@ -411,9 +520,11 @@ func (h *Handle) HandlePreCommitWrite( func (h *Handle) HandleCommit( ctx context.Context, meta txn.TxnMeta, + response *txn.TxnResponse, + commitRequests *txn.TxnCommitRequest, ) (cts timestamp.Timestamp, err error) { start := time.Now() - txnCtx, ok := h.txnCtxs.Load(util.UnsafeBytesToString(meta.GetID())) + var ( txn txnif.AsyncTxn releaseF []func() @@ -423,10 +534,7 @@ func (h *Handle) HandleCommit( for _, f := range releaseF { f() } - if ok { - //delete the txn's context. - h.txnCtxs.Delete(util.UnsafeBytesToString(meta.GetID())) - } + common.DoIfInfoEnabled(func() { _, _, injected := fault.TriggerFault(objectio.FJ_CommitSlowLog) if time.Since(start) > MAX_ALLOWED_TXN_LATENCY || err != nil || hasDDL || injected { @@ -453,18 +561,17 @@ func (h *Handle) HandleCommit( } }) }() - if ok { - //Handle precommit-write command for 1PC - txn, err = h.db.GetOrCreateTxnWithMeta(nil, meta.GetID(), - types.TimestampToTS(meta.GetSnapshotTS())) - if err != nil { - return - } - releaseF, hasDDL, err = h.handleRequests(ctx, txn, txnCtx) - if err != nil { - return - } + + if txn, err = h.db.GetOrCreateTxnWithMeta( + nil, meta.GetID(), types.TimestampToTS(meta.GetSnapshotTS())); err != nil { + return } + + if releaseF, hasDDL, err = h.handleRequests( + ctx, txn, commitRequests, response, meta); err != nil { + return + } + txn, err = h.db.GetTxnByID(meta.GetID()) if err != nil { return @@ -498,7 +605,7 @@ func (h *Handle) HandleCommit( zap.String("new-txn", txn.GetID()), ) //Handle precommit-write command for 1PC - releaseF, hasDDL, err = h.handleRequests(ctx, txn, txnCtx) + releaseF, hasDDL, err = h.handleRequests(ctx, txn, commitRequests, response, meta) if err != nil && !moerr.IsMoErrCode(err, moerr.ErrTAENeedRetry) { break } @@ -556,9 +663,9 @@ func (h *Handle) HandleRollback( func (h *Handle) HandleClose(ctx context.Context) (err error) { //FIXME::should wait txn request's job done? - if h.GCJob != nil { - h.GCJob.Stop() - } + //if h.GCJob != nil { + // h.GCJob.Stop() + //} return h.db.Close() } diff --git a/pkg/vm/engine/tae/rpc/handle_2pc.go b/pkg/vm/engine/tae/rpc/handle_2pc.go index 13d7b4108ae36..adb0e5cbd11e8 100644 --- a/pkg/vm/engine/tae/rpc/handle_2pc.go +++ b/pkg/vm/engine/tae/rpc/handle_2pc.go @@ -17,7 +17,6 @@ package rpc import ( "context" - "github.com/matrixorigin/matrixone/pkg/common/util" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/pb/txn" @@ -45,23 +44,16 @@ func (h *Handle) HandleCommitting( func (h *Handle) HandlePrepare( ctx context.Context, meta txn.TxnMeta) (pts timestamp.Timestamp, err error) { - txnCtx, ok := h.txnCtxs.Load(util.UnsafeBytesToString(meta.GetID())) var txn txnif.AsyncTxn - defer func() { - if ok { - //delete the txn's context. - h.txnCtxs.Delete(util.UnsafeBytesToString(meta.GetID())) - } - }() - if ok { - //handle pre-commit write for 2PC - txn, err = h.db.GetOrCreateTxnWithMeta(nil, meta.GetID(), - types.TimestampToTS(meta.GetSnapshotTS())) - if err != nil { - return - } - h.handleRequests(ctx, txn, txnCtx) + + //handle pre-commit write for 2PC + txn, err = h.db.GetOrCreateTxnWithMeta(nil, meta.GetID(), + types.TimestampToTS(meta.GetSnapshotTS())) + if err != nil { + return } + h.handleRequests(ctx, txn, nil, nil, meta) + txn, err = h.db.GetTxnByID(meta.GetID()) if err != nil { return timestamp.Timestamp{}, err diff --git a/pkg/vm/engine/tae/rpc/handle_test.go b/pkg/vm/engine/tae/rpc/handle_test.go index de28d4a318f94..9b4e3cfb3cf4a 100644 --- a/pkg/vm/engine/tae/rpc/handle_test.go +++ b/pkg/vm/engine/tae/rpc/handle_test.go @@ -16,47 +16,17 @@ package rpc import ( "context" - "testing" - "time" - "github.com/matrixorigin/matrixone/pkg/container/types" apipb "github.com/matrixorigin/matrixone/pkg/pb/api" "github.com/matrixorigin/matrixone/pkg/pb/txn" "github.com/matrixorigin/matrixone/pkg/vm/engine/cmd_util" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options" "github.com/stretchr/testify/require" + "testing" ) -func TestHandleGCCache(t *testing.T) { - now := time.Now() - expired := now.Add(-MAX_TXN_COMMIT_LATENCY).Add(-time.Second) - - handle := Handle{} - handle.txnCtxs = common.NewMap[string, *txnContext](10) - handle.txnCtxs.Store("now", &txnContext{ - deadline: now, - }) - handle.txnCtxs.Store("expired", &txnContext{ - deadline: expired, - }) - handle.GCCache(now) - - cnt := 0 - handle.txnCtxs.Range(func(key string, value *txnContext) bool { - cnt++ - return true - }) - - require.Equal(t, 1, cnt) - _, ok := handle.txnCtxs.Load("expired") - require.False(t, ok) - _, ok = handle.txnCtxs.Load("now") - require.True(t, ok) -} - func TestHandleInspectPolicy(t *testing.T) { handle := mockTAEHandle(context.Background(), t, &options.Options{}) asyncTxn, err := handle.db.StartTxn(nil) diff --git a/pkg/vm/engine/test/testutil/tae_engine.go b/pkg/vm/engine/test/testutil/tae_engine.go index b2e8ae75954c7..79783a06a3009 100644 --- a/pkg/vm/engine/test/testutil/tae_engine.go +++ b/pkg/vm/engine/test/testutil/tae_engine.go @@ -70,7 +70,6 @@ func (ts *TestTxnStorage) Close(destroy bool) error { firstErr = err } } - ts.txnHandler.GCJob.Stop() blockio.Stop("") return firstErr } @@ -98,20 +97,9 @@ func (ts *TestTxnStorage) Commit(ctx context.Context, request *txn.TxnRequest, r resp.Txn = &req.Txn } - if request.CommitRequest != nil { - for _, req := range request.CommitRequest.Payload { - //response is shared by all requests - prepareResponse(req, response) - err := ts.Write(ctx, req, response) - if err != nil { - return err - } - } - } - prepareResponse(request, response) - cts, err := ts.txnHandler.HandleCommit(ctx, request.Txn) + cts, err := ts.txnHandler.HandleCommit(ctx, request.Txn, response, request.CommitRequest) if err == nil { response.Txn.Status = txn.TxnStatus_Committed response.Txn.CommitTS = cts