Skip to content

Commit

Permalink
Merge branch 'main' into do-no-merge-tombstone-when-compact
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 10, 2024
2 parents a9e0e24 + e7c4432 commit fb8ce9d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 6 deletions.
30 changes: 30 additions & 0 deletions pkg/container/bytejson/bytejosn_modifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,33 @@ func TestAppendBinaryString(t *testing.T) {
})
}
}

func Test_appendBinaryValElem(t *testing.T) {
tests := []struct {
name string
input any
wantType TpCode
wantErr bool
}{
{
name: "invalid type",
input: struct{}{},
wantType: 0,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotBuf, err := appendBinaryValElem(make([]byte, 0), 0, 0, tt.input)

if tt.wantErr {
require.Error(t, err)
return
}

require.NoError(t, err)
require.NotEmpty(t, gotBuf)
})
}
}
3 changes: 1 addition & 2 deletions pkg/container/bytejson/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/pingcap/errors"
)

func ParseFromString(s string) (ret ByteJson, err error) {
Expand Down Expand Up @@ -673,7 +672,7 @@ func appendBinaryValElem(buf []byte, docOff, valEntryOff int, val any) ([]byte,
elemDocOff := len(buf)
typeCode, buf, err = appendBinaryJSON(buf, val)
if err != nil {
return nil, errors.Trace(err)
return nil, moerr.NewInvalidArgNoCtx("invalid json value", val)
}
if typeCode == TpCodeLiteral {
litCode := buf[elemDocOff]
Expand Down
27 changes: 23 additions & 4 deletions pkg/txn/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ type txnClient struct {
// all active txns
activeTxns map[string]*txnOperator
// FIFO queue for ready to active txn
waitActiveTxns []*txnOperator
waitActiveTxns []*txnOperator
waitMarkAllActiveAbortedC chan struct{}
}

abortC chan time.Time
Expand Down Expand Up @@ -499,6 +500,8 @@ func (client *txnClient) openTxn(op *txnOperator) error {
client.mu.Unlock()
}()

client.waitMarkAllActiveAbortedLocked()

if !op.opts.skipWaitPushClient {
for client.mu.state == paused {
if client.normalStateNoWait {
Expand Down Expand Up @@ -695,13 +698,23 @@ func (client *txnClient) handleMarkActiveTxnAborted(
case from := <-client.abortC:
fn := func() {
client.mu.Lock()
defer client.mu.Unlock()

client.mu.waitMarkAllActiveAbortedC = make(chan struct{})
ops := make([]*txnOperator, 0, len(client.mu.activeTxns))
for _, op := range client.mu.activeTxns {
if op.reset.createAt.Before(from) {
op.addFlag(AbortedFlag)
ops = append(ops, op)
}
}
client.mu.Unlock()

for _, op := range ops {
op.addFlag(AbortedFlag)
}

client.mu.Lock()
close(client.mu.waitMarkAllActiveAbortedC)
client.mu.waitMarkAllActiveAbortedC = nil
client.mu.Unlock()
}
fn()

Expand All @@ -727,3 +740,9 @@ func (client *txnClient) removeFromWaitActiveLocked(txnID []byte) bool {
client.mu.waitActiveTxns = values
return ok
}

func (client *txnClient) waitMarkAllActiveAbortedLocked() {
if client.mu.waitMarkAllActiveAbortedC != nil {
<-client.mu.waitMarkAllActiveAbortedC
}
}

0 comments on commit fb8ce9d

Please sign in to comment.