Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keys: Move SequenceCacheKey decoding to keys package #4947

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,49 @@ func SequenceCacheKey(rangeID roachpb.RangeID, txnID *uuid.UUID, epoch uint32, s
return key
}

// DecodeSequenceCacheKey decodes the provided sequence cache entry,
// returning the transaction ID, the epoch, and the sequence number.
func DecodeSequenceCacheKey(key roachpb.Key, dest []byte) (*uuid.UUID, uint32, uint32, error) {
// TODO(tschottdorf): redundant check.
if !bytes.HasPrefix(key, LocalRangeIDPrefix) {
return nil, 0, 0, util.Errorf("key %s does not have %s prefix", key, LocalRangeIDPrefix)
}
// Cut the prefix, the Range ID, and the infix specifier.
b := key[len(LocalRangeIDPrefix):]
b, _, err := encoding.DecodeUvarintAscending(b)
if err != nil {
return nil, 0, 0, err
}
b = b[1:]
if !bytes.HasPrefix(b, LocalSequenceCacheSuffix) {
return nil, 0, 0, util.Errorf("key %s does not contain the sequence cache suffix %s",
key, LocalSequenceCacheSuffix)
}
// Cut the sequence cache suffix.
b = b[len(LocalSequenceCacheSuffix):]
// Decode the id.
b, idBytes, err := encoding.DecodeBytesAscending(b, dest)
if err != nil {
return nil, 0, 0, err
}
// Decode the epoch.
b, epoch, err := encoding.DecodeUint32Descending(b)
if err != nil {
return nil, 0, 0, err
}
// Decode the sequence number.
b, seq, err := encoding.DecodeUint32Descending(b)
if err != nil {
return nil, 0, 0, err
}
if len(b) > 0 {
return nil, 0, 0, util.Errorf("key %q has leftover bytes after decode: %s; indicates corrupt key",
key, b)
}
txnID, err := uuid.FromBytes(idBytes)
return txnID, epoch, seq, err
}

// RaftTombstoneKey returns a system-local key for a raft tombstone.
func RaftTombstoneKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDReplicatedKey(rangeID, localRaftTombstoneSuffix, nil)
Expand Down
26 changes: 26 additions & 0 deletions keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/testutils"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/uuid"
)

Expand Down Expand Up @@ -52,6 +53,31 @@ func TestMakeKey(t *testing.T) {
}
}

func TestSequenceCacheEncodeDecode(t *testing.T) {
defer leaktest.AfterTest(t)()
const rangeID = 123
const testTxnEpoch = 5
const expSeq = 987
testTxnID, err := uuid.FromString("0ce61c17-5eb4-4587-8c36-dcf4062ada4c")
if err != nil {
panic(err)
}
key := SequenceCacheKey(rangeID, testTxnID, testTxnEpoch, expSeq)
txnID, epoch, seq, err := DecodeSequenceCacheKey(key, nil)
if err != nil {
t.Fatal(err)
}
if !roachpb.TxnIDEqual(txnID, testTxnID) {
t.Fatalf("expected txnID %q, got %q", testTxnID, txnID)
}
if epoch != testTxnEpoch {
t.Fatalf("expected epoch %d, got %d", testTxnEpoch, epoch)
}
if seq != expSeq {
t.Fatalf("expected sequence %d, got %d", expSeq, seq)
}
}

