Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush MVCC stats to range and store counters. #125

Merged
merged 2 commits into from
Oct 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions storage/engine/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,56 @@ func ValidateRangeMetaKey(key Key) error {
return nil
}

// MakeRangeStatKey returns the key for accessing the named stat
// for the specified range ID.
func MakeRangeStatKey(rangeID int64, stat Key) Key {
encRangeID := encoding.EncodeInt(nil, rangeID)
return MakeKey(KeyLocalRangeStatPrefix, encRangeID, stat)
}

// MakeStoreStatKey returns the key for accessing the named stat
// for the specified store ID.
func MakeStoreStatKey(storeID int32, stat Key) Key {
encStoreID := encoding.EncodeInt(nil, int64(storeID))
return MakeKey(KeyLocalStoreStatPrefix, encStoreID, stat)
}

func init() {
if KeyLocalPrefixLength%7 != 0 {
log.Fatal("local key prefix is not a multiple of 7: %d", KeyLocalPrefixLength)
}
}

// Constants for key construction.
var (
// StatLiveBytes counts how many bytes are "live", including bytes
// from both keys and values. Live rows include only non-deleted
// keys and only the most recent value.
StatLiveBytes = Key("live-bytes")
// StatKeyBytes counts how many bytes are used to store all keys,
// including bytes from deleted keys. Key bytes are re-counted for
// each versioned value.
StatKeyBytes = Key("key-bytes")
// StatValBytes counts how many bytes are used to store all values,
// including all historical versions and deleted tombstones.
StatValBytes = Key("val-bytes")
// StatIntentBytes counts how many bytes are used to store values
// which are unresolved intents. Includes bytes used for both intent
// keys and values.
StatIntentBytes = Key("intent-bytes")
// StatLiveCount counts how many keys are "live". This includes only
// non-deleted keys.
StatLiveCount = Key("live-count")
// StatKeyCount counts the total number of keys, including both live
// and deleted keys.
StatKeyCount = Key("key-count")
// StatValCount counts the total number of values, including all
// historical versions and deleted tombstones.
StatValCount = Key("val-count")
// StatIntentCount counts the number of unresolved intents.
StatIntentCount = Key("intent-count")
)

