Skip to content

Commit

Permalink
Flush MVCC stats to range and store counters.
Browse files Browse the repository at this point in the history
On successful execution of a range read-write command, flush accumulated
mvcc stats to merging counters for range and store.
  • Loading branch information
Spencer Kimball committed Oct 14, 2014
1 parent 00f206a commit f7559ab
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 10 deletions.
56 changes: 50 additions & 6 deletions storage/engine/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,56 @@ func ValidateRangeMetaKey(key Key) error {
return nil
}

// MakeRangeStatKey returns the key for accessing the named stat
// for the specified range ID.
func MakeRangeStatKey(rangeID int64, stat Key) Key {
encRangeID := encoding.EncodeInt(nil, rangeID)
return MakeKey(KeyLocalRangeStatPrefix, encRangeID, stat)
}

// MakeStoreStatKey returns the key for accessing the named stat
// for the specified store ID.
func MakeStoreStatKey(storeID int32, stat Key) Key {
encStoreID := encoding.EncodeInt(nil, int64(storeID))
return MakeKey(KeyLocalStoreStatPrefix, encStoreID, stat)
}

func init() {
if KeyLocalPrefixLength%7 != 0 {
log.Fatal("local key prefix is not a multiple of 7: %d", KeyLocalPrefixLength)
}
}

// Constants for key construction.
var (
// StatLiveBytes counts how many bytes are "live", including bytes
// from both keys and values. Live rows include only non-deleted
// keys and only the most recent value.
StatLiveBytes = Key("live-bytes")
// StatKeyBytes counts how many bytes are used to store all keys,
// including bytes from deleted keys. Key bytes are re-counted for
// each versioned value.
StatKeyBytes = Key("key-bytes")
// StatValBytes counts how many bytes are used to store all values,
// including all historical versions and deleted tombstones.
StatValBytes = Key("val-bytes")
// StatIntentBytes counts how many bytes are used to store values
// which are unresolved intents. Includes bytes used for both intent
// keys and values.
StatIntentBytes = Key("intent-bytes")
// StatLiveCount counts how many keys are "live". This includes only
// non-deleted keys.
StatLiveCount = Key("live-count")
// StatKeyCount counts the total number of keys, including both live
// and deleted keys.
StatKeyCount = Key("key-count")
// StatValCount counts the total number of values, including all
// historical versions and deleted tombstones.
StatValCount = Key("val-count")
// StatIntentCount counts the number of unresolved intents.
StatIntentCount = Key("intent-count")
)