func TestKeyAddress(t *testing.T) {
testCases := []struct {
key roachpb.Key
Expand Down
48 changes: 3 additions & 45 deletions storage/sequence_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/uuid"
)

Expand Down Expand Up @@ -109,7 +108,7 @@ func (sc *SequenceCache) Get(e engine.Engine, txnID *uuid.UUID, dest *roachpb.Se
if err != nil || len(kvs) == 0 || !bytes.HasPrefix(kvs[0].Key, prefix) {
return 0, 0, err
}
_, epoch, seq, err := decodeSequenceCacheKey(kvs[0].Key, sc.scratchBuf[:0])
_, epoch, seq, err := keys.DecodeSequenceCacheKey(kvs[0].Key, sc.scratchBuf[:0])
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -140,7 +139,7 @@ func (sc *SequenceCache) Iterate(e engine.Engine, f func([]byte, *uuid.UUID, roa
true /* consistent */, nil /* txn */, false, /* !reverse */
func(kv roachpb.KeyValue) (bool, error) {
var entry roachpb.SequenceCacheEntry
txnID, _, _, err := decodeSequenceCacheKey(kv.Key, nil)
txnID, _, _, err := keys.DecodeSequenceCacheKey(kv.Key, nil)
if err != nil {
panic(err) // TODO(tschottdorf): ReplicaCorruptionError
}
Expand Down Expand Up @@ -246,50 +245,9 @@ func (sc *SequenceCache) shouldCacheError(pErr *roachpb.Error) bool {
return true
}

func decodeSequenceCacheKey(key roachpb.Key, dest []byte) (*uuid.UUID, uint32, uint32, error) {
// TODO(tschottdorf): redundant check.
if !bytes.HasPrefix(key, keys.LocalRangeIDPrefix) {
return nil, 0, 0, util.Errorf("key %s does not have %s prefix", key, keys.LocalRangeIDPrefix)
}
// Cut the prefix, the Range ID, and the infix specifier.
b := key[len(keys.LocalRangeIDPrefix):]
b, _, err := encoding.DecodeUvarintAscending(b)
if err != nil {
return nil, 0, 0, err
}
b = b[1:]
if !bytes.HasPrefix(b, keys.LocalSequenceCacheSuffix) {
return nil, 0, 0, util.Errorf("key %s does not contain the sequence cache suffix %s",
key, keys.LocalSequenceCacheSuffix)
}
// Cut the sequence cache suffix.
b = b[len(keys.LocalSequenceCacheSuffix):]
// Decode the id.
b, idBytes, err := encoding.DecodeBytesAscending(b, dest)
if err != nil {
return nil, 0, 0, err
}
// Decode the epoch.
b, epoch, err := encoding.DecodeUint32Descending(b)
if err != nil {
return nil, 0, 0, err
}
// Decode the sequence number.
b, seq, err := encoding.DecodeUint32Descending(b)
if err != nil {
return nil, 0, 0, err
}
if len(b) > 0 {
return nil, 0, 0, util.Errorf("key %q has leftover bytes after decode: %s; indicates corrupt key",
key, b)
}
txnID, err := uuid.FromBytes(idBytes)
return txnID, epoch, seq, err
}

func decodeSequenceCacheMVCCKey(encKey engine.MVCCKey, dest []byte) (*uuid.UUID, uint32, uint32, error) {
if encKey.IsValue() {
return nil, 0, 0, util.Errorf("key %s is not a raw MVCC value", encKey)
}
return decodeSequenceCacheKey(encKey.Key, dest)
return keys.DecodeSequenceCacheKey(encKey.Key, dest)
}
21 changes: 0 additions & 21 deletions storage/sequence_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"reflect"
"testing"

"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util/leaktest"
Expand Down Expand Up @@ -58,26 +57,6 @@ func createTestSequenceCache(t *testing.T, rangeID roachpb.RangeID, stopper *sto
return NewSequenceCache(rangeID), engine.NewInMem(roachpb.Attributes{}, 1<<20, stopper)
}

func TestSequenceCacheEncodeDecode(t *testing.T) {
defer leaktest.AfterTest(t)()
const rangeID = 123
const expSeq = 987
key := keys.SequenceCacheKey(rangeID, testTxnID, testTxnEpoch, expSeq)
txnID, epoch, seq, err := decodeSequenceCacheKey(key, nil)
if err != nil {
t.Fatal(err)
}
if !roachpb.TxnIDEqual(txnID, testTxnID) {
t.Fatalf("expected txnID %q, got %q", testTxnID, txnID)
}
if epoch != testTxnEpoch {
t.Fatalf("expected epoch %d, got %d", testTxnEpoch, epoch)
}
if seq != expSeq {
t.Fatalf("expected sequence %d, got %d", expSeq, seq)
}
}

// TestSequenceCachePutGetClearData tests basic get & put functionality as well as
// clearing the cache.
func TestSequenceCachePutGetClearData(t *testing.T) {
Expand Down