Skip to content

Commit

Permalink
Merge branch 'main' into fix-panic-report
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 23, 2024
2 parents 0bd78ba + ba1b490 commit d2ddd54
Show file tree
Hide file tree
Showing 33 changed files with 783 additions and 242 deletions.
2 changes: 1 addition & 1 deletion pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (s *service) getTxnSender() (sender rpc.TxnSender, err error) {
resp.CNOpResponse = &txn.CNOpResponse{Payload: payload}
}
case txn.TxnMethod_Commit:
_, err = storage.Commit(ctx, req.Txn)
_, err = storage.Commit(ctx, req.Txn, nil, nil)
if err == nil {
resp.Txn.Status = txn.TxnStatus_Committed
}
Expand Down
29 changes: 28 additions & 1 deletion pkg/frontend/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,9 @@ func checkRestorePriv(ctx context.Context, ses *Session, snapshot *snapshotRecor
return moerr.NewInternalError(ctx, "non-sys account's snapshot can't restore to sys account")
}
}
if snapshot.level == tree.RESTORELEVELDATABASE.String() || snapshot.level == tree.RESTORELEVELTABLE.String() {
return moerr.NewInternalError(ctx, "can't restore account from db or table level snapshot")
}
case tree.RESTORELEVELDATABASE:
dbname := string(stmt.DatabaseName)
if len(dbname) > 0 && needSkipDb(dbname) {
Expand All @@ -678,6 +681,15 @@ func checkRestorePriv(ctx context.Context, ses *Session, snapshot *snapshotRecor
if snapshot.level == tree.RESTORELEVELCLUSTER.String() {
return moerr.NewInternalError(ctx, "can't restore db from cluster level snapshot")
}
if snapshot.level == tree.RESTORELEVELTABLE.String() {
return moerr.NewInternalError(ctx, "can't restore db from table level snapshot")
}
if string(stmt.AccountName) != ses.GetTenantInfo().GetTenant() {
return moerr.NewInternalError(ctx, "can't restore database from other account's snapshot")
}
if snapshot.level == tree.RESTORELEVELDATABASE.String() && snapshot.databaseName != string(stmt.DatabaseName) {
return moerr.NewInternalErrorf(ctx, "databaseName(%v) does not match snapshot.databaseName(%v)", string(stmt.DatabaseName), snapshot.databaseName)
}
case tree.RESTORELEVELTABLE:
dbname := string(stmt.DatabaseName)
if len(dbname) > 0 && needSkipDb(dbname) {
Expand All @@ -686,6 +698,14 @@ func checkRestorePriv(ctx context.Context, ses *Session, snapshot *snapshotRecor
if snapshot.level == tree.RESTORELEVELCLUSTER.String() {
return moerr.NewInternalError(ctx, "can't restore db from cluster level snapshot")
}
if string(stmt.AccountName) != ses.GetTenantInfo().GetTenant() {
return moerr.NewInternalError(ctx, "can't restore table from other account's snapshot")
}
if snapshot.level == tree.RESTORELEVELTABLE.String() {
if snapshot.databaseName != string(stmt.DatabaseName) || snapshot.tableName != string(stmt.TableName) {
return moerr.NewInternalErrorf(ctx, "tableName(%v) does not match snapshot.tableName(%v)", string(stmt.TableName), snapshot.tableName)
}
}
default:
return moerr.NewInternalErrorf(ctx, "unknown restore level: %v", restoreLevel)
}
Expand Down Expand Up @@ -1473,7 +1493,14 @@ func doResolveSnapshotWithSnapshotName(ctx context.Context, ses FeSession, snaps
var accountId uint32
// cluster level record has no accountName, so accountId is 0
if len(record.accountName) != 0 {
accountId = uint32(record.objId)
if record.level == tree.RESTORELEVELACCOUNT.String() {
accountId = uint32(record.objId)
} else {
accountId, err = defines.GetAccountId(ctx)
if err != nil {
return
}
}
}

return &pbplan.Snapshot{
Expand Down
11 changes: 1 addition & 10 deletions pkg/tnservice/store_rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,7 @@ func (s *store) handleCommit(ctx context.Context, request *txn.TxnRequest, respo
return nil
}
r.waitStarted()
if request.CommitRequest != nil {
for _, req := range request.CommitRequest.Payload {
//response is shared by all requests
prepareResponse(req, response)
err := s.handleWrite(ctx, req, response)
if err != nil {
return err
}
}
}

prepareResponse(request, response)
return r.service.Commit(ctx, request, response)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/txn/service/service_cn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func (s *service) Commit(ctx context.Context, request *txn.TxnRequest, response
}

txnID := request.Txn.ID
txnCtx := s.getTxnContext(txnID)
txnCtx, _ := s.maybeAddTxn(request.Txn)

if txnCtx == nil {
util.LogTxnNotFoundOn(s.logger, request.Txn, s.shard)
response.TxnError = txn.WrapError(moerr.NewTNShardNotFound(ctx, "", request.GetTargetTN().ShardID), 0)
Expand Down Expand Up @@ -261,7 +262,7 @@ func (s *service) Commit(ctx context.Context, request *txn.TxnRequest, response
if len(newTxn.TNShards) == 1 {
util.LogTxnStart1PCCommit(s.logger, newTxn)

commitTS, err := s.storage.Commit(ctx, newTxn)
commitTS, err := s.storage.Commit(ctx, newTxn, response, request.CommitRequest)
v2.TxnTNCommitHandledCounter.Inc()
if err != nil {
util.LogTxnStart1PCCommitFailed(s.logger, newTxn, err)
Expand Down Expand Up @@ -469,7 +470,7 @@ func (s *service) startAsyncCommitTask(txnCtx *txnContext) error {
util.TxnIDFieldWithID(txnMeta.ID))
}

if _, err := s.storage.Commit(ctx, txnMeta); err != nil {
if _, err := s.storage.Commit(ctx, txnMeta, nil, nil); err != nil {
err = moerr.AttachCause(ctx, err)
s.logger.Fatal("commit failed after prepared",
util.TxnIDFieldWithID(txnMeta.ID),
Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/service/service_dn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *service) CommitTNShard(ctx context.Context, request *txn.TxnRequest, re
}

newTxn.CommitTS = request.Txn.CommitTS
if _, err := s.storage.Commit(ctx, newTxn); err != nil {
if _, err := s.storage.Commit(ctx, newTxn, response, request.CommitRequest); err != nil {
response.TxnError = txn.WrapError(err, moerr.ErrTAECommit)
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/txn/storage/mem/kv_txn_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,12 @@ func (kv *KVTxnStorage) Committing(ctx context.Context, txnMeta txn.TxnMeta) err
return nil
}

func (kv *KVTxnStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
func (kv *KVTxnStorage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
kv.Lock()
defer kv.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/storage/mem/kv_txn_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func committingTestTxn(t *testing.T, s *KVTxnStorage, wTxn *txn.TxnMeta, ts int6

func commitTestTxn(t *testing.T, s *KVTxnStorage, wTxn *txn.TxnMeta, ts int64, errCode uint16) {
wTxn.CommitTS = newTimestamp(ts)
_, e := s.Commit(context.TODO(), *wTxn)
_, e := s.Commit(context.TODO(), *wTxn, nil, nil)
assert.True(t, moerr.IsMoErrCode(e, errCode))
wTxn.Status = txn.TxnStatus_Committed
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/txn/storage/memorystorage/dynamic_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,17 @@ func (d *DynamicStorage) Close(ctx context.Context) error {
return storage.Close(ctx)
}

func (d *DynamicStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
func (d *DynamicStorage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
storage, err := d.get(ctx)
if err != nil {
return timestamp.Timestamp{}, err
}
return storage.Commit(ctx, txnMeta)
return storage.Commit(ctx, txnMeta, response, commitRequests)
}

func (d *DynamicStorage) Committing(ctx context.Context, txnMeta txn.TxnMeta) error {
Expand Down
7 changes: 6 additions & 1 deletion pkg/txn/storage/memorystorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ func New(

var _ storage.TxnStorage = new(Storage)

func (s *Storage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
func (s *Storage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
return s.handler.HandleCommit(ctx, txnMeta)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/storage/memorystorage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func testDatabase(
},
}
defer func() {
_, err := s.Commit(ctx, txnMeta)
_, err := s.Commit(ctx, txnMeta, nil, nil)
assert.Nil(t, err)
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/txn/storage/memorystorage/storage_txn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *StorageTxnOperator) Debug(ctx context.Context, ops []txn.TxnRequest) (*

func (s *StorageTxnOperator) Commit(ctx context.Context) error {
for _, storage := range s.storages {
if _, err := storage.Commit(ctx, s.meta); err != nil {
if _, err := storage.Commit(ctx, s.meta, nil, nil); err != nil {
return err
}
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/txn/storage/tae/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,13 @@ func (s *taeStorage) Close(ctx context.Context) error {
}

// Commit implements storage.TxnTAEStorage
func (s *taeStorage) Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error) {
return s.taeHandler.HandleCommit(ctx, txnMeta)
func (s *taeStorage) Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error) {
return s.taeHandler.HandleCommit(ctx, txnMeta, response, commitRequests)
}

// Committing implements storage.TxnTAEStorage
Expand Down
4 changes: 1 addition & 3 deletions pkg/txn/storage/tae/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package taestorage

import (
"context"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
apipb "github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
Expand All @@ -28,9 +27,8 @@ func (s *taeStorage) Write(
txnMeta txn.TxnMeta,
op uint32,
payload []byte) (result []byte, err error) {

switch op {
case uint32(apipb.OpCode_OpPreCommit):
return HandleWrite(ctx, txnMeta, payload, s.taeHandler.HandlePreCommitWrite)
case uint32(apipb.OpCode_OpCommitMerge):
return HandleWrite(ctx, txnMeta, payload, s.taeHandler.HandleCommitMerge)
default:
Expand Down
6 changes: 5 additions & 1 deletion pkg/txn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ type TxnStorage interface {
// of the transaction is logged to the LogService.
Committing(ctx context.Context, txnMeta txn.TxnMeta) error
// Commit commit the transaction. TxnStorage needs to do conflict locally.
Commit(ctx context.Context, txnMeta txn.TxnMeta) (timestamp.Timestamp, error)
Commit(
ctx context.Context,
txnMeta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest) (timestamp.Timestamp, error)
// Rollback rollback the transaction.
Rollback(ctx context.Context, txnMeta txn.TxnMeta) error

Expand Down
2 changes: 2 additions & 0 deletions pkg/vm/engine/tae/iface/rpchandle/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Handler interface {
HandleCommit(
ctx context.Context,
meta txn.TxnMeta,
response *txn.TxnResponse,
commitRequests *txn.TxnCommitRequest,
) (timestamp.Timestamp, error)

HandleRollback(
Expand Down
83 changes: 35 additions & 48 deletions pkg/vm/engine/tae/logtail/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,61 +418,48 @@ func LoadCheckpointEntriesFromKey(
NeedCopy: true,
})
}
for i := 0; i < data.bats[ObjectInfoIDX].Length(); i++ {
var objectStats objectio.ObjectStats
buf := data.bats[ObjectInfoIDX].GetVectorByName(ObjectAttr_ObjectStats).Get(i).([]byte)
objectStats.UnMarshal(buf)
deletedAt := data.bats[ObjectInfoIDX].GetVectorByName(EntryNode_DeleteAt).Get(i).(types.TS)
createAt := data.bats[ObjectInfoIDX].GetVectorByName(EntryNode_CreateAt).Get(i).(types.TS)
commitAt := data.bats[ObjectInfoIDX].GetVectorByName(txnbase.SnapshotAttr_CommitTS).Get(i).(types.TS)
isAblk := objectStats.GetAppendable()
if objectStats.Extent().End() == 0 {
// tn obj is in the batch too
continue
}

if deletedAt.IsEmpty() && isAblk {
// no flush, no need to copy
continue
}
collectObject := func(bat *containers.Batch) {
for i := 0; i < bat.Length(); i++ {
var objectStats objectio.ObjectStats
buf := bat.GetVectorByName(ObjectAttr_ObjectStats).Get(i).([]byte)
objectStats.UnMarshal(buf)
deletedAt := bat.GetVectorByName(EntryNode_DeleteAt).Get(i).(types.TS)
createAt := bat.GetVectorByName(EntryNode_CreateAt).Get(i).(types.TS)
commitAt := bat.GetVectorByName(txnbase.SnapshotAttr_CommitTS).Get(i).(types.TS)
isAblk := objectStats.GetAppendable()
if objectStats.Extent().End() == 0 {
// tn obj is in the batch too
continue
}

bo := &objectio.BackupObject{
Location: objectStats.ObjectLocation(),
CrateTS: createAt,
DropTS: deletedAt,
}
if baseTS.IsEmpty() || (!baseTS.IsEmpty() &&
(createAt.GE(baseTS) || commitAt.GE(baseTS))) {
bo.NeedCopy = true
}
locations = append(locations, bo)
if !deletedAt.IsEmpty() {
if softDeletes != nil {
if !(*softDeletes)[objectStats.ObjectName().String()] {
(*softDeletes)[objectStats.ObjectName().String()] = true
if deletedAt.IsEmpty() && isAblk {
// no flush, no need to copy
continue
}

bo := &objectio.BackupObject{
Location: objectStats.ObjectLocation(),
CrateTS: createAt,
DropTS: deletedAt,
}
if baseTS.IsEmpty() || (!baseTS.IsEmpty() &&
(createAt.GE(baseTS) || commitAt.GE(baseTS))) {
bo.NeedCopy = true
}
locations = append(locations, bo)
if !deletedAt.IsEmpty() {
if softDeletes != nil {
if !(*softDeletes)[objectStats.ObjectName().String()] {
(*softDeletes)[objectStats.ObjectName().String()] = true
}
}
}
}
}

for i := 0; i < data.bats[TombstoneObjectInfoIDX].Length(); i++ {
var objectStats objectio.ObjectStats
buf := data.bats[TombstoneObjectInfoIDX].GetVectorByName(ObjectAttr_ObjectStats).Get(i).([]byte)
objectStats.UnMarshal(buf)
commitTS := data.bats[TombstoneObjectInfoIDX].GetVectorByName(txnbase.SnapshotAttr_CommitTS).Get(i).(types.TS)
if objectStats.ObjectLocation().IsEmpty() {
continue
}
bo := &objectio.BackupObject{
Location: objectStats.ObjectLocation(),
CrateTS: commitTS,
}
if baseTS.IsEmpty() ||
(!baseTS.IsEmpty() && commitTS.GE(baseTS)) {
bo.NeedCopy = true
}
locations = append(locations, bo)
}
collectObject(data.bats[ObjectInfoIDX])
collectObject(data.bats[TombstoneObjectInfoIDX])
return locations, data, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/rpc/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (h *mockHandle) HandleCommit(ctx context.Context, meta *txn.TxnMeta) (times
if len(meta.TNShards) > 1 && meta.CommitTS.IsEmpty() {
meta.CommitTS = meta.PreparedTS.Next()
}
return h.Handle.HandleCommit(ctx, *meta)
return h.Handle.HandleCommit(ctx, *meta, nil, nil)
}

func (h *mockHandle) HandleCommitting(ctx context.Context, meta *txn.TxnMeta) error {
Expand Down
Loading

0 comments on commit d2ddd54

Please sign in to comment.