// Constants for system-reserved keys in the KV map.
var (
// KeyMaxLength is the maximum key length in bytes. This value is
Expand Down Expand Up @@ -263,15 +307,15 @@ var (
// KeyLocalRangeMetadataPrefix is the prefix for keys storing range metadata.
// The value is a struct of type RangeMetadata.
KeyLocalRangeMetadataPrefix = MakeKey(KeyLocalPrefix, Key("rng-"))
// KeyLocalRangeStatPrefix is the prefix for range statistics.
KeyLocalRangeStatPrefix = MakeKey(KeyLocalPrefix, Key("rst-"))
// KeyLocalResponseCachePrefix is the prefix for keys storing command
// responses used to guarantee idempotency (see ResponseCache). This key
// prefix is duplicated in rocksdb_compaction.cc and must be kept in sync
// if modified here.
// responses used to guarantee idempotency (see ResponseCache).
KeyLocalResponseCachePrefix = MakeKey(KeyLocalPrefix, Key("res-"))
// KeyLocalStoreStatPrefix is the prefix for store statistics.
KeyLocalStoreStatPrefix = MakeKey(KeyLocalPrefix, Key("sst-"))
// KeyLocalTransactionPrefix specifies the key prefix for
// transaction records. The suffix is the transaction id. This key
// prefix is duplicated in rocksdb_compaction.cc and must be kept in
// sync if modified here.
// transaction records. The suffix is the transaction id.
KeyLocalTransactionPrefix = MakeKey(KeyLocalPrefix, Key("txn-"))
// KeyLocalSnapshotIDGenerator is a snapshot ID generator sequence.
// Snapshot IDs must be unique per store ID.
Expand Down
52 changes: 52 additions & 0 deletions storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ const (
scanRowCount = int64(1 << 8)
)

// encodeMVCCStatValue constructs a proto.Value using the supplied
// stat increment and then encodes that into a byte slice. Encoding
// errors cause panice (as they should never happen). Returns false
// if stat is equal to 0 to avoid unnecessary merge.
func encodeMVCCStatValue(stat int64) (ok bool, enc []byte) {
if stat == 0 {
return false, nil
}
data, err := gogoproto.Marshal(&proto.Value{Integer: gogoproto.Int64(stat)})
if err != nil {
panic("could not marshal proto.Value")
}
return true, data
}

// MVCCStats tracks byte and instance counts for:
// - Live key/values (i.e. what a scan at current time will reveal;
// note that this includes intent keys and values, but not keys and
Expand Down Expand Up @@ -615,6 +630,43 @@ func (mvcc *MVCC) ResolveWriteIntentRange(key, endKey Key, max int64, txn *proto
return num, nil
}

// FlushStats flushes stats to merge counters for both the affected
// range and store.
func (mvcc *MVCC) FlushStats(rangeID int64, storeID int32) {
if ok, encLiveBytes := encodeMVCCStatValue(mvcc.LiveBytes); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatLiveBytes), encLiveBytes)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatLiveBytes), encLiveBytes)
}
if ok, encKeyBytes := encodeMVCCStatValue(mvcc.KeyBytes); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatKeyBytes), encKeyBytes)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatKeyBytes), encKeyBytes)
}
if ok, encValBytes := encodeMVCCStatValue(mvcc.ValBytes); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatValBytes), encValBytes)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatValBytes), encValBytes)
}
if ok, encIntentBytes := encodeMVCCStatValue(mvcc.IntentBytes); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatIntentBytes), encIntentBytes)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatIntentBytes), encIntentBytes)
}
if ok, encLiveCount := encodeMVCCStatValue(mvcc.LiveCount); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatLiveCount), encLiveCount)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatLiveCount), encLiveCount)
}
if ok, encKeyCount := encodeMVCCStatValue(mvcc.KeyCount); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatKeyCount), encKeyCount)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatKeyCount), encKeyCount)
}
if ok, encValCount := encodeMVCCStatValue(mvcc.ValCount); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatValCount), encValCount)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatValCount), encValCount)
}
if ok, encIntentCount := encodeMVCCStatValue(mvcc.IntentCount); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatIntentCount), encIntentCount)
mvcc.batch.Merge(MakeStoreStatKey(storeID, StatIntentCount), encIntentCount)
}
}

