From d39705c9c6752c29181ff75f3a4712d5a23513b6 Mon Sep 17 00:00:00 2001 From: Tristan Rice Date: Wed, 8 Jun 2016 18:11:57 -0400 Subject: [PATCH] storage: use raft log size (in bytes) as metric for truncation The raft log currently truncates based of the number of entries (max 10000). These entries can be of any size so it makes more sense to use the byte size of the raft log as a metric for truncation. See #7065. --- keys/constants.go | 2 + keys/keys.go | 5 ++ storage/client_raft_log_queue_test.go | 19 ++++- storage/client_split_test.go | 4 +- storage/helpers_test.go | 5 ++ storage/raft_log_queue.go | 89 ++++++++++++++++------- storage/raft_log_queue_test.go | 38 +++------- storage/replica.go | 23 +++++- storage/replica_command.go | 18 ++++- storage/replica_raftstorage.go | 83 ++++++++++++++++++--- storage/storagebase/state.pb.go | 101 +++++++++++++++++--------- storage/storagebase/state.proto | 1 + ui/next/app/js/protos.js | 6 ++ ui/next/generated/protos.d.ts | 9 +++ ui/next/generated/protos.json | 6 ++ 15 files changed, 299 insertions(+), 110 deletions(-) diff --git a/keys/constants.go b/keys/constants.go index 7d249f25525c..6569be14e001 100644 --- a/keys/constants.go +++ b/keys/constants.go @@ -119,6 +119,8 @@ var ( localRaftLastIndexSuffix = []byte("rfti") // LocalRaftLogSuffix is the suffix for the raft log. LocalRaftLogSuffix = []byte("rftl") + // LocalRaftLogSizeSuffix is the suffix for the size of the raft log. + localRaftLogSizeSuffix = []byte("rfts") // localRangeLastReplicaGCTimestampSuffix is the suffix for a range's // last replica GC timestamp (for GC of old replicas). localRangeLastReplicaGCTimestampSuffix = []byte("rlrt") diff --git a/keys/keys.go b/keys/keys.go index fd00922d088f..adbaedfb68c9 100644 --- a/keys/keys.go +++ b/keys/keys.go @@ -240,6 +240,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key { return key } +// RaftLogSizeKey returns a system-local key for the size of the Raft log. +func RaftLogSizeKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDUnreplicatedKey(rangeID, localRaftLogSizeSuffix, nil) +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key { diff --git a/storage/client_raft_log_queue_test.go b/storage/client_raft_log_queue_test.go index e28464345da4..83a6b99b6945 100644 --- a/storage/client_raft_log_queue_test.go +++ b/storage/client_raft_log_queue_test.go @@ -17,11 +17,13 @@ package storage_test import ( + "bytes" "fmt" "testing" "time" "github.com/cockroachdb/cockroach/client" + "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/storage" "github.com/cockroachdb/cockroach/util" "github.com/cockroachdb/cockroach/util/leaktest" @@ -34,6 +36,13 @@ func TestRaftLogQueue(t *testing.T) { var mtc multiTestContext + // Set maxBytes to something small so we can trigger the raft log truncation + // without adding 64MB of logs. + const maxBytes = 1 << 16 + defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{ + RangeMaxBytes: maxBytes, + })() + // Turn off raft elections so the raft leader won't change out from under // us in this test. sc := storage.TestStoreContext() @@ -61,9 +70,15 @@ func TestRaftLogQueue(t *testing.T) { t.Fatal(err) } + // Disable splits since we're increasing the raft log with puts. + for _, store := range mtc.stores { + store.DisableSplitQueue(true) + } + // Write a collection of values to increase the raft log. - for i := 0; i < storage.RaftLogQueueStaleThreshold+1; i++ { - pArgs = putArgs([]byte(fmt.Sprintf("key-%d", i)), []byte("value")) + value := bytes.Repeat([]byte("a"), 1000) // 1KB + for size := int64(0); size < 2*maxBytes; size += int64(len(value)) { + pArgs = putArgs([]byte(fmt.Sprintf("key-%d", size)), value) if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &pArgs); err != nil { t.Fatal(err) } diff --git a/storage/client_split_test.go b/storage/client_split_test.go index 430051614389..ea2029bb6450 100644 --- a/storage/client_split_test.go +++ b/storage/client_split_test.go @@ -539,7 +539,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { config.TestingSetupZoneConfigHook(stopper) defer stopper.Stop() - maxBytes := int64(1 << 16) + const maxBytes = 1 << 16 // Set max bytes. descID := uint32(keys.MaxReservedDescID + 1) config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes}) @@ -597,7 +597,7 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) { origRng := store.LookupReplica(roachpb.RKeyMin, nil) // Set max bytes. - maxBytes := int64(1 << 16) + const maxBytes = 1 << 16 descID := uint32(keys.MaxReservedDescID + 1) config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes}) diff --git a/storage/helpers_test.go b/storage/helpers_test.go index ecc08698dfee..b96fff1b7974 100644 --- a/storage/helpers_test.go +++ b/storage/helpers_test.go @@ -84,6 +84,11 @@ func (s *Store) DisableRaftLogQueue(disabled bool) { s.raftLogQueue.SetDisabled(disabled) } +// DisableSplitQueue disables or enables the replica split queue. +func (s *Store) DisableSplitQueue(disabled bool) { + s.splitQueue.SetDisabled(disabled) +} + // ForceRaftLogScanAndProcess iterates over all ranges and enqueues any that // need their raft logs truncated and then process each of them. func (s *Store) ForceRaftLogScanAndProcess() { diff --git a/storage/raft_log_queue.go b/storage/raft_log_queue.go index 2d37d90685f8..f2f296de6794 100644 --- a/storage/raft_log_queue.go +++ b/storage/raft_log_queue.go @@ -20,22 +20,24 @@ import ( "math" "time" + "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/gossip" + "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/storage/engine" "github.com/cockroachdb/cockroach/util" "github.com/cockroachdb/cockroach/util/hlc" "github.com/cockroachdb/cockroach/util/log" "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" ) const ( // raftLogQueueMaxSize is the max size of the queue. raftLogQueueMaxSize = 100 - // raftLogPadding is the number of log entries that should be kept for - // lagging replicas. - raftLogPadding = 10000 // RaftLogQueueTimerDuration is the duration between truncations. This needs // to be relatively short so that truncations can keep up with raft log entry // creation. @@ -67,8 +69,10 @@ func newRaftLogQueue(db *client.DB, gossip *gossip.Gossip) *raftLogQueue { } // getTruncatableIndexes returns the total number of stale raft log entries that -// can be truncated and the oldest index that cannot be pruned. -func getTruncatableIndexes(r *Replica) (uint64, uint64, error) { +// can be truncated and the oldest index that cannot be pruned. If estimate is +// true, it returns a resource cheap estimate that can be used for scheduling +// purposes. +func getTruncatableIndexes(r *Replica, estimate bool) (uint64, uint64, error) { rangeID := r.RangeID // TODO(bram): r.store.RaftStatus(rangeID) differs from r.RaftStatus() in // tests, causing TestGetTruncatableIndexes to fail. Figure out why and fix. @@ -86,12 +90,36 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) { return 0, 0, nil } - // Calculate the quorum matched index and adjust based on padding. - oldestIndex := getQuorumMatchedIndex(raftStatus, raftLogPadding) - r.mu.Lock() - defer r.mu.Unlock() - firstIndex, err := r.FirstIndex() + raftLogSize := r.mu.raftLogSize + targetSize := r.mu.maxBytes + r.mu.Unlock() + + // Always truncate the oldest raft log entry which has been committed by all replicas. + oldestIndex := getMaximumMatchedIndex(raftStatus) + + // Truncate raft logs if the log is too big. + if raftLogSize > targetSize { + if estimate { + // When estimating assume that the truncate threshold has been reached. + oldestIndex += RaftLogQueueStaleThreshold + } else { + oldestSizeIndex, err := getLogIndexForSize(r.store.Engine(), r.RangeID, raftLogSize, targetSize) + if err != nil { + return 0, 0, err + } + if oldestSizeIndex > oldestIndex { + oldestIndex = oldestSizeIndex + } + } + } + + // Never truncate uncommitted logs. + if oldestIndex > raftStatus.Commit { + oldestIndex = raftStatus.Commit + } + + firstIndex, err := r.GetFirstIndex() if err != nil { return 0, 0, util.Errorf("error retrieving first index for range %d: %s", rangeID, err) } @@ -111,7 +139,7 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) { func (*raftLogQueue) shouldQueue( now hlc.Timestamp, r *Replica, _ config.SystemConfig, ) (shouldQ bool, priority float64) { - truncatableIndexes, _, err := getTruncatableIndexes(r) + truncatableIndexes, _, err := getTruncatableIndexes(r, true /* estimate */) if err != nil { log.Warning(err) return false, 0 @@ -124,7 +152,7 @@ func (*raftLogQueue) shouldQueue( // leader and if the total number of the range's raft log's stale entries // exceeds RaftLogQueueStaleThreshold. func (rlq *raftLogQueue) process(now hlc.Timestamp, r *Replica, _ config.SystemConfig) error { - truncatableIndexes, oldestIndex, err := getTruncatableIndexes(r) + truncatableIndexes, oldestIndex, err := getTruncatableIndexes(r, false /* estimate */) if err != nil { return err } @@ -155,26 +183,31 @@ func (*raftLogQueue) purgatoryChan() <-chan struct{} { return nil } -// getQuorumMatchedIndex returns the index which a quorum of the nodes have -// committed. The returned value is adjusted by padding to allow retaining -// additional entries, but this adjustment is limited so that we won't keep -// entries which all nodes have matched. -func getQuorumMatchedIndex(raftStatus *raft.Status, padding uint64) uint64 { - index := raftStatus.Commit - if index >= padding { - index -= padding - } else { - index = 0 - } - +// getMaximumMatchedIndex returns the index that has been committed by every +// node in the raft group. +func getMaximumMatchedIndex(raftStatus *raft.Status) uint64 { smallestMatch := uint64(math.MaxUint64) for _, progress := range raftStatus.Progress { if smallestMatch > progress.Match { smallestMatch = progress.Match } } - if index < smallestMatch { - index = smallestMatch - } - return index + return smallestMatch +} + +// getLogIndexForSize returns the oldest raft log index that keeps the log size +// under the target size. If there are no log entries 0 is returned. +func getLogIndexForSize(eng engine.Engine, rangeID roachpb.RangeID, currentSize, targetSize int64) (uint64, error) { + prefix := keys.RaftLogPrefix(rangeID) + var entry raftpb.Entry + _, err := engine.MVCCIterate(context.Background(), eng, prefix, prefix.PrefixEnd(), + hlc.ZeroTimestamp, true /* consistent */, nil /* txn */, false, /* reverse */ + func(kv roachpb.KeyValue) (bool, error) { + currentSize -= int64(kv.Size()) + if currentSize < targetSize { + return true, kv.Value.GetProto(&entry) + } + return false, nil + }) + return entry.Index, err } diff --git a/storage/raft_log_queue_test.go b/storage/raft_log_queue_test.go index 4b24e934769f..552415e41bc8 100644 --- a/storage/raft_log_queue_test.go +++ b/storage/raft_log_queue_test.go @@ -25,46 +25,32 @@ import ( "github.com/cockroachdb/cockroach/util" "github.com/cockroachdb/cockroach/util/leaktest" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" ) -func TestGetQuorumMatchedIndex(t *testing.T) { +func TestGetMaximumMatchedIndex(t *testing.T) { defer leaktest.AfterTest(t)() testCases := []struct { - commit uint64 progress []uint64 - padding uint64 expected uint64 }{ // Basic cases. - {1, []uint64{1}, 0, 1}, - {1, []uint64{1, 2}, 0, 1}, - {2, []uint64{1, 2, 3}, 0, 2}, - {2, []uint64{1, 2, 3, 4}, 0, 2}, - {3, []uint64{1, 2, 3, 4, 5}, 0, 3}, - // Sorting. - {3, []uint64{5, 4, 3, 2, 1}, 0, 3}, - // Padding. - {3, []uint64{1, 3, 3}, 1, 2}, - {3, []uint64{1, 3, 3}, 2, 1}, - {3, []uint64{1, 3, 3}, 3, 1}, - // Minimum progress value limits padding. - {3, []uint64{2, 3, 3}, 3, 2}, + {[]uint64{1}, 1}, + {[]uint64{1, 2}, 1}, + {[]uint64{2, 3, 4}, 2}, + // sorting. + {[]uint64{5, 4, 3, 2, 1}, 1}, } for i, c := range testCases { status := &raft.Status{ - HardState: raftpb.HardState{ - Commit: c.commit, - }, Progress: make(map[uint64]raft.Progress), } for j, v := range c.progress { status.Progress[uint64(j)] = raft.Progress{Match: v} } - quorumMatchedIndex := getQuorumMatchedIndex(status, c.padding) - if c.expected != quorumMatchedIndex { - t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex) + index := getMaximumMatchedIndex(status) + if c.expected != index { + t.Fatalf("%d: expected %d, but got %d", i, c.expected, index) } } } @@ -84,7 +70,7 @@ func TestGetTruncatableIndexes(t *testing.T) { // Test on a new range which should not have a raft group yet. rngNew := createRange(store, 100, roachpb.RKey("a"), roachpb.RKey("c")) - truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew) + truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew, false) if err != nil { t.Errorf("expected no error, got %s", err) } @@ -116,7 +102,7 @@ func TestGetTruncatableIndexes(t *testing.T) { } } - truncatableIndexes, oldestIndex, err = getTruncatableIndexes(r) + truncatableIndexes, oldestIndex, err = getTruncatableIndexes(r, false) if err != nil { t.Errorf("expected no error, got %s", err) } @@ -152,7 +138,7 @@ func TestGetTruncatableIndexes(t *testing.T) { // client_raft_log_queue_test. util.SucceedsSoon(t, func() error { store.ForceRaftLogScanAndProcess() - truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew) + truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew, false) if err != nil { return util.Errorf("expected no error, got %s", err) } diff --git a/storage/replica.go b/storage/replica.go index c37be04e6512..c96a293c98ab 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -238,6 +238,8 @@ type Replica struct { cmdQ *CommandQueue // Last index persisted to the raft log (not necessarily committed). lastIndex uint64 + // The size of the persisted raft log. + raftLogSize int64 // Max bytes before split. maxBytes int64 // pendingCmds stores the Raft in-flight commands which @@ -365,6 +367,22 @@ func (r *Replica) newReplicaInner(desc *roachpb.RangeDescriptor, clock *hlc.Cloc if err != nil { return err } + + var found bool + r.mu.raftLogSize, found, err = loadRaftLogSize(r.store.Engine(), r.RangeID) + if err != nil { + return err + } + if !found { + if log.V(3) { + log.Infof("raft log size not found; computing for range %s", r.RangeID) + } + r.mu.raftLogSize, err = computeRaftLogSize(r.store.Engine(), r.RangeID, clock.PhysicalNow()) + if err != nil { + return err + } + } + if r.isInitializedLocked() && replicaID != 0 { return util.Errorf("replicaID must be 0 when creating an initialized replica") } @@ -907,6 +925,7 @@ func (r *Replica) State() storagebase.RangeInfo { ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagebase.ReplicaState) ri.LastIndex = r.mu.lastIndex ri.NumPending = uint64(len(r.mu.pendingCmds)) + ri.RaftLogSize = r.mu.raftLogSize return ri } @@ -1515,6 +1534,7 @@ func (r *Replica) handleRaftReady() error { var rd raft.Ready r.mu.Lock() lastIndex := r.mu.lastIndex // used for append below + raftLogSize := r.mu.raftLogSize err := r.withRaftGroupLocked(func(raftGroup *raft.RawNode) error { if hasReady = raftGroup.HasReady(); hasReady { rd = raftGroup.Ready() @@ -1559,13 +1579,14 @@ func (r *Replica) handleRaftReady() error { // All of the entries are appended to distinct keys, returning a new // last index. var err error - if lastIndex, err = r.append(writer, lastIndex, rd.Entries); err != nil { + if lastIndex, raftLogSize, err = r.append(writer, lastIndex, raftLogSize, rd.Entries); err != nil { return err } batch.Defer(func() { // Update last index on commit. r.mu.Lock() r.mu.lastIndex = lastIndex + r.mu.raftLogSize = raftLogSize r.mu.Unlock() }) diff --git a/storage/replica_command.go b/storage/replica_command.go index 9192cf2837b4..b450a4b5fe9c 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -1439,14 +1439,23 @@ func (r *Replica) TruncateLog( if err != nil { return reply, err } + + raftLogSize := r.mu.raftLogSize start := keys.RaftLogKey(r.RangeID, 0) end := keys.RaftLogKey(r.RangeID, args.Index) - if err = batch.Iterate(engine.MakeMVCCMetadataKey(start), engine.MakeMVCCMetadataKey(end), - func(kv engine.MVCCKeyValue) (bool, error) { - return false, batch.Clear(kv.Key) - }); err != nil { + _, err = engine.MVCCIterate(ctx, batch, start, end, hlc.ZeroTimestamp, true, /* consistent */ + nil /* txn */, false /* reverse */, func(kv roachpb.KeyValue) (bool, error) { + raftLogSize -= int64(kv.Size()) + return false, batch.Clear(engine.MakeMVCCMetadataKey(kv.Key)) + }) + if err != nil { return reply, err } + + if err := setRaftLogSize(batch, r.RangeID, raftLogSize); err != nil { + return reply, err + } + tState := &roachpb.RaftTruncatedState{ Index: args.Index - 1, Term: term, @@ -1455,6 +1464,7 @@ func (r *Replica) TruncateLog( batch.(engine.Batch).Defer(func() { r.mu.Lock() r.mu.state.TruncatedState = tState + r.mu.raftLogSize = raftLogSize r.mu.Unlock() }) return reply, engine.MVCCPutProto(ctx, batch, ms, keys.RaftTruncatedStateKey(r.RangeID), hlc.ZeroTimestamp, nil, tState) diff --git a/storage/replica_raftstorage.go b/storage/replica_raftstorage.go index 78316de61c17..818d07913831 100644 --- a/storage/replica_raftstorage.go +++ b/storage/replica_raftstorage.go @@ -391,6 +391,50 @@ func setLastIndex(eng engine.ReadWriter, rangeID roachpb.RangeID, lastIndex uint nil /* txn */) } +// loadRaftLogSize retrieves the raft log size of range from storage and whether +// it was found in the database. +func loadRaftLogSize(eng engine.Reader, rangeID roachpb.RangeID) (int64, bool, error) { + v, _, err := engine.MVCCGet(context.Background(), eng, keys.RaftLogSizeKey(rangeID), + hlc.ZeroTimestamp, true /* consistent */, nil /* txn */) + if err != nil { + return 0, false, err + } + if v != nil { + raftLogSize, err := v.GetInt() + if err != nil { + return 0, false, err + } + return raftLogSize, true, nil + } + return 0, false, nil +} + +// computeRaftLogSize scans the raft log to compute the size in bytes. +func computeRaftLogSize(eng engine.Reader, rangeID roachpb.RangeID, nowNanos int64) (int64, error) { + prefix := keys.RaftLogPrefix(rangeID) + var size int64 + _, err := engine.MVCCIterate(context.Background(), eng, prefix, prefix.PrefixEnd(), + hlc.ZeroTimestamp, true /* consistent */, nil /* txn */, false, /* reverse */ + func(kv roachpb.KeyValue) (bool, error) { + size += int64(kv.Size()) + return false, nil + }) + if err != nil { + return 0, err + } + // Raft log size is counted in system local bytes. + return size, nil +} + +// setRaftLogSize persists a new last index. +func setRaftLogSize(eng engine.ReadWriter, rangeID roachpb.RangeID, raftLogSize int64) error { + var value roachpb.Value + value.SetInt(raftLogSize) + + return engine.MVCCPut(context.Background(), eng, nil, keys.RaftLogSizeKey(rangeID), + hlc.ZeroTimestamp, value, nil /* txn */) +} + // Snapshot implements the raft.Storage interface. // Snapshot requires that the replica lock is held. func (r *Replica) Snapshot() (raftpb.Snapshot, error) { @@ -595,32 +639,50 @@ func snapshot( // of r.lastIndex and returns a new value. We do this rather than // modifying r.lastIndex directly because this modification needs to // be atomic with the commit of the batch. -func (r *Replica) append(batch engine.ReadWriter, prevLastIndex uint64, entries []raftpb.Entry) (uint64, error) { +func (r *Replica) append(batch engine.ReadWriter, prevLastIndex uint64, prevRaftLogSize int64, entries []raftpb.Entry) (uint64, int64, error) { if len(entries) == 0 { - return prevLastIndex, nil + return prevLastIndex, prevRaftLogSize, nil } + ctx := context.Background() + raftLogSize := prevRaftLogSize for i := range entries { ent := &entries[i] key := keys.RaftLogKey(r.RangeID, ent.Index) - if err := engine.MVCCPutProto(context.Background(), batch, nil, key, hlc.ZeroTimestamp, nil, ent); err != nil { - return 0, err + + value := roachpb.Value{} + if err := value.SetProto(ent); err != nil { + return 0, 0, err + } + value.InitChecksum(key) + raftLogSize += int64((&roachpb.KeyValue{Key: key, Value: value}).Size()) + + if err := engine.MVCCPut(ctx, batch, nil /* ms */, key, hlc.ZeroTimestamp, value, nil /* txn */); err != nil { + return 0, 0, err } } lastIndex := entries[len(entries)-1].Index // Delete any previously appended log entries which never committed. for i := lastIndex + 1; i <= prevLastIndex; i++ { - err := engine.MVCCDelete(context.Background(), batch, nil, - keys.RaftLogKey(r.RangeID, i), hlc.ZeroTimestamp, nil) + key := keys.RaftLogKey(r.RangeID, i) + value, _, err := engine.MVCCGet(ctx, batch, key, hlc.ZeroTimestamp, true /* consistent */, nil /* txn */) if err != nil { - return 0, err + return 0, 0, err } + raftLogSize -= int64((&roachpb.KeyValue{Key: key, Value: *value}).Size()) + if err := batch.Clear(engine.MakeMVCCMetadataKey(key)); err != nil { + return 0, 0, err + } + } + + if err := setRaftLogSize(batch, r.RangeID, raftLogSize); err != nil { + return 0, 0, err } if err := setLastIndex(batch, r.RangeID, lastIndex); err != nil { - return 0, err + return 0, 0, err } - return lastIndex, nil + return lastIndex, raftLogSize, nil } // updateRangeInfo is called whenever a range is updated by ApplySnapshot @@ -655,7 +717,6 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { } // applySnapshot updates the replica based on the given snapshot. -// Returns the new last index. func (r *Replica) applySnapshot(batch engine.Batch, snap raftpb.Snapshot) error { snapData := roachpb.RaftSnapshotData{} err := proto.Unmarshal(snap.Data, &snapData) @@ -713,7 +774,7 @@ func (r *Replica) applySnapshot(batch engine.Batch, snap raftpb.Snapshot) error } // Write the snapshot's Raft log into the range. - if _, err := r.append(batch, 0, logEntries); err != nil { + if _, _, err := r.append(batch, 0, 0, logEntries); err != nil { return err } diff --git a/storage/storagebase/state.pb.go b/storage/storagebase/state.pb.go index 063268e17ab8..e7f28c31acb6 100644 --- a/storage/storagebase/state.pb.go +++ b/storage/storagebase/state.pb.go @@ -70,8 +70,9 @@ func (*ReplicaState) Descriptor() ([]byte, []int) { return fileDescriptorState, type RangeInfo struct { ReplicaState `protobuf:"bytes,1,opt,name=state,embedded=state" json:"state"` // The highest (and last) index in the Raft log. - LastIndex uint64 `protobuf:"varint,2,opt,name=lastIndex,proto3" json:"lastIndex,omitempty"` - NumPending uint64 `protobuf:"varint,3,opt,name=num_pending,json=numPending,proto3" json:"num_pending,omitempty"` + LastIndex uint64 `protobuf:"varint,2,opt,name=lastIndex,proto3" json:"lastIndex,omitempty"` + NumPending uint64 `protobuf:"varint,3,opt,name=num_pending,json=numPending,proto3" json:"num_pending,omitempty"` + RaftLogSize int64 `protobuf:"varint,4,opt,name=raft_log_size,json=raftLogSize,proto3" json:"raft_log_size,omitempty"` } func (m *RangeInfo) Reset() { *m = RangeInfo{} } @@ -200,6 +201,11 @@ func (m *RangeInfo) MarshalTo(data []byte) (int, error) { i++ i = encodeVarintState(data, i, uint64(m.NumPending)) } + if m.RaftLogSize != 0 { + data[i] = 0x20 + i++ + i = encodeVarintState(data, i, uint64(m.RaftLogSize)) + } return i, nil } @@ -272,6 +278,9 @@ func (m *RangeInfo) Size() (n int) { if m.NumPending != 0 { n += 1 + sovState(uint64(m.NumPending)) } + if m.RaftLogSize != 0 { + n += 1 + sovState(uint64(m.RaftLogSize)) + } return n } @@ -652,6 +661,25 @@ func (m *RangeInfo) Unmarshal(data []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RaftLogSize", wireType) + } + m.RaftLogSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowState + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.RaftLogSize |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipState(data[iNdEx:]) @@ -779,38 +807,39 @@ var ( ) var fileDescriptorState = []byte{ - // 520 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x92, 0x5d, 0x6f, 0xd3, 0x3c, - 0x14, 0xc7, 0x9b, 0x67, 0x69, 0x9f, 0xce, 0x99, 0x78, 0xf1, 0x10, 0x8a, 0xa6, 0xad, 0x9d, 0x2a, - 0x55, 0x02, 0x51, 0x39, 0xd2, 0x90, 0xb8, 0xa7, 0x45, 0x82, 0x8a, 0x17, 0x21, 0x53, 0x10, 0xe2, - 0x26, 0x72, 0x1d, 0x37, 0x8d, 0x48, 0xec, 0x28, 0x71, 0x11, 0xe2, 0x53, 0x70, 0xc7, 0x57, 0xea, - 0xe5, 0x2e, 0xb9, 0xa1, 0x82, 0xf1, 0x45, 0xf0, 0x4b, 0xb2, 0x66, 0x74, 0xe2, 0xc2, 0x89, 0x7d, - 0xce, 0xcf, 0xff, 0x63, 0xff, 0x7d, 0xc0, 0x7d, 0x2a, 0xe8, 0xc7, 0x42, 0x10, 0xba, 0x0c, 0x4a, - 0x29, 0x0a, 0x12, 0xb3, 0xfa, 0x3f, 0x27, 0xa5, 0x9e, 0x13, 0xc9, 0x50, 0x5e, 0x08, 0x29, 0xe0, - 0xc9, 0x25, 0x8a, 0x2a, 0x04, 0x35, 0xd0, 0xa3, 0xd1, 0xae, 0x12, 0xe3, 0x71, 0xc2, 0xeb, 0x5f, - 0x3e, 0x0f, 0xb2, 0x4f, 0x94, 0x5a, 0xb1, 0xa3, 0xe1, 0x96, 0x36, 0x5f, 0x95, 0x4d, 0xb8, 0x64, - 0x05, 0x27, 0x69, 0x58, 0x90, 0x85, 0xac, 0xb0, 0xd3, 0x5d, 0x2c, 0x63, 0x92, 0x44, 0x44, 0x92, - 0x8a, 0x38, 0xde, 0x25, 0x1a, 0xd9, 0xc1, 0x36, 0xbb, 0x92, 0x49, 0x1a, 0x2c, 0x53, 0x1a, 0xc8, - 0x24, 0x63, 0xea, 0x5e, 0x59, 0x5e, 0x31, 0x77, 0x62, 0x11, 0x0b, 0x33, 0x0d, 0xf4, 0xcc, 0x46, - 0x07, 0x3f, 0xf6, 0xc0, 0x01, 0x66, 0x79, 0x9a, 0x50, 0xf2, 0x46, 0x9b, 0x00, 0x47, 0x00, 0xea, - 0x83, 0x85, 0x24, 0x57, 0x41, 0x16, 0x85, 0x09, 0x8f, 0xd8, 0x67, 0xdf, 0x39, 0x75, 0xee, 0xb9, - 0xf8, 0x96, 0xce, 0x3c, 0xb6, 0x89, 0xa9, 0x8e, 0x43, 0x04, 0x0e, 0x53, 0xa6, 0x6c, 0xf9, 0x0b, - 0xff, 0xcf, 0xe0, 0xb7, 0x4d, 0xea, 0x0a, 0xff, 0x08, 0xb8, 0x11, 0x2b, 0xa9, 0xbf, 0xa7, 0x00, - 0xef, 0x6c, 0x80, 0xb6, 0x5e, 0x57, 0xb7, 0x42, 0x98, 0xf0, 0x98, 0x3d, 0x51, 0x4c, 0x91, 0xe4, - 0xca, 0x5f, 0x6c, 0x78, 0x55, 0xa7, 0x6d, 0xc4, 0x7c, 0xd7, 0x6c, 0xf4, 0xaf, 0xd9, 0xf8, 0x42, - 0xe7, 0xb1, 0xc5, 0xe0, 0x2b, 0x70, 0x53, 0x16, 0x2b, 0x4e, 0xd5, 0x8d, 0xa2, 0xd0, 0xbc, 0xae, - 0xdf, 0x36, 0x3b, 0x87, 0xd7, 0x96, 0x5c, 0xc8, 0x59, 0x4d, 0x1b, 0x17, 0xf0, 0x0d, 0x79, 0x65, - 0x0d, 0xdf, 0x82, 0x83, 0x98, 0x86, 0x72, 0x59, 0xb0, 0x72, 0x29, 0xd2, 0xc8, 0xef, 0x18, 0xb1, - 0x93, 0x86, 0x98, 0xf6, 0x1d, 0x29, 0xdf, 0xd1, 0xac, 0xf6, 0x7d, 0x7c, 0xb8, 0xde, 0xf4, 0x5b, - 0x17, 0x9b, 0xbe, 0xf7, 0x74, 0x32, 0xab, 0x77, 0x62, 0x2f, 0xa6, 0x97, 0x0b, 0xf8, 0x0c, 0xb4, - 0xf5, 0xe1, 0x4a, 0xff, 0x7f, 0xa3, 0x37, 0x42, 0xbb, 0xbd, 0x67, 0xbb, 0x0a, 0xd5, 0xcd, 0x85, - 0x5e, 0xbe, 0x9b, 0x4c, 0xf4, 0x99, 0xca, 0xb1, 0xab, 0xe5, 0xb1, 0x15, 0x80, 0x77, 0x41, 0x67, - 0x51, 0x88, 0x2f, 0x8c, 0xfb, 0x5d, 0x25, 0xd5, 0xc5, 0xd5, 0x6a, 0xf0, 0xcd, 0x01, 0xfb, 0xc6, - 0xd2, 0x29, 0x5f, 0x08, 0xf8, 0xdc, 0xd6, 0x63, 0xe6, 0x3d, 0xbd, 0xb3, 0x07, 0xe8, 0x9f, 0xbd, - 0x8e, 0x9a, 0x8d, 0x31, 0xee, 0xea, 0x72, 0xe7, 0x9b, 0xbe, 0x63, 0x4b, 0x32, 0x78, 0x0c, 0xf6, - 0x53, 0x52, 0xca, 0x69, 0xe3, 0xc5, 0xb7, 0x01, 0xd8, 0x07, 0x1e, 0x5f, 0x65, 0x61, 0xce, 0x78, - 0x94, 0xf0, 0xd8, 0x3c, 0xb8, 0x8b, 0x81, 0x0a, 0xbd, 0xb6, 0x91, 0xf1, 0x70, 0xfd, 0xab, 0xd7, - 0x5a, 0x5f, 0xf4, 0x9c, 0x73, 0x35, 0xbe, 0xab, 0xf1, 0x53, 0x8d, 0xaf, 0xbf, 0x7b, 0xad, 0x0f, - 0x5e, 0xe3, 0x0c, 0xef, 0xdb, 0xf3, 0x8e, 0xe9, 0xd4, 0x87, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, - 0x58, 0x21, 0x28, 0xa9, 0xc4, 0x03, 0x00, 0x00, + // 541 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x92, 0xdd, 0x6e, 0xd3, 0x30, + 0x14, 0xc7, 0x5b, 0xfa, 0x41, 0xe7, 0x8c, 0x2f, 0x0f, 0xa1, 0x68, 0xda, 0xda, 0xa9, 0x52, 0x25, + 0x10, 0x95, 0x2b, 0x0d, 0x89, 0x7b, 0x5a, 0x24, 0xa8, 0x18, 0x08, 0x79, 0x05, 0x21, 0x6e, 0x22, + 0xd7, 0x71, 0xdd, 0x88, 0xd4, 0x8e, 0x12, 0x17, 0xa1, 0x3d, 0x05, 0xef, 0xc3, 0x0b, 0xf4, 0x72, + 0x97, 0xdc, 0x50, 0xc1, 0x78, 0x11, 0xfc, 0x91, 0xac, 0x19, 0x9d, 0x76, 0xe1, 0xc4, 0x3e, 0xe7, + 0xe7, 0xbf, 0xed, 0xff, 0x39, 0xe0, 0x09, 0x95, 0xf4, 0x4b, 0x2a, 0x09, 0x9d, 0x0f, 0x32, 0x25, + 0x53, 0xc2, 0x59, 0xf1, 0x9f, 0x92, 0xcc, 0xcc, 0x89, 0x62, 0x28, 0x49, 0xa5, 0x92, 0xf0, 0xf0, + 0x12, 0x45, 0x39, 0x82, 0x4a, 0xe8, 0x7e, 0x7f, 0x5b, 0x89, 0x09, 0x1e, 0x89, 0xe2, 0x97, 0x4c, + 0x07, 0x8b, 0xaf, 0x94, 0x3a, 0xb1, 0xfd, 0xde, 0x86, 0xb6, 0x5f, 0x9d, 0x8d, 0x84, 0x62, 0xa9, + 0x20, 0x71, 0x90, 0x92, 0x99, 0xca, 0xb1, 0xa3, 0x6d, 0x6c, 0xc1, 0x14, 0x09, 0x89, 0x22, 0x39, + 0x71, 0xb0, 0x4d, 0x94, 0xb2, 0xdd, 0x4d, 0x76, 0xa9, 0xa2, 0x78, 0x30, 0x8f, 0xe9, 0x40, 0x45, + 0x0b, 0xa6, 0xdf, 0xb5, 0x48, 0x72, 0xe6, 0x21, 0x97, 0x5c, 0xda, 0xe9, 0xc0, 0xcc, 0x5c, 0xb4, + 0xfb, 0xab, 0x06, 0x76, 0x31, 0x4b, 0xe2, 0x88, 0x92, 0x53, 0x63, 0x02, 0xec, 0x03, 0x68, 0x2e, + 0x16, 0x90, 0x44, 0x07, 0x59, 0x18, 0x44, 0x22, 0x64, 0xdf, 0xfc, 0xea, 0x51, 0xf5, 0x71, 0x1d, + 0xdf, 0x37, 0x99, 0x17, 0x2e, 0x31, 0x36, 0x71, 0x88, 0xc0, 0x5e, 0xcc, 0xb4, 0x2d, 0xff, 0xe1, + 0xb7, 0x2c, 0xfe, 0xc0, 0xa6, 0xae, 0xf0, 0xcf, 0x41, 0x3d, 0x64, 0x19, 0xf5, 0x6b, 0x1a, 0xf0, + 0x8e, 0xbb, 0x68, 0xe3, 0x75, 0xfe, 0x2a, 0x84, 0x89, 0xe0, 0xec, 0xa5, 0x66, 0xd2, 0x28, 0xd1, + 0xfe, 0x62, 0xcb, 0xeb, 0x73, 0x1a, 0x56, 0xcc, 0xaf, 0xdb, 0x8d, 0xfe, 0x35, 0x1b, 0x4f, 0x4c, + 0x1e, 0x3b, 0x0c, 0xbe, 0x03, 0xf7, 0x54, 0xba, 0x14, 0x54, 0xbf, 0x28, 0x0c, 0x6c, 0x75, 0xfd, + 0x86, 0xdd, 0xd9, 0xbb, 0xf6, 0xc8, 0x99, 0x9a, 0x14, 0xb4, 0x75, 0x01, 0xdf, 0x55, 0x57, 0xd6, + 0xf0, 0x03, 0xd8, 0xe5, 0x34, 0x50, 0xf3, 0x94, 0x65, 0x73, 0x19, 0x87, 0x7e, 0xd3, 0x8a, 0x1d, + 0x96, 0xc4, 0x8c, 0xef, 0x48, 0xfb, 0x8e, 0x26, 0x85, 0xef, 0xc3, 0xbd, 0xd5, 0xba, 0x53, 0xb9, + 0x58, 0x77, 0xbc, 0x57, 0xa3, 0x49, 0xb1, 0x13, 0x7b, 0x9c, 0x5e, 0x2e, 0xe0, 0x6b, 0xd0, 0x30, + 0x97, 0xcb, 0xfc, 0xdb, 0x56, 0xaf, 0x8f, 0xb6, 0x7b, 0xcf, 0x75, 0x15, 0x2a, 0x9a, 0x0b, 0xbd, + 0xfd, 0x38, 0x1a, 0x99, 0x3b, 0x65, 0xc3, 0xba, 0x91, 0xc7, 0x4e, 0x00, 0x3e, 0x02, 0xcd, 0x59, + 0x2a, 0xcf, 0x98, 0xf0, 0x5b, 0x5a, 0xaa, 0x85, 0xf3, 0x55, 0xf7, 0x47, 0x15, 0xec, 0x58, 0x4b, + 0xc7, 0x62, 0x26, 0xe1, 0x1b, 0x77, 0x1e, 0xb3, 0xf5, 0xf4, 0x8e, 0x9f, 0xa2, 0x1b, 0x7b, 0x1d, + 0x95, 0x1b, 0x63, 0xd8, 0x32, 0xc7, 0x9d, 0xaf, 0x3b, 0x55, 0x77, 0x24, 0x83, 0x07, 0x60, 0x27, + 0x26, 0x99, 0x1a, 0x97, 0x2a, 0xbe, 0x09, 0xc0, 0x0e, 0xf0, 0xc4, 0x72, 0x11, 0x24, 0x4c, 0x84, + 0x91, 0xe0, 0xb6, 0xe0, 0x75, 0x0c, 0x74, 0xe8, 0xbd, 0x8b, 0xc0, 0x2e, 0xb8, 0x63, 0x1b, 0x2d, + 0x96, 0x3c, 0xc8, 0xa2, 0x33, 0x57, 0xda, 0x1a, 0xf6, 0x4c, 0xf0, 0x44, 0xf2, 0x53, 0x1d, 0x1a, + 0xf6, 0x56, 0x7f, 0xda, 0x95, 0xd5, 0x45, 0xbb, 0x7a, 0xae, 0xc7, 0x4f, 0x3d, 0x7e, 0xeb, 0xf1, + 0xfd, 0x6f, 0xbb, 0xf2, 0xd9, 0x2b, 0xdd, 0xf3, 0x53, 0x63, 0xda, 0xb4, 0xdd, 0xfc, 0xec, 0x5f, + 0x00, 0x00, 0x00, 0xff, 0xff, 0xd7, 0xdf, 0xdd, 0xe5, 0xe8, 0x03, 0x00, 0x00, } diff --git a/storage/storagebase/state.proto b/storage/storagebase/state.proto index 0111d8354fbc..997c88fa2b65 100644 --- a/storage/storagebase/state.proto +++ b/storage/storagebase/state.proto @@ -58,4 +58,5 @@ message RangeInfo { // The highest (and last) index in the Raft log. uint64 lastIndex = 2; uint64 num_pending = 3; + int64 raft_log_size = 4; } diff --git a/ui/next/app/js/protos.js b/ui/next/app/js/protos.js index 67bf882afdd4..4a589b182786 100644 --- a/ui/next/app/js/protos.js +++ b/ui/next/app/js/protos.js @@ -2568,6 +2568,12 @@ module.exports = require("protobufjs").newBuilder({})['import']({ "type": "uint64", "name": "num_pending", "id": 3 + }, + { + "rule": "optional", + "type": "int64", + "name": "raft_log_size", + "id": 4 } ] } diff --git a/ui/next/generated/protos.d.ts b/ui/next/generated/protos.d.ts index 80b58146ee1e..8410f0b0aa81 100644 --- a/ui/next/generated/protos.d.ts +++ b/ui/next/generated/protos.d.ts @@ -4672,6 +4672,15 @@ getNumPending?() : Long; +raft_log_size?: Long; + + +getRaftLogSize?() : Long; + setRaftLogSize?(raftLogSize : Long): void; + + + + } export interface RangeInfoMessage extends RangeInfo { diff --git a/ui/next/generated/protos.json b/ui/next/generated/protos.json index bff5d1140028..13a69d79f384 100644 --- a/ui/next/generated/protos.json +++ b/ui/next/generated/protos.json @@ -2567,6 +2567,12 @@ "type": "uint64", "name": "num_pending", "id": 3 + }, + { + "rule": "optional", + "type": "int64", + "name": "raft_log_size", + "id": 4 } ] }