Skip to content

Commit

Permalink
add test cases
Browse files Browse the repository at this point in the history
Signed-off-by: joccau <[email protected]>
  • Loading branch information
joccau committed Dec 2, 2022
1 parent 2ddf385 commit 8f61a6f
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 11 deletions.
19 changes: 13 additions & 6 deletions br/pkg/stream/meta_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,29 @@ const (
flagTxnSourcePrefix = byte('S')
)

// RawWriteCFValue represents the value in write columnFamily.
// Detail see line: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70
type RawWriteCFValue struct {
t WriteType
startTs uint64
shortValue []byte
hasOverlappedRollback bool

// Records the next version after this version when overlapping rollback
// happens on an already existed commit record.
//
// See [`Write::gc_fence`] for more detail.
hasGCFence bool
gcFence uint64

//
lastChangeTs uint64

//
// The number of versions that need skipping from this record
// to find the latest PUT/DELETE record.
// If versions_to_last_change > 0 but last_change_ts == 0, the key does not
// have a PUT/DELETE record before this write record.
lastChangeTs uint64
versionsToLastChange uint64

//
// The source of this txn.
txnSource uint64
}

Expand Down Expand Up @@ -183,7 +190,7 @@ l_for:
return errors.Annotate(berrors.ErrInvalidArgument, "decode versions to last change failed")
}
case flagTxnSourcePrefix:
data, v.txnSource, err = codec.DecodeUvarint(data)
data, v.txnSource, err = codec.DecodeUvarint(data[1:])
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode txn source failed")
}
Expand Down
101 changes: 96 additions & 5 deletions br/pkg/stream/meta_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,49 @@ func TestWriteType(t *testing.T) {
}

func TestWriteCFValueNoShortValue(t *testing.T) {
var (
ts uint64 = 400036290571534337
txnSource uint64 = 9527
)

buff := make([]byte, 0, 9)
buff = append(buff, byte('P'))
buff = codec.EncodeUvarint(buff, 400036290571534337)
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagTxnSourcePrefix)
buff = codec.EncodeUvarint(buff, txnSource)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.False(t, v.IsDelete())
require.False(t, v.IsRollback())
require.False(t, v.HasShortValue())
require.False(t, v.hasGCFence)
require.Equal(t, v.lastChangeTs, uint64(0))
require.Equal(t, v.versionsToLastChange, uint64(0))
require.Equal(t, v.txnSource, txnSource)

encodedBuff := v.EncodeTo()
require.True(t, bytes.Equal(buff, encodedBuff))
}

func TestWriteCFValueWithShortValue(t *testing.T) {
var ts uint64 = 400036290571534337
shortValue := []byte("pingCAP")
var (
ts uint64 = 400036290571534337
shortValue = []byte("pingCAP")
lastChangeTs uint64 = 9527
versionsToLastChange uint64 = 95271
)

buff := make([]byte, 0, 9)
buff = append(buff, byte('P'))
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagShortValuePrefix)
buff = append(buff, byte(len(shortValue)))
buff = append(buff, shortValue...)
buff = append(buff, flagLastChangePrefix)
buff = codec.EncodeUint(buff, lastChangeTs)
buff = codec.EncodeUvarint(buff, versionsToLastChange)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
Expand All @@ -99,7 +119,78 @@ func TestWriteCFValueWithShortValue(t *testing.T) {
require.True(t, bytes.Equal(v.GetShortValue(), shortValue))
require.False(t, v.hasGCFence)
require.False(t, v.hasOverlappedRollback)
require.Equal(t, v.lastChangeTs, lastChangeTs)
require.Equal(t, v.versionsToLastChange, versionsToLastChange)
require.Equal(t, v.txnSource, uint64(0))

data := v.EncodeTo()
require.True(t, bytes.Equal(data, buff))
}

func TestWriteCFValueWithRollback(t *testing.T) {
var (
ts uint64 = 400036290571534337
protectedRollbackShortValue = []byte{'P'}
)

buff := make([]byte, 0, 9)
buff = append(buff, WriteTypeRollback)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagShortValuePrefix, byte(len(protectedRollbackShortValue)))
buff = append(buff, protectedRollbackShortValue...)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.True(t, v.IsRollback())
require.True(t, v.HasShortValue())
require.Equal(t, v.GetShortValue(), protectedRollbackShortValue)
require.Equal(t, v.startTs, ts)
require.Equal(t, v.lastChangeTs, uint64(0))
require.Equal(t, v.versionsToLastChange, uint64(0))
require.Equal(t, v.txnSource, uint64(0))

data := v.EncodeTo()
require.Equal(t, data, buff)
}

func TestWriteCFValueWithDelete(t *testing.T) {
var ts uint64 = 400036290571534337
buff := make([]byte, 0, 9)
buff = append(buff, byte('D'))
buff = codec.EncodeUvarint(buff, ts)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.True(t, v.IsDelete())
require.False(t, v.HasShortValue())

data := v.EncodeTo()
require.Equal(t, data, buff)
}

func TestWriteCFValueWithGcFence(t *testing.T) {
var (
ts uint64 = 400036290571534337
gcFence uint64 = 9527
)

buff := make([]byte, 0, 9)
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagOverlappedRollback)
buff = append(buff, flagGCFencePrefix)
buff = codec.EncodeUint(buff, gcFence)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.Equal(t, v.startTs, ts)
require.True(t, v.hasGCFence)
require.Equal(t, v.gcFence, gcFence)
require.True(t, v.hasOverlappedRollback)

data := v.EncodeTo()
require.Equal(t, data, buff)
}

0 comments on commit 8f61a6f

Please sign in to comment.