Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: do not rewrite shortvalue iif the value is rollback record. #39596

Merged
merged 5 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions br/pkg/stream/meta_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,34 @@ const (
flagShortValuePrefix = byte('v')
flagOverlappedRollback = byte('R')
flagGCFencePrefix = byte('F')
flagLastChangePrefix = byte('l')
flagTxnSourcePrefix = byte('S')
)

// RawWriteCFValue represents the value in write columnFamily.
// Detail see line: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70
type RawWriteCFValue struct {
t WriteType
startTs uint64
shortValue []byte
hasOverlappedRollback bool
hasGCFence bool
gcFence uint64

// Records the next version after this version when overlapping rollback
// happens on an already existed commit record.
//
// See [`Write::gc_fence`] for more detail.
hasGCFence bool
gcFence uint64

// The number of versions that need skipping from this record
// to find the latest PUT/DELETE record.
// If versions_to_last_change > 0 but last_change_ts == 0, the key does not
// have a PUT/DELETE record before this write record.
lastChangeTs uint64
versionsToLastChange uint64

// The source of this txn.
txnSource uint64
}

// ParseFrom decodes the value to get the struct `RawWriteCFValue`.
Expand All @@ -146,6 +165,10 @@ l_for:
switch data[0] {
case flagShortValuePrefix:
vlen := data[1]
if len(data[2:]) < int(vlen) {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the length of short value is invalid, vlen: %v", int(vlen))
}
v.shortValue = data[2 : vlen+2]
data = data[vlen+2:]
case flagOverlappedRollback:
Expand All @@ -157,13 +180,37 @@ l_for:
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode gc fence failed")
}
case flagLastChangePrefix:
data, v.lastChangeTs, err = codec.DecodeUint(data[1:])
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode last change ts failed")
}
data, v.versionsToLastChange, err = codec.DecodeUvarint(data)
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode versions to last change failed")
}
case flagTxnSourcePrefix:
data, v.txnSource, err = codec.DecodeUvarint(data[1:])
if err != nil {
return errors.Annotate(berrors.ErrInvalidArgument, "decode txn source failed")
}
default:
break l_for
}
}
return nil
}

// IsRollback checks whether the value in cf is a `rollback` record.
func (v *RawWriteCFValue) IsRollback() bool {
return v.GetWriteType() == WriteTypeRollback
}

// IsRollback checks whether the value in cf is a `delete` record.
func (v *RawWriteCFValue) IsDelete() bool {
return v.GetWriteType() == WriteTypeDelete
}

// HasShortValue checks whether short value is stored in write cf.
func (v *RawWriteCFValue) HasShortValue() bool {
return len(v.shortValue) > 0
Expand Down Expand Up @@ -204,5 +251,14 @@ func (v *RawWriteCFValue) EncodeTo() []byte {
data = append(data, flagGCFencePrefix)
data = codec.EncodeUint(data, v.gcFence)
}
if v.lastChangeTs > 0 || v.versionsToLastChange > 0 {
data = append(data, flagLastChangePrefix)
data = codec.EncodeUint(data, v.lastChangeTs)
data = codec.EncodeUvarint(data, v.versionsToLastChange)
}
if v.txnSource > 0 {
data = append(data, flagTxnSourcePrefix)
data = codec.EncodeUvarint(data, v.txnSource)
}
return data
}
101 changes: 96 additions & 5 deletions br/pkg/stream/meta_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,49 @@ func TestWriteType(t *testing.T) {
}

func TestWriteCFValueNoShortValue(t *testing.T) {
var (
ts uint64 = 400036290571534337
txnSource uint64 = 9527
)

buff := make([]byte, 0, 9)
buff = append(buff, byte('P'))
buff = codec.EncodeUvarint(buff, 400036290571534337)
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagTxnSourcePrefix)
buff = codec.EncodeUvarint(buff, txnSource)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.False(t, v.IsDelete())
require.False(t, v.IsRollback())
require.False(t, v.HasShortValue())
require.False(t, v.hasGCFence)
require.Equal(t, v.lastChangeTs, uint64(0))
require.Equal(t, v.versionsToLastChange, uint64(0))
require.Equal(t, v.txnSource, txnSource)

encodedBuff := v.EncodeTo()
require.True(t, bytes.Equal(buff, encodedBuff))
}

