From f7559abd4d1cea25749f15f52414781d97675490 Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Mon, 13 Oct 2014 20:58:36 -0400 Subject: [PATCH] Flush MVCC stats to range and store counters. On successful execution of a range read-write command, flush accumulated mvcc stats to merging counters for range and store. --- storage/engine/keys.go | 56 ++++++++++++++++++++++--- storage/engine/mvcc.go | 52 +++++++++++++++++++++++ storage/range.go | 8 ++-- storage/range_test.go | 93 ++++++++++++++++++++++++++++++++++++++++++ storage/store.go | 2 + 5 files changed, 201 insertions(+), 10 deletions(-) diff --git a/storage/engine/keys.go b/storage/engine/keys.go index d3480d2cc989..af99edc030fe 100644 --- a/storage/engine/keys.go +++ b/storage/engine/keys.go @@ -207,12 +207,56 @@ func ValidateRangeMetaKey(key Key) error { return nil } +// MakeRangeStatKey returns the key for accessing the named stat +// for the specified range ID. +func MakeRangeStatKey(rangeID int64, stat Key) Key { + encRangeID := encoding.EncodeInt(nil, rangeID) + return MakeKey(KeyLocalRangeStatPrefix, encRangeID, stat) +} + +// MakeStoreStatKey returns the key for accessing the named stat +// for the specified store ID. +func MakeStoreStatKey(storeID int32, stat Key) Key { + encStoreID := encoding.EncodeInt(nil, int64(storeID)) + return MakeKey(KeyLocalStoreStatPrefix, encStoreID, stat) +} + func init() { if KeyLocalPrefixLength%7 != 0 { log.Fatal("local key prefix is not a multiple of 7: %d", KeyLocalPrefixLength) } } +// Constants for key construction. +var ( + // StatLiveBytes counts how many bytes are "live", including bytes + // from both keys and values. Live rows include only non-deleted + // keys and only the most recent value. + StatLiveBytes = Key("live-bytes") + // StatKeyBytes counts how many bytes are used to store all keys, + // including bytes from deleted keys. Key bytes are re-counted for + // each versioned value. 
+ StatKeyBytes = Key("key-bytes") + // StatValBytes counts how many bytes are used to store all values, + // including all historical versions and deleted tombstones. + StatValBytes = Key("val-bytes") + // StatIntentBytes counts how many bytes are used to store values + // which are unresolved intents. Includes bytes used for both intent + // keys and values. + StatIntentBytes = Key("intent-bytes") + // StatLiveCount counts how many keys are "live". This includes only + // non-deleted keys. + StatLiveCount = Key("live-count") + // StatKeyCount counts the total number of keys, including both live + // and deleted keys. + StatKeyCount = Key("key-count") + // StatValCount counts the total number of values, including all + // historical versions and deleted tombstones. + StatValCount = Key("val-count") + // StatIntentCount counts the number of unresolved intents. + StatIntentCount = Key("intent-count") +) + // Constants for system-reserved keys in the KV map. var ( // KeyMaxLength is the maximum key length in bytes. This value is @@ -263,15 +307,15 @@ var ( // KeyLocalRangeMetadataPrefix is the prefix for keys storing range metadata. // The value is a struct of type RangeMetadata. KeyLocalRangeMetadataPrefix = MakeKey(KeyLocalPrefix, Key("rng-")) + // KeyLocalRangeStatPrefix is the prefix for range statistics. + KeyLocalRangeStatPrefix = MakeKey(KeyLocalPrefix, Key("rst-")) // KeyLocalResponseCachePrefix is the prefix for keys storing command - // responses used to guarantee idempotency (see ResponseCache). This key - // prefix is duplicated in rocksdb_compaction.cc and must be kept in sync - // if modified here. + // responses used to guarantee idempotency (see ResponseCache). KeyLocalResponseCachePrefix = MakeKey(KeyLocalPrefix, Key("res-")) + // KeyLocalStoreStatPrefix is the prefix for store statistics. + KeyLocalStoreStatPrefix = MakeKey(KeyLocalPrefix, Key("sst-")) // KeyLocalTransactionPrefix specifies the key prefix for - // transaction records. 
The suffix is the transaction id. This key - // prefix is duplicated in rocksdb_compaction.cc and must be kept in - // sync if modified here. + // transaction records. The suffix is the transaction id. KeyLocalTransactionPrefix = MakeKey(KeyLocalPrefix, Key("txn-")) // KeyLocalSnapshotIDGenerator is a snapshot ID generator sequence. // Snapshot IDs must be unique per store ID. diff --git a/storage/engine/mvcc.go b/storage/engine/mvcc.go index 4d7b5e354126..d1d2f53c15c7 100644 --- a/storage/engine/mvcc.go +++ b/storage/engine/mvcc.go @@ -37,6 +37,21 @@ const ( scanRowCount = int64(1 << 8) ) +// encodeMVCCStatValue constructs a proto.Value using the supplied +// stat increment and then encodes that into a byte slice. Encoding +// errors cause a panic (as they should never happen). Returns false +// if stat is equal to 0 to avoid unnecessary merge. +func encodeMVCCStatValue(stat int64) (ok bool, enc []byte) { + if stat == 0 { + return false, nil + } + data, err := gogoproto.Marshal(&proto.Value{Integer: gogoproto.Int64(stat)}) + if err != nil { + panic("could not marshal proto.Value") + } + return true, data +} + // MVCCStats tracks byte and instance counts for: // - Live key/values (i.e. what a scan at current time will reveal; // note that this includes intent keys and values, but not keys and @@ -615,6 +630,43 @@ func (mvcc *MVCC) ResolveWriteIntentRange(key, endKey Key, max int64, txn *proto return num, nil } +// FlushStats flushes stats to merge counters for both the affected +// range and store. 
+func (mvcc *MVCC) FlushStats(rangeID int64, storeID int32) { + if ok, encLiveBytes := encodeMVCCStatValue(mvcc.LiveBytes); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatLiveBytes), encLiveBytes) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatLiveBytes), encLiveBytes) + } + if ok, encKeyBytes := encodeMVCCStatValue(mvcc.KeyBytes); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatKeyBytes), encKeyBytes) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatKeyBytes), encKeyBytes) + } + if ok, encValBytes := encodeMVCCStatValue(mvcc.ValBytes); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatValBytes), encValBytes) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatValBytes), encValBytes) + } + if ok, encIntentBytes := encodeMVCCStatValue(mvcc.IntentBytes); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatIntentBytes), encIntentBytes) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatIntentBytes), encIntentBytes) + } + if ok, encLiveCount := encodeMVCCStatValue(mvcc.LiveCount); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatLiveCount), encLiveCount) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatLiveCount), encLiveCount) + } + if ok, encKeyCount := encodeMVCCStatValue(mvcc.KeyCount); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatKeyCount), encKeyCount) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatKeyCount), encKeyCount) + } + if ok, encValCount := encodeMVCCStatValue(mvcc.ValCount); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatValCount), encValCount) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatValCount), encValCount) + } + if ok, encIntentCount := encodeMVCCStatValue(mvcc.IntentCount); ok { + mvcc.batch.Merge(MakeRangeStatKey(rangeID, StatIntentCount), encIntentCount) + mvcc.batch.Merge(MakeStoreStatKey(storeID, StatIntentCount), encIntentCount) + } +} + // a splitSampleItem wraps a key along with an aggregate over key range // preceding it. 
type splitSampleItem struct { diff --git a/storage/range.go b/storage/range.go index 66caf3f9a6a8..2090142b4386 100644 --- a/storage/range.go +++ b/storage/range.go @@ -678,14 +678,14 @@ func (r *Range) executeCmd(method string, args proto.Request, reply proto.Respon return util.Errorf("unrecognized command type: %s", method) } - // Commit the batch on success. - if reply.Header().Error == nil { + // On success, flush the MVCC stats to the batch and commit. + if !IsReadOnly(method) && reply.Header().Error == nil { + mvcc.FlushStats(r.Meta.RangeID, r.rm.StoreID()) reply.Header().SetGoError(batch.Commit()) } // Maybe update gossip configs on a put if there was no error. - if (method == Put || method == ConditionalPut) && - reply.Header().Error == nil { + if (method == Put || method == ConditionalPut) && reply.Header().Error == nil { r.maybeUpdateGossipConfigs(args.Header().Key) } diff --git a/storage/range_test.go b/storage/range_test.go index 499263ca6c36..d405d33e63d7 100644 --- a/storage/range_test.go +++ b/storage/range_test.go @@ -355,6 +355,18 @@ func putArgs(key, value []byte, rangeID int64) (*proto.PutRequest, *proto.PutRes return args, reply } +// deleteArgs returns a DeleteRequest and DeleteResponse pair. +func deleteArgs(key engine.Key, rangeID int64) (*proto.DeleteRequest, *proto.DeleteResponse) { + args := &proto.DeleteRequest{ + RequestHeader: proto.RequestHeader{ + Key: key, + Replica: proto.Replica{RangeID: rangeID}, + }, + } + reply := &proto.DeleteResponse{} + return args, reply +} + // readOrWriteArgs returns either get or put arguments depending on // value of "read". Get for true; Put for false. Returns method // selected and args & reply. 
@@ -1403,3 +1415,84 @@ func TestInternalPushTxnPushTimestampAlreadyPushed(t *testing.T) { t.Errorf("expected pushed txn to have status PENDING; got %s", reply.PusheeTxn.Status) } } + +func verifyStatCounter(eng engine.Engine, rangeID int64, stat engine.Key, expVal int64, t *testing.T) { + statVal := &proto.Value{} + ok, err := engine.GetProto(eng, engine.MakeRangeStatKey(rangeID, stat), statVal) + if err != nil { + t.Fatal(err) + } + if expVal == 0 && !ok { + return + } else if expVal != 0 && !ok { + t.Errorf("stat %q missing", stat) + } + if statVal.GetInteger() != expVal { + t.Errorf("expected stat %q to have value %d; got %d", stat, expVal, statVal.GetInteger()) + } +} + +func verifyRangeStats(eng engine.Engine, rangeID int64, expMS engine.MVCCStats, t *testing.T) { + verifyStatCounter(eng, rangeID, engine.StatLiveBytes, expMS.LiveBytes, t) + verifyStatCounter(eng, rangeID, engine.StatKeyBytes, expMS.KeyBytes, t) + verifyStatCounter(eng, rangeID, engine.StatValBytes, expMS.ValBytes, t) + verifyStatCounter(eng, rangeID, engine.StatIntentBytes, expMS.IntentBytes, t) + verifyStatCounter(eng, rangeID, engine.StatLiveCount, expMS.LiveCount, t) + verifyStatCounter(eng, rangeID, engine.StatKeyCount, expMS.KeyCount, t) + verifyStatCounter(eng, rangeID, engine.StatValCount, expMS.ValCount, t) + verifyStatCounter(eng, rangeID, engine.StatIntentCount, expMS.IntentCount, t) +} + +// TestRangeStats verifies that commands executed against a range +// update the range stat counters. The stat values are empirically +// derived; we're really just testing that they increment in the right +// ways, not the exact amounts. If the encodings change, this test +// will need to be updated. +func TestRangeStats(t *testing.T) { + rng, _, clock, eng := createTestRangeWithClock(t) + defer rng.Stop() + // Put a value. 
+ pArgs, pReply := putArgs([]byte("a"), []byte("value1"), 0) + pArgs.Timestamp = clock.Now() + if err := rng.AddCmd(Put, pArgs, pReply, true); err != nil { + t.Fatal(err) + } + expMS := engine.MVCCStats{LiveBytes: 44, KeyBytes: 20, ValBytes: 24, IntentBytes: 0, LiveCount: 1, KeyCount: 1, ValCount: 1, IntentCount: 0} + verifyRangeStats(eng, rng.Meta.RangeID, expMS, t) + + // Put a 2nd value transactionally. + pArgs, pReply = putArgs([]byte("b"), []byte("value2"), 0) + pArgs.Timestamp = clock.Now() + pArgs.Txn = &proto.Transaction{ID: []byte("txn1"), Timestamp: pArgs.Timestamp} + if err := rng.AddCmd(Put, pArgs, pReply, true); err != nil { + t.Fatal(err) + } + expMS = engine.MVCCStats{LiveBytes: 118, KeyBytes: 40, ValBytes: 78, IntentBytes: 28, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 1} + verifyRangeStats(eng, rng.Meta.RangeID, expMS, t) + + // Resolve the 2nd value. + rArgs := &proto.InternalResolveIntentRequest{ + RequestHeader: proto.RequestHeader{ + Timestamp: pArgs.Txn.Timestamp, + Key: pArgs.Key, + Replica: proto.Replica{RangeID: 0}, + Txn: pArgs.Txn, + }, + } + rArgs.Txn.Status = proto.COMMITTED + rReply := &proto.InternalResolveIntentResponse{} + if err := rng.AddCmd(InternalResolveIntent, rArgs, rReply, true); err != nil { + t.Fatal(err) + } + expMS = engine.MVCCStats{LiveBytes: 88, KeyBytes: 40, ValBytes: 48, IntentBytes: 0, LiveCount: 2, KeyCount: 2, ValCount: 2, IntentCount: 0} + verifyRangeStats(eng, rng.Meta.RangeID, expMS, t) + + // Delete the 1st value. 
+ dArgs, dReply := deleteArgs([]byte("a"), 0) + dArgs.Timestamp = clock.Now() + if err := rng.AddCmd(Delete, dArgs, dReply, true); err != nil { + t.Fatal(err) + } + expMS = engine.MVCCStats{LiveBytes: 44, KeyBytes: 56, ValBytes: 50, IntentBytes: 0, LiveCount: 1, KeyCount: 2, ValCount: 3, IntentCount: 0} + verifyRangeStats(eng, rng.Meta.RangeID, expMS, t) +} diff --git a/storage/store.go b/storage/store.go index 21a9a38af1ac..f397d641c3a9 100644 --- a/storage/store.go +++ b/storage/store.go @@ -325,6 +325,7 @@ func (s *Store) BootstrapRangeMetadata() *proto.RangeMetadata { } // The following methods are accessors implementation the RangeManager interface. +func (s *Store) StoreID() int32 { return s.Ident.StoreID } func (s *Store) Clock() *hlc.Clock { return s.clock } func (s *Store) Engine() engine.Engine { return s.engine } func (s *Store) Allocator() *allocator { return s.allocator } @@ -577,6 +578,7 @@ func (s *Store) maybeResolveWriteIntentError(rng *Range, method string, args pro // (i.e. splitting and merging) operations. type RangeManager interface { // Accessors for shared state. + StoreID() int32 Clock() *hlc.Clock Engine() engine.Engine Allocator() *allocator