Skip to content

Commit

Permalink
remove the preCommitWrite in the commit routine. (#20819)
Browse files Browse the repository at this point in the history
1. pre commit write is designed for 2pc, current TN is running in the 1PC  mode.

Approved by: @daviszhen, @zhangxu19830126, @w-zr, @XuPeng-SH, @reusee
  • Loading branch information
gouhongshen authored Dec 23, 2024
1 parent a5fbab6 commit ba1b490
Show file tree
Hide file tree
Showing 19 changed files with 255 additions and 182 deletions.
2 changes: 1 addition & 1 deletion pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (s *service) getTxnSender() (sender rpc.TxnSender, err error) {
resp.CNOpResponse = &txn.CNOpResponse{Payload: payload}
}
case txn.TxnMethod_Commit:
_, err = storage.Commit(ctx, req.Txn)
_, err = storage.Commit(ctx, req.Txn, nil, nil)
if err == nil {
resp.Txn.Status = txn.TxnStatus_Committed
}
Expand Down
11 changes: 1 addition & 10 deletions pkg/tnservice/store_rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,7 @@ func (s *store) handleCommit(ctx context.Context, request *txn.TxnRequest, respo
return nil
}
r.waitStarted()
if request.CommitRequest != nil {
for _, req := range request.CommitRequest.Payload {
//response is shared by all requests
prepareResponse(req, response)
err := s.handleWrite(ctx, req, response)
if err != nil {
return err
}
}
}

prepareResponse(request, response)
return r.service.Commit(ctx, request, response)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/txn/service/service_cn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func (s *service) Commit(ctx context.Context, request *txn.TxnRequest, response
}

txnID := request.Txn.ID
txnCtx := s.getTxnContext(txnID)
txnCtx, _ := s.maybeAddTxn(request.Txn)

if txnCtx == nil {
util.LogTxnNotFoundOn(s.logger, request.Txn, s.shard)
response.TxnError = txn.WrapError(moerr.NewTNShardNotFound(ctx, "", request.GetTargetTN().ShardID), 0)
Expand Down Expand Up @@ -261,7 +262,7 @@ func (s *service) Commit(ctx context.Context, request *txn.TxnRequest, response
if len(newTxn.TNShards) == 1 {
util.LogTxnStart1PCCommit(s.logger, newTxn)

commitTS, err := s.storage.Commit(ctx, newTxn)
commitTS, err := s.storage.Commit(ctx, newTxn, response, request.CommitRequest)
v2.TxnTNCommitHandledCounter.Inc()
if err != nil {
util.LogTxnStart1PCCommitFailed(s.logger, newTxn, err)
Expand Down Expand Up @@ -469,7 +470,7 @@ func (s *service) startAsyncCommitTask(txnCtx *txnContext) error {
util.TxnIDFieldWithID(txnMeta.ID))
}

if _, err := s.storage.Commit(ctx, txnMeta); err != nil {
if _, err := s.storage.Commit(ctx, txnMeta, nil, nil); err != nil {
err = moerr.AttachCause(ctx, err)
s.logger.Fatal("commit failed after prepared",
util.TxnIDFieldWithID(txnMeta.ID),
Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/service/service_dn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *service) CommitTNShard(ctx context.Context, request *txn.TxnRequest, re
}

newTxn.CommitTS = request.Txn.CommitTS
if _, err := s.storage.Commit(ctx, newTxn); err != nil {
if _, err := s.storage.Commit(ctx, newTxn, response, request.CommitRequest); err != nil {
response.TxnError = txn.WrapError(err, moerr.ErrTAECommit)
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/txn/storage/mem/kv_txn_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,12 @@ func (kv *KVTxnStorage) Committing(ctx context.Context, txnMeta txn.TxnMeta) err
return nil
}

func (kv *KVTxnStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
func (kv *KVTxnStorage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
kv.Lock()
defer kv.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/storage/mem/kv_txn_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func committingTestTxn(t *testing.T, s *KVTxnStorage, wTxn *txn.TxnMeta, ts int6

func commitTestTxn(t *testing.T, s *KVTxnStorage, wTxn *txn.TxnMeta, ts int64, errCode uint16) {
wTxn.CommitTS = newTimestamp(ts)
_, e := s.Commit(context.TODO(), *wTxn)
_, e := s.Commit(context.TODO(), *wTxn, nil, nil)
assert.True(t, moerr.IsMoErrCode(e, errCode))
wTxn.Status = txn.TxnStatus_Committed
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/txn/storage/memorystorage/dynamic_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,17 @@ func (d *DynamicStorage) Close(ctx context.Context) error {
return storage.Close(ctx)
}

func (d *DynamicStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
func (d *DynamicStorage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
storage, err := d.get(ctx)
if err != nil {
return timestamp.Timestamp{}, err
}
return storage.Commit(ctx, txnMeta)
return storage.Commit(ctx, txnMeta, response, commitRequests)
}

func (d *DynamicStorage) Committing(ctx context.Context, txnMeta txn.TxnMeta) error {
Expand Down
7 changes: 6 additions & 1 deletion pkg/txn/storage/memorystorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ func New(

var _ storage.TxnStorage = new(Storage)

func (s *Storage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
func (s *Storage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
return s.handler.HandleCommit(ctx, txnMeta)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/storage/memorystorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func testDatabase(
},
}
defer func() {
_, err := s.Commit(ctx, txnMeta)
_, err := s.Commit(ctx, txnMeta, nil, nil)
assert.Nil(t, err)
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/storage/memorystorage/storage_txn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *StorageTxnOperator) Debug(ctx context.Context, ops []txn.TxnRequest) (*

func (s *StorageTxnOperator) Commit(ctx context.Context) error {
for _, storage := range s.storages {
if _, err := storage.Commit(ctx, s.meta); err != nil {
if _, err := storage.Commit(ctx, s.meta, nil, nil); err != nil {
return err
}
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/txn/storage/tae/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,13 @@ func (s *taeStorage) Close(ctx context.Context) error {
}

// Commit implements storage.TxnTAEStorage
func (s *taeStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
return s.taeHandler.HandleCommit(ctx, txnMeta)
func (s *taeStorage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
return s.taeHandler.HandleCommit(ctx, txnMeta, response, commitRequests)
}

// Committing implements storage.TxnTAEStorage
Expand Down
4 changes: 1 addition & 3 deletions pkg/txn/storage/tae/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package taestorage

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
apipb "github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
Expand All @@ -28,9 +27,8 @@ func (s *taeStorage) Write(
txnMeta txn.TxnMeta,
op uint32,
payload []byte) (result []byte, err error) {

switch op {
case uint32(apipb.OpCode_OpPreCommit):
return HandleWrite(ctx, txnMeta, payload, s.taeHandler.HandlePreCommitWrite)
case uint32(apipb.OpCode_OpCommitMerge):
return HandleWrite(ctx, txnMeta, payload, s.taeHandler.HandleCommitMerge)
default:
Expand Down
6 changes: 5 additions & 1 deletion pkg/txn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ type TxnStorage interface {
// of the transaction is logged to the LogService.
Committing(ctx context.Context, txnMeta txn.TxnMeta) error
// Commit commit the transaction. TxnStorage needs to do conflict locally.
Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error)
Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest) (timestamp.Timestamp, error)
// Rollback rollback the transaction.
Rollback(ctx context.Context, txnMeta txn.TxnMeta) error

Expand Down
2 changes: 2 additions & 0 deletions pkg/vm/engine/tae/iface/rpchandle/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Handler interface {
HandleCommit(
ctx context.Context,
meta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error)

HandleRollback(
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/rpc/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (h *mockHandle) HandleCommit(ctx context.Context, meta *txn.TxnMeta) (times
if len(meta.TNShards) > 1 && meta.CommitTS.IsEmpty() {
meta.CommitTS = meta.PreparedTS.Next()
}
return h.Handle.HandleCommit(ctx, *meta)
return h.Handle.HandleCommit(ctx, *meta, nil, nil)
}

func (h *mockHandle) HandleCommitting(ctx context.Context, meta *txn.TxnMeta) error {
Expand Down
Loading

0 comments on commit ba1b490

Please sign in to comment.