// a splitSampleItem wraps a key along with an aggregate over key range
// preceding it.
type splitSampleItem struct {
Expand Down
8 changes: 4 additions & 4 deletions storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,14 +678,14 @@ func (r *Range) executeCmd(method string, args proto.Request, reply proto.Respon
return util.Errorf("unrecognized command type: %s", method)
}

// Commit the batch on success.
if reply.Header().Error == nil {
// On success, flush the MVCC stats to the batch and commit.
if !IsReadOnly(method) && reply.Header().Error == nil {
mvcc.FlushStats(r.Meta.RangeID, r.rm.StoreID())
reply.Header().SetGoError(batch.Commit())
}

// Maybe update gossip configs on a put if there was no error.
if (method == Put || method == ConditionalPut) &&
reply.Header().Error == nil {
if (method == Put || method == ConditionalPut) && reply.Header().Error == nil {
r.maybeUpdateGossipConfigs(args.Header().Key)
}

Expand Down
93 changes: 93 additions & 0 deletions storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,18 @@ func putArgs(key, value []byte, rangeID int64) (*proto.PutRequest, *proto.PutRes
return args, reply
}

// deleteArgs returns a DeleteRequest and DeleteResponse pair.
func deleteArgs(key engine.Key, rangeID int64) (*proto.DeleteRequest, *proto.DeleteResponse) {
args := &proto.DeleteRequest{
RequestHeader: proto.RequestHeader{
Key: key,
Replica: proto.Replica{RangeID: rangeID},
},
}
reply := &proto.DeleteResponse{}
return args, reply
}

// readOrWriteArgs returns either get or put arguments depending on
// value of "read". Get for true; Put for false. Returns method
// selected and args & reply.
Expand Down Expand Up @@ -1403,3 +1415,84 @@ func TestInternalPushTxnPushTimestampAlreadyPushed(t *testing.T) {
t.Errorf("expected pushed txn to have status PENDING; got %s", reply.PusheeTxn.Status)
}
}

func verifyStatCounter(eng engine.Engine, rangeID int64, stat engine.Key, expVal int64, t *testing.T) {
statVal := &proto.Value{}
ok, err := engine.GetProto(eng, engine.MakeRangeStatKey(rangeID, stat), statVal)
if err != nil {
t.Fatal(err)
}
if expVal == 0 && !ok {
return
} else if expVal != 0 && !ok {
t.Errorf("stat %q missing", stat)
}
if statVal.GetInteger() != expVal {
t.Errorf("expected stat %q to have value %d; got %d", stat, expVal, statVal.GetInteger())
}
}

func verifyRangeStats(eng engine.Engine, rangeID int64, expMS engine.MVCCStats, t *testing.T) {
verifyStatCounter(eng, rangeID, engine.StatLiveBytes, expMS.LiveBytes, t)
verifyStatCounter(eng, rangeID, engine.StatKeyBytes, expMS.KeyBytes, t)
verifyStatCounter(eng, rangeID, engine.StatValBytes, expMS.ValBytes, t)
verifyStatCounter(eng, rangeID, engine.StatIntentBytes, expMS.IntentBytes, t)
verifyStatCounter(eng, rangeID, engine.StatLiveCount, expMS.LiveCount, t)
verifyStatCounter(eng, rangeID, engine.StatKeyCount, expMS.KeyCount, t)
verifyStatCounter(eng, rangeID, engine.StatValCount, expMS.ValCount, t)
verifyStatCounter(eng, rangeID, engine.StatIntentCount, expMS.IntentCount, t)
}

// TestRangeStats verifies that commands executed against a range
// update the range stat counters. The stat values are empirically
// derived; we're really just testing that they increment in the right
// ways, not the exact amounts. If the encodings change, will need to
// update this test.
func TestRangeStats(t *testing.T) {
rng, _, clock, eng := createTestRangeWithClock(t)
defer rng.Stop()
// Put a value.
pArgs, pReply := putArgs([]byte("a"), []byte("value1"), 0)
pArgs.Timestamp = clock.Now()
if err := rng.AddCmd(Put, pArgs, pReply, true); err != nil {
t.Fatal(err)
}
expMS := engine.MVCCStats{LiveBytes: 44, KeyBytes: 20, ValBytes: 24, IntentBytes: 0, LiveCount: 1, KeyCount: 1, ValCount: 1, IntentCount: 0}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)

// Put a 2nd value transactionally.
pArgs, pReply = putArgs([]byte("b"), []byte("value2"), 0)
pArgs.Timestamp = clock.Now()
pArgs.Txn = &proto.Transaction{ID: []byte("txn1"), Timestamp: pArgs.Timestamp}
if err := rng.AddCmd(Put, pArgs, pReply, true); err != nil {
t.Fatal(err)
}
expMS = engine.MVCCStats{LiveBytes: 118, KeyBytes: 40, ValBytes: 78, IntentBytes: 28, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 1}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)

// Resolve the 2nd value.
rArgs := &proto.InternalResolveIntentRequest{
RequestHeader: proto.RequestHeader{
Timestamp: pArgs.Txn.Timestamp,
Key: pArgs.Key,
Replica: proto.Replica{RangeID: 0},
Txn: pArgs.Txn,
},
}
rArgs.Txn.Status = proto.COMMITTED
rReply := &proto.InternalResolveIntentResponse{}
if err := rng.AddCmd(InternalResolveIntent, rArgs, rReply, true); err != nil {
t.Fatal(err)
}
expMS = engine.MVCCStats{LiveBytes: 88, KeyBytes: 40, ValBytes: 48, IntentBytes: 0, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 0}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)

// Delete the 1st value.
dArgs, dReply := deleteArgs([]byte("a"), 0)
dArgs.Timestamp = clock.Now()
if err := rng.AddCmd(Delete, dArgs, dReply, true); err != nil {
t.Fatal(err)
}
expMS = engine.MVCCStats{LiveBytes: 44, KeyBytes: 56, ValBytes: 50, IntentBytes: 0, LiveCount: 1, KeyCount: 2, ValCount: 3, IntentCount: 0}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)
}
2 changes: 2 additions & 0 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (s *Store) BootstrapRangeMetadata() *proto.RangeMetadata {
}

// The following methods are accessors implementation the RangeManager interface.
func (s *Store) StoreID() int32 { return s.Ident.StoreID }
func (s *Store) Clock() *hlc.Clock { return s.clock }
func (s *Store) Engine() engine.Engine { return s.engine }
func (s *Store) Allocator() *allocator { return s.allocator }
Expand Down Expand Up @@ -577,6 +578,7 @@ func (s *Store) maybeResolveWriteIntentError(rng *Range, method string, args pro
// (i.e. splitting and merging) operations.
type RangeManager interface {
// Accessors for shared state.
StoreID() int32
Clock() *hlc.Clock
Engine() engine.Engine
Allocator() *allocator
Expand Down

0 comments on commit f7559ab

Please sign in to comment.