// Constants for system-reserved keys in the KV map.
var (
// KeyMaxLength is the maximum key length in bytes. This value is
Expand Down Expand Up @@ -263,15 +307,15 @@ var (
// KeyLocalRangeMetadataPrefix is the prefix for keys storing range metadata.
// The value is a struct of type RangeMetadata.
KeyLocalRangeMetadataPrefix = MakeKey(KeyLocalPrefix, Key("rng-"))
// KeyLocalRangeStatPrefix is the prefix for range statistics.
KeyLocalRangeStatPrefix = MakeKey(KeyLocalPrefix, Key("rst-"))
// KeyLocalResponseCachePrefix is the prefix for keys storing command
// responses used to guarantee idempotency (see ResponseCache). This key
// prefix is duplicated in rocksdb_compaction.cc and must be kept in sync
// if modified here.
// responses used to guarantee idempotency (see ResponseCache).
KeyLocalResponseCachePrefix = MakeKey(KeyLocalPrefix, Key("res-"))
// KeyLocalStoreStatPrefix is the prefix for store statistics.
KeyLocalStoreStatPrefix = MakeKey(KeyLocalPrefix, Key("sst-"))
// KeyLocalTransactionPrefix specifies the key prefix for
// transaction records. The suffix is the transaction id. This key
// prefix is duplicated in rocksdb_compaction.cc and must be kept in
// sync if modified here.
// transaction records. The suffix is the transaction id.
KeyLocalTransactionPrefix = MakeKey(KeyLocalPrefix, Key("txn-"))
// KeyLocalSnapshotIDGenerator is a snapshot ID generator sequence.
// Snapshot IDs must be unique per store ID.
Expand Down
37 changes: 37 additions & 0 deletions storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ const (
scanRowCount = int64(1 << 8)
)

// encodeMVCCStatValue constructs a proto.Value using the supplied
// stat increment and then encodes that into a byte slice. Encoding
// errors cause panics (as they should never happen). Returns false
// if stat is equal to 0 to avoid unnecessary merge.
func encodeMVCCStatValue(stat int64) (ok bool, enc []byte) {
if stat == 0 {
return false, nil
}
data, err := gogoproto.Marshal(&proto.Value{Integer: gogoproto.Int64(stat)})
if err != nil {
panic(fmt.Sprintf("could not marshal proto.Value: %s", err))
}
return true, data
}

// MVCCStats tracks byte and instance counts for:
// - Live key/values (i.e. what a scan at current time will reveal;
// note that this includes intent keys and values, but not keys and
Expand Down Expand Up @@ -615,6 +630,28 @@ func (mvcc *MVCC) ResolveWriteIntentRange(key, endKey Key, max int64, txn *proto
return num, nil
}

// FlushStat flushes the specified stat to merge counters for both
// the affected range and store.
func (mvcc *MVCC) FlushStat(rangeID int64, storeID int32, stat Key, statVal int64) {
if ok, encStat := encodeMVCCStatValue(statVal); ok {
mvcc.batch.Merge(MakeRangeStatKey(rangeID, stat), encStat)
mvcc.batch.Merge(MakeStoreStatKey(storeID, stat), encStat)
}
}

// FlushStats flushes stats to merge counters for both the affected
// range and store.
func (mvcc *MVCC) FlushStats(rangeID int64, storeID int32) {
mvcc.FlushStat(rangeID, storeID, StatLiveBytes, mvcc.LiveBytes)
mvcc.FlushStat(rangeID, storeID, StatKeyBytes, mvcc.KeyBytes)
mvcc.FlushStat(rangeID, storeID, StatValBytes, mvcc.ValBytes)
mvcc.FlushStat(rangeID, storeID, StatIntentBytes, mvcc.IntentBytes)
mvcc.FlushStat(rangeID, storeID, StatLiveCount, mvcc.LiveCount)
mvcc.FlushStat(rangeID, storeID, StatKeyCount, mvcc.KeyCount)
mvcc.FlushStat(rangeID, storeID, StatValCount, mvcc.ValCount)
mvcc.FlushStat(rangeID, storeID, StatIntentCount, mvcc.IntentCount)
}

// a splitSampleItem wraps a key along with an aggregate over key range
// preceding it.
type splitSampleItem struct {
Expand Down
8 changes: 4 additions & 4 deletions storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,14 +678,14 @@ func (r *Range) executeCmd(method string, args proto.Request, reply proto.Respon
return util.Errorf("unrecognized command type: %s", method)
}

// Commit the batch on success.
if reply.Header().Error == nil {
// On success, flush the MVCC stats to the batch and commit.
if !IsReadOnly(method) && reply.Header().Error == nil {
mvcc.FlushStats(r.Meta.RangeID, r.rm.StoreID())
reply.Header().SetGoError(batch.Commit())
}

// Maybe update gossip configs on a put if there was no error.
if (method == Put || method == ConditionalPut) &&
reply.Header().Error == nil {
if (method == Put || method == ConditionalPut) && reply.Header().Error == nil {
r.maybeUpdateGossipConfigs(args.Header().Key)
}

Expand Down
93 changes: 93 additions & 0 deletions storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,18 @@ func putArgs(key, value []byte, rangeID int64) (*proto.PutRequest, *proto.PutRes
return args, reply
}

// deleteArgs returns a DeleteRequest and DeleteResponse pair.
func deleteArgs(key engine.Key, rangeID int64) (*proto.DeleteRequest, *proto.DeleteResponse) {
args := &proto.DeleteRequest{
RequestHeader: proto.RequestHeader{
Key: key,
Replica: proto.Replica{RangeID: rangeID},
},
}
reply := &proto.DeleteResponse{}
return args, reply
}

// readOrWriteArgs returns either get or put arguments depending on
// value of "read". Get for true; Put for false. Returns method
// selected and args & reply.
Expand Down Expand Up @@ -1403,3 +1415,84 @@ func TestInternalPushTxnPushTimestampAlreadyPushed(t *testing.T) {
t.Errorf("expected pushed txn to have status PENDING; got %s", reply.PusheeTxn.Status)
}
}

func verifyStatCounter(eng engine.Engine, rangeID int64, stat engine.Key, expVal int64, t *testing.T) {
statVal := &proto.Value{}
ok, err := engine.GetProto(eng, engine.MakeRangeStatKey(rangeID, stat), statVal)
if err != nil {
t.Fatal(err)
}
if expVal == 0 && !ok {
return
} else if expVal != 0 && !ok {
t.Errorf("stat %q missing", stat)
}
if statVal.GetInteger() != expVal {
t.Errorf("expected stat %q to have value %d; got %d", stat, expVal, statVal.GetInteger())
}
}

func verifyRangeStats(eng engine.Engine, rangeID int64, expMS engine.MVCCStats, t *testing.T) {
verifyStatCounter(eng, rangeID, engine.StatLiveBytes, expMS.LiveBytes, t)
verifyStatCounter(eng, rangeID, engine.StatKeyBytes, expMS.KeyBytes, t)
verifyStatCounter(eng, rangeID, engine.StatValBytes, expMS.ValBytes, t)
verifyStatCounter(eng, rangeID, engine.StatIntentBytes, expMS.IntentBytes, t)
verifyStatCounter(eng, rangeID, engine.StatLiveCount, expMS.LiveCount, t)
verifyStatCounter(eng, rangeID, engine.StatKeyCount, expMS.KeyCount, t)
verifyStatCounter(eng, rangeID, engine.StatValCount, expMS.ValCount, t)
verifyStatCounter(eng, rangeID, engine.StatIntentCount, expMS.IntentCount, t)
}

// TestRangeStats verifies that commands executed against a range
// update the range stat counters. The stat values are empirically
// derived; we're really just testing that they increment in the right
// ways, not the exact amounts. If the encodings change, will need to
// update this test.
func TestRangeStats(t *testing.T) {
rng, _, clock, eng := createTestRangeWithClock(t)
defer rng.Stop()
// Put a value.
pArgs, pReply := putArgs([]byte("a"), []byte("value1"), 0)
pArgs.Timestamp = clock.Now()
if err := rng.AddCmd(Put, pArgs, pReply, true); err != nil {
t.Fatal(err)
}
expMS := engine.MVCCStats{LiveBytes: 44, KeyBytes: 20, ValBytes: 24, IntentBytes: 0, LiveCount: 1, KeyCount: 1, ValCount: 1, IntentCount: 0}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)

// Put a 2nd value transactionally.
pArgs, pReply = putArgs([]byte("b"), []byte("value2"), 0)
pArgs.Timestamp = clock.Now()
pArgs.Txn = &proto.Transaction{ID: []byte("txn1"), Timestamp: pArgs.Timestamp}
if err := rng.AddCmd(Put, pArgs, pReply, true); err != nil {
t.Fatal(err)
}
expMS = engine.MVCCStats{LiveBytes: 118, KeyBytes: 40, ValBytes: 78, IntentBytes: 28, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 1}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)

// Resolve the 2nd value.
rArgs := &proto.InternalResolveIntentRequest{
RequestHeader: proto.RequestHeader{
Timestamp: pArgs.Txn.Timestamp,
Key: pArgs.Key,
Replica: proto.Replica{RangeID: 0},
Txn: pArgs.Txn,
},
}
rArgs.Txn.Status = proto.COMMITTED
rReply := &proto.InternalResolveIntentResponse{}
if err := rng.AddCmd(InternalResolveIntent, rArgs, rReply, true); err != nil {
t.Fatal(err)
}
expMS = engine.MVCCStats{LiveBytes: 88, KeyBytes: 40, ValBytes: 48, IntentBytes: 0, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 0}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)

// Delete the 1st value.
dArgs, dReply := deleteArgs([]byte("a"), 0)
dArgs.Timestamp = clock.Now()
if err := rng.AddCmd(Delete, dArgs, dReply, true); err != nil {
t.Fatal(err)
}
expMS = engine.MVCCStats{LiveBytes: 44, KeyBytes: 56, ValBytes: 50, IntentBytes: 0, LiveCount: 1, KeyCount: 2, ValCount: 3, IntentCount: 0}
verifyRangeStats(eng, rng.Meta.RangeID, expMS, t)
}
2 changes: 2 additions & 0 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (s *Store) BootstrapRangeMetadata() *proto.RangeMetadata {
}

// The following methods are accessors implementation the RangeManager interface.
func (s *Store) StoreID() int32 { return s.Ident.StoreID }
func (s *Store) Clock() *hlc.Clock { return s.clock }
func (s *Store) Engine() engine.Engine { return s.engine }
func (s *Store) Allocator() *allocator { return s.allocator }
Expand Down Expand Up @@ -577,6 +578,7 @@ func (s *Store) maybeResolveWriteIntentError(rng *Range, method string, args pro
// (i.e. splitting and merging) operations.
type RangeManager interface {
// Accessors for shared state.
StoreID() int32
Clock() *hlc.Clock
Engine() engine.Engine
Allocator() *allocator
Expand Down