func TestWriteCFValueWithShortValue(t *testing.T) {
var ts uint64 = 400036290571534337
shortValue := []byte("pingCAP")
var (
ts uint64 = 400036290571534337
shortValue = []byte("pingCAP")
lastChangeTs uint64 = 9527
versionsToLastChange uint64 = 95271
)

buff := make([]byte, 0, 9)
buff = append(buff, byte('P'))
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagShortValuePrefix)
buff = append(buff, byte(len(shortValue)))
buff = append(buff, shortValue...)
buff = append(buff, flagLastChangePrefix)
buff = codec.EncodeUint(buff, lastChangeTs)
buff = codec.EncodeUvarint(buff, versionsToLastChange)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
Expand All @@ -99,7 +119,78 @@ func TestWriteCFValueWithShortValue(t *testing.T) {
require.True(t, bytes.Equal(v.GetShortValue(), shortValue))
require.False(t, v.hasGCFence)
require.False(t, v.hasOverlappedRollback)
require.Equal(t, v.lastChangeTs, lastChangeTs)
require.Equal(t, v.versionsToLastChange, versionsToLastChange)
require.Equal(t, v.txnSource, uint64(0))

data := v.EncodeTo()
require.True(t, bytes.Equal(data, buff))
}

func TestWriteCFValueWithRollback(t *testing.T) {
var (
ts uint64 = 400036290571534337
protectedRollbackShortValue = []byte{'P'}
)

buff := make([]byte, 0, 9)
buff = append(buff, WriteTypeRollback)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagShortValuePrefix, byte(len(protectedRollbackShortValue)))
buff = append(buff, protectedRollbackShortValue...)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.True(t, v.IsRollback())
require.True(t, v.HasShortValue())
require.Equal(t, v.GetShortValue(), protectedRollbackShortValue)
require.Equal(t, v.startTs, ts)
require.Equal(t, v.lastChangeTs, uint64(0))
require.Equal(t, v.versionsToLastChange, uint64(0))
require.Equal(t, v.txnSource, uint64(0))

data := v.EncodeTo()
require.Equal(t, data, buff)
}

func TestWriteCFValueWithDelete(t *testing.T) {
var ts uint64 = 400036290571534337
buff := make([]byte, 0, 9)
buff = append(buff, byte('D'))
buff = codec.EncodeUvarint(buff, ts)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.True(t, v.IsDelete())
require.False(t, v.HasShortValue())

data := v.EncodeTo()
require.Equal(t, data, buff)
}

func TestWriteCFValueWithGcFence(t *testing.T) {
var (
ts uint64 = 400036290571534337
gcFence uint64 = 9527
)

buff := make([]byte, 0, 9)
buff = append(buff, WriteTypePut)
buff = codec.EncodeUvarint(buff, ts)
buff = append(buff, flagOverlappedRollback)
buff = append(buff, flagGCFencePrefix)
buff = codec.EncodeUint(buff, gcFence)

v := new(RawWriteCFValue)
err := v.ParseFrom(buff)
require.NoError(t, err)
require.Equal(t, v.startTs, ts)
require.True(t, v.hasGCFence)
require.Equal(t, v.gcFence, gcFence)
require.True(t, v.hasOverlappedRollback)

data := v.EncodeTo()
require.Equal(t, data, buff)
}
12 changes: 11 additions & 1 deletion br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,20 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([
return rewriteResult{}, errors.Trace(err)
}

if rawWriteCFValue.t == WriteTypeDelete {
if rawWriteCFValue.IsDelete() {
return rewriteResult{
NewValue: value,
NeedRewrite: true,
Deleted: true,
}, nil
}
if rawWriteCFValue.IsRollback() {
return rewriteResult{
NewValue: value,
NeedRewrite: true,
Deleted: false,
}, nil
}
if !rawWriteCFValue.HasShortValue() {
return rewriteResult{
NewValue: value,
Expand All @@ -467,6 +474,9 @@ func (sr *SchemasReplace) rewriteValueV2(value []byte, cf string, rewrite func([

shortValue, needWrite, err := rewrite(rawWriteCFValue.GetShortValue())
if err != nil {
log.Info("failed to rewrite short value",
zap.ByteString("write-type", []byte{rawWriteCFValue.GetWriteType()}),
zap.Int("short-value-len", len(rawWriteCFValue.GetShortValue())))
return rewriteResult{}, errors.Trace(err)
}
if !needWrite {
Expand Down