From e73fd222502122913336fd478611676d1505f344 Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Thu, 14 Dec 2017 13:59:54 -0500 Subject: [PATCH] storage: implement a queue for suggested compactions Clear range commands now come with an attendant suggested range compaction hint. Any suggested compactions generated during command execution are now sent via replicated result data to each replica and stored in a store-local queue of pending compaction suggestions. A new compactor goroutine runs periodically to process pending suggestions. If more than an absolute number of bytes is reclaimable, or if the bytes to reclaim exceed a threshold fraction of the total used bytes, we'll go ahead and compact the suggested range. Suggested compactions are allowed to remain in the queue for at most 24 hours, after which if they haven't been aggregated into a compact-able key span, they'll be discarded, and left to RocksDB's background compaction processing. Release note (UX improvement): When tables are dropped, the space will be reclaimed in a more timely fashion. --- c-deps/libroach/db.cc | 163 ++--- c-deps/libroach/include/libroach.h | 5 + pkg/keys/constants.go | 10 + pkg/keys/keys.go | 50 ++ pkg/keys/keys_test.go | 45 ++ pkg/keys/printer.go | 18 +- pkg/keys/printer_test.go | 3 + pkg/storage/batcheval/cmd_clear_range.go | 41 +- pkg/storage/batcheval/result/result.go | 9 + pkg/storage/compactor/compactor.go | 424 +++++++++++++ pkg/storage/compactor/compactor_test.go | 578 ++++++++++++++++++ pkg/storage/compactor/metrics.go | 60 ++ pkg/storage/engine/engine.go | 11 + pkg/storage/engine/rocksdb.go | 134 ++++- pkg/storage/engine/rocksdb_test.go | 115 +++- pkg/storage/replica_proposal.go | 5 + pkg/storage/storagebase/proposer_kv.pb.go | 690 +++++++++++++++++++--- pkg/storage/storagebase/proposer_kv.proto | 26 + pkg/storage/store.go | 10 + 19 files changed, 2199 insertions(+), 198 deletions(-) create mode 100644 pkg/storage/compactor/compactor.go create mode 100644 pkg/storage/compactor/compactor_test.go create mode 100644 pkg/storage/compactor/metrics.go diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc index 19b9f3fe60f2..c0a4be4ee5df 100644 --- a/c-deps/libroach/db.cc +++ b/c-deps/libroach/db.cc @@ -1649,12 +1649,16 @@ DBStatus DBSyncWAL(DBEngine* db) { } DBStatus DBCompact(DBEngine* db) { + return DBCompactRange(db, DBKey(), DBKey()); +} + +DBStatus DBCompactRange(DBEngine* db, DBKey start, DBKey end) { rocksdb::CompactRangeOptions options; // By default, RocksDB doesn't recompact the bottom level (unless // there is a compaction filter, which we don't use). However, // recompacting the bottom layer is necessary to pick up changes to - // settings like bloom filter configurations (which is the biggest - // reason we currently have to use this function). + // settings like bloom filter configurations, and to fully reclaim + // space after dropping, truncating, or migrating tables. options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce; // Compacting the entire database in a single-shot can use a @@ -1662,88 +1666,105 @@ DBStatus DBCompact(DBEngine* db) { // we loop over the sstables in the lowest level and initiate // compactions on smaller ranges of keys. The resulting compacted // database is the same size, but the temporary disk space needed - // for the compaction is dramatically reduced + // for the compaction is dramatically reduced. + std::vector all_metadata; std::vector metadata; - db->rep->GetLiveFilesMetaData(&metadata); + db->rep->GetLiveFilesMetaData(&all_metadata); + + const std::string start_key(EncodeKey(start)); + const std::string end_key(EncodeKey(end)); int max_level = 0; + for (int i = 0; i < all_metadata.size(); i++) { + // Skip any SSTables which fall outside the specified range, if a + // range was specified. + if ((!start_key.empty() > 0 && all_metadata[i].largestkey < start_key) || + (!end_key.empty() && all_metadata[i].smallestkey >= end_key)) { + continue; + } + if (max_level < all_metadata[i].level) { + max_level = all_metadata[i].level; + } + // Gather the set of SSTables to compact. + metadata.push_back(all_metadata[i]); + } + all_metadata.clear(); + + if (max_level != db->rep->NumberLevels() - 1) { + // There are no sstables at the lowest level, so just compact the + // specified key span, wholesale. Due to the + // level_compaction_dynamic_level_bytes setting, this will only + // happen on spans containing very little data. + const rocksdb::Slice start_slice(start_key); + const rocksdb::Slice end_slice(end_key); + return ToDBStatus(db->rep->CompactRange(options, !start_key.empty() ? &start_slice : nullptr, + !end_key.empty() ? &end_slice : nullptr)); + } + + // A naive approach to selecting ranges to compact would be to + // compact the ranges specified by the smallest and largest key in + // each sstable of the bottom-most level. Unfortunately, the + // sstables in the bottom-most level have vastly different + // sizes. For example, starting with the following set of bottom-most + // sstables: + // + // 100M[16] 89M 70M 66M 56M 54M 38M[2] 36M 23M 20M 17M 8M 6M 5M 2M 2K[4] + // + // If we compact the entire database in one call we can end up with: + // + // 100M[22] 77M 76M 50M + // + // If we use the naive approach (compact the range specified by + // the smallest and largest keys): + // + // 100M[18] 92M 68M 62M 61M 50M 45M 39M 31M 29M[2] 24M 23M 18M 9M 8M[2] 7M + // 2K[4] + // + // With the approach below: + // + // 100M[19] 80M 68M[2] 62M 61M 53M 45M 36M 31M + // + // The approach below is to loop over the bottom-most sstables in + // sorted order and initiate a compact range every 128MB of data. + + // Gather up the bottom-most sstable metadata. + std::vector sst; for (int i = 0; i < metadata.size(); i++) { - if (max_level < metadata[i].level) { - max_level = metadata[i].level; + if (metadata[i].level != max_level) { + continue; } + sst.push_back(metadata[i]); } - - if (max_level == db->rep->NumberLevels() - 1) { - // A naive approach to selecting ranges to compact would be to - // compact the ranges specified by the smallest and largest key in - // each sstable of the bottom-most level. Unfortunately, the - // sstables in the bottom-most level have vastly different - // sizes. For example, starting with the following set of bottom-most - // sstables: - // - // 100M[16] 89M 70M 66M 56M 54M 38M[2] 36M 23M 20M 17M 8M 6M 5M 2M 2K[4] - // - // If we compact the entire database in one call we can end up with: - // - // 100M[22] 77M 76M 50M - // - // If we use the naive approach (compact the range specified by - // the smallest and largest keys): - // - // 100M[18] 92M 68M 62M 61M 50M 45M 39M 31M 29M[2] 24M 23M 18M 9M 8M[2] 7M - // 2K[4] - // - // With the approach below: - // - // 100M[19] 80M 68M[2] 62M 61M 53M 45M 36M 31M - // - // The approach below is to loop over the bottom-most sstables in - // sorted order and initiate a compact range every 128MB of data. - - // Gather up the bottom-most sstable metadata. - std::vector sst; - for (int i = 0; i < metadata.size(); i++) { - if (metadata[i].level != max_level) { - continue; - } - sst.push_back(metadata[i]); - } - // Sort the metadata by smallest key. - std::sort(sst.begin(), sst.end(), [](const rocksdb::SstFileMetaData& a, const rocksdb::SstFileMetaData& b) -> bool { + // Sort the metadata by smallest key. + std::sort(sst.begin(), sst.end(), [](const rocksdb::SstFileMetaData& a, const rocksdb::SstFileMetaData& b) -> bool { return a.smallestkey < b.smallestkey; }); - // Walk over the bottom-most sstables in order and perform - // compactions every 128MB. - rocksdb::Slice last; - rocksdb::Slice* last_ptr = nullptr; - uint64_t size = 0; - const uint64_t target_size = 128 << 20; - for (int i = 0; i < sst.size(); ++i) { - size += sst[i].size; - if (size < target_size) { - continue; - } - rocksdb::Slice cur(sst[i].largestkey); - rocksdb::Status status = db->rep->CompactRange(options, last_ptr, &cur); - if (!status.ok()) { - return ToDBStatus(status); - } - last = cur; - last_ptr = &last; - size = 0; + // Walk over the bottom-most sstables in order and perform + // compactions every 128MB. + rocksdb::Slice last; + rocksdb::Slice* last_ptr = nullptr; + uint64_t size = 0; + const uint64_t target_size = 128 << 20; + for (int i = 0; i < sst.size(); ++i) { + size += sst[i].size; + if (size < target_size) { + continue; } - - if (size > 0) { - return ToDBStatus(db->rep->CompactRange(options, last_ptr, nullptr)); + rocksdb::Slice cur(sst[i].largestkey); + rocksdb::Status status = db->rep->CompactRange(options, last_ptr, &cur); + if (!status.ok()) { + return ToDBStatus(status); } - return kSuccess; + last = cur; + last_ptr = &last; + size = 0; } - // There are no sstables at the lowest level, so just compact the - // entire database. Due to the level_compaction_dynamic_level_bytes - // setting, this will only happen on very small databases. - return ToDBStatus(db->rep->CompactRange(options, NULL, NULL)); + if (size > 0) { + return ToDBStatus(db->rep->CompactRange(options, last_ptr, nullptr)); + } + return kSuccess; } DBStatus DBApproximateDiskBytes(DBEngine* db, DBKey start, DBKey end, uint64_t* size) { diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h index ef10f1b2c313..e618518abd19 100644 --- a/c-deps/libroach/include/libroach.h +++ b/c-deps/libroach/include/libroach.h @@ -109,6 +109,11 @@ DBStatus DBSyncWAL(DBEngine* db); // Forces an immediate compaction over all keys. DBStatus DBCompact(DBEngine* db); +// Forces an immediate compaction over keys in the specified range. +// Note that if start is empty, it indicates the start of the database. +// If end is empty, it indicates the end of the database. +DBStatus DBCompactRange(DBEngine* db, DBKey start, DBKey end); + // Stores the approximate on-disk size of the given key range into the // supplied uint64. DBStatus DBApproximateDiskBytes(DBEngine* db, DBKey start, DBKey end, uint64_t *size); diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 6d7ee5559509..ffcd02639d8d 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -88,6 +88,16 @@ var ( // is to allow a restarting node to discover approximately how long it has // been down without needing to retrieve liveness records from the cluster. localStoreLastUpSuffix = []byte("uptm") + // localStoreSuggestedCompactionSuffix stores suggested compactions to + // be aggregated and processed on the store. + localStoreSuggestedCompactionSuffix = []byte("comp") + + // LocalStoreSuggestedCompactionsMin is the start of the span of + // possible suggested compaction keys for a store. + LocalStoreSuggestedCompactionsMin = MakeStoreKey(localStoreSuggestedCompactionSuffix, nil) + // LocalStoreSuggestedCompactionsMax is the end of the span of + // possible suggested compaction keys for a store. + LocalStoreSuggestedCompactionsMax = LocalStoreSuggestedCompactionsMin.PrefixEnd() // LocalRangeIDPrefix is the prefix identifying per-range data // indexed by Range ID. The Range ID is appended to this prefix, diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index 61aab86f7ff5..c273f08c50fb 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -39,6 +39,22 @@ func MakeStoreKey(suffix, detail roachpb.RKey) roachpb.Key { return key } +// DecodeStoreKey returns the suffix and detail portions of a local +// store key. +func DecodeStoreKey(key roachpb.Key) (suffix, detail roachpb.RKey, err error) { + if !bytes.HasPrefix(key, localStorePrefix) { + return nil, nil, errors.Errorf("key %s does not have %s prefix", key, localStorePrefix) + } + // Cut the prefix, the Range ID, and the infix specifier. + key = key[len(localStorePrefix):] + if len(key) < localSuffixLength { + return nil, nil, errors.Errorf("malformed key does not contain local store suffix") + } + suffix = roachpb.RKey(key[:localSuffixLength]) + detail = roachpb.RKey(key[localSuffixLength:]) + return suffix, detail, nil +} + // StoreIdentKey returns a store-local key for the store metadata. func StoreIdentKey() roachpb.Key { return MakeStoreKey(localStoreIdentSuffix, nil) @@ -59,6 +75,40 @@ func StoreLastUpKey() roachpb.Key { return MakeStoreKey(localStoreLastUpSuffix, nil) } +// StoreSuggestedCompactionKey returns a store-local key for a +// suggested compaction. It combines the specified start and end keys. +func StoreSuggestedCompactionKey(start, end roachpb.Key) roachpb.Key { + var detail roachpb.RKey + detail = encoding.EncodeBytesAscending(detail, start) + detail = encoding.EncodeBytesAscending(detail, end) + return MakeStoreKey(localStoreSuggestedCompactionSuffix, detail) +} + +// DecodeStoreSuggestedCompactionKey returns the start and end keys of +// the suggested compaction's span. +func DecodeStoreSuggestedCompactionKey(key roachpb.Key) (start, end roachpb.Key, err error) { + var suffix, detail roachpb.RKey + suffix, detail, err = DecodeStoreKey(key) + if err != nil { + return nil, nil, err + } + if !suffix.Equal(localStoreSuggestedCompactionSuffix) { + return nil, nil, errors.Errorf("key with suffix %q != %q", suffix, localStoreSuggestedCompactionSuffix) + } + detail, start, err = encoding.DecodeBytesAscending(detail, nil) + if err != nil { + return nil, nil, err + } + detail, end, err = encoding.DecodeBytesAscending(detail, nil) + if err != nil { + return nil, nil, err + } + if len(detail) != 0 { + return nil, nil, errors.Errorf("invalid key has trailing garbage: %q", detail) + } + return start, end, nil +} + // NodeLivenessKey returns the key for the node liveness record. func NodeLivenessKey(nodeID roachpb.NodeID) roachpb.Key { key := make(roachpb.Key, 0, len(NodeLivenessPrefix)+9) diff --git a/pkg/keys/keys_test.go b/pkg/keys/keys_test.go index 6dc8fe582178..c805df57a6bb 100644 --- a/pkg/keys/keys_test.go +++ b/pkg/keys/keys_test.go @@ -27,6 +27,51 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" ) +func TestStoreKeyEncodeDecode(t *testing.T) { + testCases := []struct { + key roachpb.Key + expSuffix roachpb.RKey + expDetail roachpb.RKey + }{ + {key: StoreIdentKey(), expSuffix: localStoreIdentSuffix, expDetail: nil}, + {key: StoreGossipKey(), expSuffix: localStoreGossipSuffix, expDetail: nil}, + {key: StoreClusterVersionKey(), expSuffix: localStoreClusterVersionSuffix, expDetail: nil}, + {key: StoreLastUpKey(), expSuffix: localStoreLastUpSuffix, expDetail: nil}, + { + key: StoreSuggestedCompactionKey(roachpb.Key("a"), roachpb.Key("z")), + expSuffix: localStoreSuggestedCompactionSuffix, + expDetail: encoding.EncodeBytesAscending(encoding.EncodeBytesAscending(nil, roachpb.Key("a")), roachpb.Key("z")), + }, + } + for _, test := range testCases { + t.Run("", func(t *testing.T) { + if suffix, detail, err := DecodeStoreKey(test.key); err != nil { + t.Error(err) + } else if !suffix.Equal(test.expSuffix) { + t.Errorf("expected %s; got %s", test.expSuffix, suffix) + } else if !detail.Equal(test.expDetail) { + t.Errorf("expected %s; got %s", test.expDetail, detail) + } + }) + } +} + +func TestStoreSuggestedCompactionKeyDecode(t *testing.T) { + origStart := roachpb.Key("a") + origEnd := roachpb.Key("z") + key := StoreSuggestedCompactionKey(origStart, origEnd) + start, end, err := DecodeStoreSuggestedCompactionKey(key) + if err != nil { + t.Fatal(err) + } + if !start.Equal(origStart) { + t.Errorf("expected %s == %s", start, origStart) + } + if !end.Equal(origEnd) { + t.Errorf("expected %s == %s", end, origEnd) + } +} + // TestLocalKeySorting is a sanity check to make sure that // the non-replicated part of a store sorts before the meta. func TestKeySorting(t *testing.T) { diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index 0106f600c6f0..d96bff6080ff 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -180,11 +180,25 @@ var constSubKeyDict = []struct { {"/storeIdent", localStoreIdentSuffix}, {"/gossipBootstrap", localStoreGossipSuffix}, {"/clusterVersion", localStoreClusterVersionSuffix}, + {"/suggestedCompaction", localStoreSuggestedCompactionSuffix}, +} + +func suggestedCompactionKeyPrint(key roachpb.Key) string { + start, end, err := DecodeStoreSuggestedCompactionKey(key) + if err != nil { + return fmt.Sprintf("", err) + } + return fmt.Sprintf("{%s-%s}", start, end) } func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string { for _, v := range constSubKeyDict { if bytes.HasPrefix(key, v.key) { + if v.key.Equal(localStoreSuggestedCompactionSuffix) { + return v.name + "/" + suggestedCompactionKeyPrint( + append(roachpb.Key(nil), append(localStorePrefix, key...)...), + ) + } return v.name } } @@ -195,7 +209,9 @@ func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string { func localStoreKeyParse(input string) (remainder string, output roachpb.Key) { for _, s := range constSubKeyDict { if strings.HasPrefix(input, s.name) { - remainder = input[len(s.name):] + if s.key.Equal(localStoreSuggestedCompactionSuffix) { + panic(&errUglifyUnsupported{errors.New("cannot parse suggested compaction key")}) + } output = MakeStoreKey(s.key, nil) return } diff --git a/pkg/keys/printer_test.go b/pkg/keys/printer_test.go index 8bef295863bc..6fbe044bf49e 100644 --- a/pkg/keys/printer_test.go +++ b/pkg/keys/printer_test.go @@ -47,6 +47,9 @@ func TestPrettyPrint(t *testing.T) { {StoreIdentKey(), "/Local/Store/storeIdent"}, {StoreGossipKey(), "/Local/Store/gossipBootstrap"}, {StoreClusterVersionKey(), "/Local/Store/clusterVersion"}, + {StoreSuggestedCompactionKey(MinKey, roachpb.Key("b")), `/Local/Store/suggestedCompaction/{/Min-"b"}`}, + {StoreSuggestedCompactionKey(roachpb.Key("a"), roachpb.Key("b")), `/Local/Store/suggestedCompaction/{"a"-"b"}`}, + {StoreSuggestedCompactionKey(roachpb.Key("a"), MaxKey), `/Local/Store/suggestedCompaction/{"a"-/Max}`}, {AbortSpanKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/AbortSpan/%q`, txnID)}, {RaftTombstoneKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTombstone"}, diff --git a/pkg/storage/batcheval/cmd_clear_range.go b/pkg/storage/batcheval/cmd_clear_range.go index 2051954a0a31..94fefd9383a3 100644 --- a/pkg/storage/batcheval/cmd_clear_range.go +++ b/pkg/storage/batcheval/cmd_clear_range.go @@ -52,9 +52,6 @@ func declareKeysClearRange( // negate them in the case of being able to clear the entire user- // space span of keys in the range. spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeStatsKey(header.RangeID)}) - // Add the GC threshold key, as this is updated as part of clear a - // range of data. - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)}) } // ClearRange wipes all MVCC versions of keys covered by the specified @@ -75,6 +72,7 @@ func ClearRange( args := cArgs.Args.(*roachpb.ClearRangeRequest) from := engine.MVCCKey{Key: args.Key} to := engine.MVCCKey{Key: args.EndKey} + var pd result.Result // Before clearing, compute the delta in MVCCStats. statsDelta, err := computeStatsDelta(ctx, batch, cArgs, from, to) @@ -83,21 +81,6 @@ func ClearRange( } cArgs.Stats.Subtract(statsDelta) - // Forward the range's GC threshold to the wall clock's now() in order - // to be newer than any previous write, and to disallow reads at earlier - // timestamps which will be invalid after deleting all existing keys - // in the span. - var pd result.Result - gcThreshold := cArgs.EvalCtx.GetGCThreshold() - gcThreshold.Forward(cArgs.EvalCtx.Clock().Now()) - pd.Replicated.State = &storagebase.ReplicaState{ - GCThreshold: &gcThreshold, - } - stateLoader := MakeStateLoader(cArgs.EvalCtx) - if err := stateLoader.SetGCThreshold(ctx, batch, cArgs.Stats, &gcThreshold); err != nil { - return result.Result{}, err - } - // If the total size of data to be cleared is less than // clearRangeBytesThreshold, clear the individual values manually, // instead of using a range tombstone (inefficient for small ranges). @@ -114,7 +97,18 @@ func ClearRange( return pd, nil } - // Otherwise, clear the key span using engine.ClearRange. + // Otherwise, suggest a compaction for the cleared range and clear + // the key span using engine.ClearRange. + pd.Replicated.SuggestedCompactions = []storagebase.SuggestedCompaction{ + { + StartKey: from.Key, + EndKey: to.Key, + Compaction: storagebase.Compaction{ + Bytes: statsDelta.Total(), + SuggestedAtNanos: cArgs.Header.Timestamp.WallTime, + }, + }, + } if err := batch.ClearRange(from, to); err != nil { return result.Result{}, err } @@ -124,10 +118,11 @@ func ClearRange( // computeStatsDelta determines the change in stats caused by the // ClearRange command. If the cleared span is the entire range, // computing MVCCStats is easy. We just negate all fields except sys -// bytes and count. Note that if a race build is enabled, we use -// the expectation of running in a CI environment to compute stats -// by iterating over the span to provide extra verification that the -// fast path of simply subtracting the non-system values is accurate. +// bytes and count. Note that if a race build is enabled, we use the +// expectation of running in a CI environment to compute stats by +// iterating over the span to provide extra verification that the fast +// path of simply subtracting the non-system values is accurate. +// Returns the delta stats. func computeStatsDelta( ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, from, to engine.MVCCKey, ) (enginepb.MVCCStats, error) { diff --git a/pkg/storage/batcheval/result/result.go b/pkg/storage/batcheval/result/result.go index 4ce31b6dfa8c..02ccc4cc25dd 100644 --- a/pkg/storage/batcheval/result/result.go +++ b/pkg/storage/batcheval/result/result.go @@ -246,6 +246,15 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Replicated.AddSSTable = nil + if q.Replicated.SuggestedCompactions != nil { + if p.Replicated.SuggestedCompactions == nil { + p.Replicated.SuggestedCompactions = q.Replicated.SuggestedCompactions + } else { + p.Replicated.SuggestedCompactions = append(p.Replicated.SuggestedCompactions, q.Replicated.SuggestedCompactions...) + } + } + q.Replicated.SuggestedCompactions = nil + if q.Local.IntentsAlways != nil { if p.Local.IntentsAlways == nil { p.Local.IntentsAlways = q.Local.IntentsAlways diff --git a/pkg/storage/compactor/compactor.go b/pkg/storage/compactor/compactor.go new file mode 100644 index 000000000000..d0de4f4f3c2a --- /dev/null +++ b/pkg/storage/compactor/compactor.go @@ -0,0 +1,424 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package compactor + +import ( + "fmt" + "time" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +const ( + // defaultCompactionMinInterval indicates the minimum period of + // time to wait before any compaction activity is considered, after + // suggestions are made. The intent is to allow sufficient time for + // all ranges to be cleared when a big table is dropped, so the + // compactor can determine contiguous stretches and efficient delete + // sstable files. + defaultCompactionMinInterval = 2 * time.Minute + + // defaultThresholdBytes is the threshold in bytes of suggested + // reclamation, after which the compactor will begin processing + // (taking compactor min interval into account). Note that we want + // to target roughly the target size of an L6 SSTable (128MB) but + // these are logical bytes (as in, from MVCCStats) which can't be + // translated into SSTable-bytes. As a result, we conservatively set + // a higher threshold. + defaultThresholdBytes = 256 << 20 // more than 256MiB will trigger + + // defaultThresholdBytesFraction is the fraction of total logical + // bytes used which are up for suggested reclamation, after which + // the compactor will begin processing (taking compactor min + // interval into account). Note that this threshold handles the case + // where a table is dropped which is a significant fraction of the + // total space in the database, but does not exceed the absolute + // defaultThresholdBytes threshold. + defaultThresholdBytesFraction = 0.10 // more than 10% of space will trigger + + // defaultMaxSuggestedCompactionRecordAge is the maximum age of a + // suggested compaction record. If not processed within this time + // interval since the compaction was suggested, it will be deleted. + defaultMaxSuggestedCompactionRecordAge = 24 * time.Hour +) + +// compactorOptions specify knobs to tune for compactor behavior. +// These are intended for testing. +type compactorOptions struct { + CompactionMinInterval time.Duration + ThresholdBytes int64 + ThresholdBytesFraction float64 + MaxSuggestedCompactionRecordAge time.Duration +} + +func defaultCompactorOptions() compactorOptions { + return compactorOptions{ + CompactionMinInterval: defaultCompactionMinInterval, + ThresholdBytes: defaultThresholdBytes, + ThresholdBytesFraction: defaultThresholdBytesFraction, + MaxSuggestedCompactionRecordAge: defaultMaxSuggestedCompactionRecordAge, + } +} + +type storeCapacityFunc func() (roachpb.StoreCapacity, error) + +// A Compactor records suggested compactions and periodically +// makes requests to the engine to reclaim storage space. +type Compactor struct { + eng engine.WithSSTables + capFn storeCapacityFunc + ch chan struct{} + opts compactorOptions + Metrics Metrics +} + +// NewCompactor returns a compactor for the specified storage engine. +func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor { + return &Compactor{ + eng: eng, + capFn: capFn, + ch: make(chan struct{}, 1), + opts: defaultCompactorOptions(), + Metrics: makeMetrics(), + } +} + +// Start launches a compaction processing goroutine and exits when the +// provided stopper indicates. Processing is done with a periodicity of +// compactionMinInterval, but only if there are compactions pending. +func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) { + if empty, err := c.isSpanEmpty( + ctx, keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax, + ); err != nil { + log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err) + } else if !empty { + log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval) + c.ch <- struct{}{} // wake up the goroutine immediately + } + + stopper.RunWorker(ctx, func(ctx context.Context) { + var timer timeutil.Timer + var timerSet bool + for { + select { + case <-c.ch: + // Set the wait timer if not already set. + if !timerSet { + timer.Reset(c.opts.CompactionMinInterval) + timerSet = true + } + + case <-timer.C: + timer.Read = true + timer.Stop() + spanCtx, cleanup := tracing.EnsureContext(ctx, tracer, "process suggested compactions") + ok, err := c.processSuggestions(spanCtx) + if err != nil { + log.Warningf(spanCtx, "failed processing suggested compactions: %s", err) + } + cleanup() + if ok { + // Everything has been processed. Wait for the next + // suggested compaction before resetting the timer. + timerSet = false + break + } + // Reset the timer to re-attempt processing after the minimum + // compaction interval. + timer.Reset(c.opts.CompactionMinInterval) + timerSet = true + + case <-stopper.ShouldStop(): + return + } + } + }) +} + +// aggregatedCompaction is a utility struct that holds information +// about aggregated suggested compactions. +type aggregatedCompaction struct { + storagebase.SuggestedCompaction + suggestions []storagebase.SuggestedCompaction + startIdx int + total int +} + +func initAggregatedCompaction( + startIdx, total int, sc storagebase.SuggestedCompaction, +) aggregatedCompaction { + return aggregatedCompaction{ + SuggestedCompaction: sc, + suggestions: []storagebase.SuggestedCompaction{sc}, + startIdx: startIdx, + total: total, + } +} + +func (aggr aggregatedCompaction) String() string { + var seqFmt string + if len(aggr.suggestions) == 1 { + seqFmt = fmt.Sprintf("#%d/%d", aggr.startIdx+1, aggr.total) + } else { + seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions)+1, aggr.total) + } + return fmt.Sprintf("%s (%s-%s) for %s", seqFmt, aggr.StartKey, aggr.EndKey, humanizeutil.IBytes(aggr.Bytes)) +} + +// processSuggestions considers all suggested compactions and +// processes contiguous or nearly contiguous aggregations if they +// exceed the absolute or fractional size thresholds. If suggested +// compactions don't meet thresholds, they're discarded if they're +// older than maxSuggestedCompactionRecordAge. Returns a boolean +// indicating whether the processing occurred. +func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) { + // Collect all suggestions. + var suggestions []storagebase.SuggestedCompaction + var totalBytes int64 + if err := c.eng.Iterate( + engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin}, + engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax}, + func(kv engine.MVCCKeyValue) (bool, error) { + var sc storagebase.SuggestedCompaction + var err error + sc.StartKey, sc.EndKey, err = keys.DecodeStoreSuggestedCompactionKey(kv.Key.Key) + if err != nil { + return false, errors.Wrapf(err, "failed to decode suggested compaction key") + } + if err := protoutil.Unmarshal(kv.Value, &sc.Compaction); err != nil { + return false, err + } + suggestions = append(suggestions, sc) + totalBytes += sc.Bytes + return false, nil // continue iteration + }, + ); err != nil { + return false, err + } + // Update at start of processing, and at end. Note that totalBytes + // is decremented for any compactions which are processed. + c.Metrics.BytesQueued.Update(totalBytes) + defer func() { + c.Metrics.BytesQueued.Update(totalBytes) + }() + + if len(suggestions) == 0 { + return false, nil + } + + log.Eventf(ctx, "considering %d suggested compaction(s)", len(suggestions)) + + // Determine whether to attempt a compaction to reclaim space during + // this processing. The decision is based on total bytes to free up + // and the time since the last processing. + capacity, err := c.capFn() + if err != nil { + return false, err + } + + // Get information about SSTables in the underlying RocksDB instance. + ssti := engine.NewSSTableInfosByLevel(c.eng.GetSSTables()) + + // Iterate through suggestions, merging them into a running + // aggregation. Aggregates which exceed size thresholds are + // compacted. Small, isolated suggestions will be ignored until + // becoming too old, at which point they are discarded without + // compaction. + delBatch := c.eng.NewWriteOnlyBatch() + defer func() { + if err := delBatch.Commit(true); err != nil { + log.Warningf(ctx, "unable to delete suggested compaction records: %s", err) + } + delBatch.Close() + }() + + aggr := initAggregatedCompaction(0, len(suggestions), suggestions[0]) + for i, sc := range suggestions[1:] { + // Aggregate current suggestion with running aggregate if possible. If + // the current suggestion cannot be merged with the aggregate, process + // it if it meets compaction thresholds. + if done := c.aggregateCompaction(ctx, ssti, &aggr, sc); done { + processedBytes, err := c.processCompaction(ctx, aggr, capacity, delBatch) + if err != nil { + log.Errorf(ctx, "failed processing suggested compactions %+v: %s", aggr, err) + } else { + totalBytes -= processedBytes + } + // Reset aggregation to the last, un-aggregated, suggested compaction. + aggr = initAggregatedCompaction(i, len(suggestions), sc) + } + } + // Process remaining aggregated compaction. + processedBytes, err := c.processCompaction(ctx, aggr, capacity, delBatch) + if err != nil { + return false, err + } + totalBytes -= processedBytes + + return true, nil +} + +// processCompaction sends CompactRange requests to the storage engine +// if the aggregated suggestion exceeds size threshold(s). Otherwise, +// it either skips the compaction or skips the compaction *and* deletes +// the suggested compaction records if they're too old. Returns the +// number of bytes processed (either compacted or skipped and deleted +// due to age). +func (c *Compactor) processCompaction( + ctx context.Context, + aggr aggregatedCompaction, + capacity roachpb.StoreCapacity, + delBatch engine.Batch, +) (int64, error) { + shouldProcess := aggr.Bytes >= c.opts.ThresholdBytes || + aggr.Bytes >= int64(float64(capacity.LogicalBytes)*c.opts.ThresholdBytesFraction) + + if shouldProcess { + startTime := timeutil.Now() + log.Eventf(ctx, "processing compaction %s", aggr) + if err := c.eng.CompactRange( + engine.MVCCKey{Key: aggr.StartKey}, + engine.MVCCKey{Key: aggr.EndKey}, + ); err != nil { + return 0, errors.Wrapf(err, "unable to compact range %+v", aggr) + } + c.Metrics.BytesCompacted.Inc(aggr.Bytes) + c.Metrics.Compactions.Inc(1) + duration := timeutil.Since(startTime) + c.Metrics.CompactingNanos.Inc(int64(duration)) + log.Eventf(ctx, "processed compaction %s in %s", aggr, duration) + } else { + log.VEventf(ctx, 2, "skipping compaction(s) %s", aggr) + } + + // Delete suggested compaction records if appropriate. + for _, sc := range aggr.suggestions { + age := timeutil.Since(timeutil.Unix(0, sc.SuggestedAtNanos)) + tooOld := age >= c.opts.MaxSuggestedCompactionRecordAge + // Delete unless we didn't process and the record isn't too old. + if !shouldProcess && !tooOld { + continue + } + if tooOld { + c.Metrics.BytesSkipped.Inc(aggr.Bytes) + } + key := keys.StoreSuggestedCompactionKey(sc.StartKey, sc.EndKey) + if err := delBatch.Clear(engine.MVCCKey{Key: key}); err != nil { + log.Fatal(ctx, err) // should never happen on a batch + } + } + return aggr.Bytes, nil +} + +// aggregateCompaction merges sc into aggr, to create a new suggested +// compaction, if the key spans are overlapping or near-contiguous. +// Note that because suggested compactions are stored sorted by their +// start key, sc.StartKey >= aggr.StartKey. Returns whether the +// compaction was aggregated. If false, the supplied aggregation is +// complete and should be processed. +func (c *Compactor) aggregateCompaction( + ctx context.Context, + ssti engine.SSTableInfosByLevel, + aggr *aggregatedCompaction, + sc storagebase.SuggestedCompaction, +) bool { + // If the key spans don't overlap, then check whether they're + // "nearly" contiguous. + if aggr.EndKey.Compare(sc.StartKey) < 0 { + // Aggregate if the gap between current aggregate and proposed + // compaction span overlaps (at most) two contiguous SSTables at + // the bottommost level. + span := roachpb.Span{Key: aggr.EndKey, EndKey: sc.StartKey} + maxLevel := ssti.MaxLevelSpanOverlapsContiguousSSTables(span) + if maxLevel < ssti.MaxLevel() { + return true // suggested compaction could not be aggregated + } + } + + // We can aggregate, so merge sc into aggr. + if aggr.EndKey.Compare(sc.EndKey) < 0 { + aggr.EndKey = sc.EndKey + } + aggr.Bytes += sc.Bytes + aggr.suggestions = append(aggr.suggestions, sc) + return false // aggregated successfully +} + +// isSpanEmpty returns whether the specified key span is empty (true) +// or contains keys (false). +func (c *Compactor) isSpanEmpty(ctx context.Context, start, end roachpb.Key) (bool, error) { + // If there are any suggested compactions, start the compaction timer. + var empty = true + if err := c.eng.Iterate( + engine.MVCCKey{Key: start}, + engine.MVCCKey{Key: end}, + func(_ engine.MVCCKeyValue) (bool, error) { + empty = false + return true, nil // don't continue iteration + }, + ); err != nil { + return false, err + } + return empty, nil +} + +// SuggestCompaction writes the specified compaction to persistent +// storage and pings the processing goroutine. +func (c *Compactor) SuggestCompaction(ctx context.Context, sc storagebase.SuggestedCompaction) { + log.VEventf(ctx, 2, "suggested compaction from %s - %s: %+v", sc.StartKey, sc.EndKey, sc.Compaction) + + // Check whether a suggested compaction already exists for this key span. + key := keys.StoreSuggestedCompactionKey(sc.StartKey, sc.EndKey) + var existing storagebase.Compaction + ok, _, _, err := c.eng.GetProto(engine.MVCCKey{Key: key}, &existing) + if err != nil { + log.ErrEventf(ctx, "unable to record suggested compaction: %s", err) + return + } + + // If there's already a suggested compaction, merge them. Note that + // this method is only called after clearing keys from the underlying + // storage engine. All such actions really do result in successively + // more bytes being made available for compaction, so there is no + // double-counting if the same range were cleared twice. + if ok { + sc.Bytes += existing.Bytes + } + + // Store the new compaction. + if _, _, err = engine.PutProto(c.eng, engine.MVCCKey{Key: key}, &sc.Compaction); err != nil { + log.Warningf(ctx, "unable to record suggested compaction: %s", err) + } + + // Poke the compactor goroutine to reconsider compaction in light of + // this new suggested compaction. + select { + case c.ch <- struct{}{}: + default: + } +} diff --git a/pkg/storage/compactor/compactor_test.go b/pkg/storage/compactor/compactor_test.go new file mode 100644 index 000000000000..9bc11f098121 --- /dev/null +++ b/pkg/storage/compactor/compactor_test.go @@ -0,0 +1,578 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package compactor + +import ( + "fmt" + "reflect" + "sort" + "testing" + "time" + + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +const testCompactionLatency = 1 * time.Millisecond + +type wrappedEngine struct { + *engine.RocksDB + mu struct { + syncutil.Mutex + compactions []roachpb.Span + } +} + +func newWrappedEngine() *wrappedEngine { + inMem := engine.NewInMem(roachpb.Attributes{}, 1<<20) + return &wrappedEngine{ + RocksDB: inMem.RocksDB, + } +} + +func (we *wrappedEngine) GetSSTables() engine.SSTableInfos { + key := func(s string) engine.MVCCKey { + return engine.MakeMVCCMetadataKey([]byte(s)) + } + ssti := engine.SSTableInfos{ + // Level 0. + {Level: 0, Size: 20, Start: key("a"), End: key("z")}, + {Level: 0, Size: 15, Start: key("a"), End: key("k")}, + // Level 2. + {Level: 2, Size: 200, Start: key("a"), End: key("j")}, + {Level: 2, Size: 100, Start: key("k"), End: key("o")}, + {Level: 2, Size: 100, Start: key("r"), End: key("t")}, + // Level 6. + {Level: 6, Size: 201, Start: key("a"), End: key("c")}, + {Level: 6, Size: 200, Start: key("d"), End: key("f")}, + {Level: 6, Size: 300, Start: key("h"), End: key("r")}, + {Level: 6, Size: 405, Start: key("s"), End: key("z")}, + } + sort.Sort(ssti) + return ssti +} + +func (we *wrappedEngine) CompactRange(start, end engine.MVCCKey) error { + we.mu.Lock() + defer we.mu.Unlock() + time.Sleep(testCompactionLatency) + we.mu.compactions = append(we.mu.compactions, roachpb.Span{Key: start.Key, EndKey: end.Key}) + return nil +} + +func (we *wrappedEngine) GetCompactions() []roachpb.Span { + we.mu.Lock() + defer we.mu.Unlock() + return append([]roachpb.Span(nil), we.mu.compactions...) +} + +func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, func()) { + stopper := stop.NewStopper() + eng := newWrappedEngine() + stopper.AddCloser(eng) + compactor := NewCompactor(eng, capFn) + compactor.Start(context.Background(), tracing.NewTracer(), stopper) + return compactor, eng, func() { + stopper.Stop(context.Background()) + } +} + +func key(s string) roachpb.Key { + return roachpb.Key([]byte(s)) +} + +// TestCompactorThresholds verifies the thresholding logic for the compactor. +func TestCompactorThresholds(t *testing.T) { + defer leaktest.AfterTest(t)() + + fractionThresh := defaultThresholdBytesFraction*defaultThresholdBytes + 1 + nowNanos := timeutil.Now().UnixNano() + testCases := []struct { + name string + suggestions []storagebase.SuggestedCompaction + logicalBytes int64 // logical byte count to return with store capacity + expBytesCompacted int64 + expCompactions []roachpb.Span + expUncompacted []roachpb.Span + }{ + // Single suggestion under all thresholds. + { + name: "single suggestion under all thresholds", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - 1, + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: 0, + expCompactions: nil, + expUncompacted: []roachpb.Span{ + {Key: key("a"), EndKey: key("b")}, + }, + }, + // Single suggestion which is over absolute bytes threshold should compact. + { + name: "single suggestion over absolute threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes, + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("b")}, + }, + }, + // Single suggestion over the fractional threshold. + { + name: "single suggestion over fractional threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: int64(fractionThresh), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes, + expBytesCompacted: int64(fractionThresh), + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("b")}, + }, + }, + // Double suggestion which in aggregate exceed absolute bytes threshold. + { + name: "double suggestion over absolute threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 2, + SuggestedAtNanos: nowNanos, + }, + }, + { + StartKey: key("b"), EndKey: key("c"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - (defaultThresholdBytes / 2), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("c")}, + }, + }, + // Double suggestion to same span. + { + name: "double suggestion to same span over absolute threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 2, + SuggestedAtNanos: nowNanos, + }, + }, + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - (defaultThresholdBytes / 2), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("b")}, + }, + }, + // Double suggestion overlapping. + { + name: "double suggestion overlapping over absolute threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("c"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 2, + SuggestedAtNanos: nowNanos, + }, + }, + { + StartKey: key("b"), EndKey: key("d"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - (defaultThresholdBytes / 2), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("d")}, + }, + }, + // Double suggestion which in aggregate exceeds fractional bytes threshold. + { + name: "double suggestion over fractional threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: int64(fractionThresh / 2), + SuggestedAtNanos: nowNanos, + }, + }, + { + StartKey: key("b"), EndKey: key("c"), + Compaction: storagebase.Compaction{ + Bytes: int64(fractionThresh) - int64(fractionThresh/2), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes, + expBytesCompacted: int64(fractionThresh), + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("c")}, + }, + }, + // Double suggestion without excessive gap. + { + name: "double suggestion without excessive gap", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 2, + SuggestedAtNanos: nowNanos, + }, + }, + // There are only two sstables between ("b", "e") at the max level. + { + StartKey: key("e"), EndKey: key("f"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - (defaultThresholdBytes / 2), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("f")}, + }, + }, + // Double suggestion with excessive gap. + { + name: "double suggestion with excessive gap", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 2, + SuggestedAtNanos: nowNanos, + }, + }, + // There are three sstables between ("b", "h0") at the max level. + { + StartKey: key("h0"), EndKey: key("i"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - (defaultThresholdBytes / 2), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: 0, + expCompactions: nil, + expUncompacted: []roachpb.Span{ + {Key: key("a"), EndKey: key("b")}, + {Key: key("h0"), EndKey: key("i")}, + }, + }, + // Double suggestion with excessive gap, but both over absolute threshold. + { + name: "double suggestion with excessive gap but both over threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes, + SuggestedAtNanos: nowNanos, + }, + }, + // There are three sstables between ("b", "h0") at the max level. + { + StartKey: key("h0"), EndKey: key("i"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes, + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes * 2, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("b")}, + {Key: key("h0"), EndKey: key("i")}, + }, + }, + // Double suggestion with excessive gap, with just one over absolute threshold. + { + name: "double suggestion with excessive gap but one over threshold", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes, + SuggestedAtNanos: nowNanos, + }, + }, + // There are three sstables between ("b", "h0") at the max level. + { + StartKey: key("h0"), EndKey: key("i"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - 1, + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("b")}, + }, + expUncompacted: []roachpb.Span{ + {Key: key("h0"), EndKey: key("i")}, + }, + }, + // Quadruple suggestion which can be aggregated into a single compaction. + { + name: "quadruple suggestion which aggregates", + suggestions: []storagebase.SuggestedCompaction{ + { + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 4, + SuggestedAtNanos: nowNanos, + }, + }, + { + StartKey: key("e"), EndKey: key("f0"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 4, + SuggestedAtNanos: nowNanos, + }, + }, + { + StartKey: key("g"), EndKey: key("q"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes / 4, + SuggestedAtNanos: nowNanos, + }, + }, + { + StartKey: key("y"), EndKey: key("zzz"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - 3*(defaultThresholdBytes/4), + SuggestedAtNanos: nowNanos, + }, + }, + }, + logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold + expBytesCompacted: defaultThresholdBytes, + expCompactions: []roachpb.Span{ + {Key: key("a"), EndKey: key("zzz")}, + }, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + capacityFn := func() (roachpb.StoreCapacity, error) { + return roachpb.StoreCapacity{ + LogicalBytes: test.logicalBytes, + }, nil + } + compactor, we, cleanup := testSetup(capacityFn) + defer cleanup() + // Shorten wait times for compactor processing. + compactor.opts.CompactionMinInterval = time.Millisecond + + for _, sc := range test.suggestions { + compactor.SuggestCompaction(context.Background(), sc) + } + + // If we expect no compaction, pause to ensure test will fail if + // a compaction happens. Note that 10ms is not enough to make + // this fail all the time, but it will surely trigger a failure + // most of the time. + if len(test.expCompactions) == 0 { + time.Sleep(10 * time.Millisecond) + } + testutils.SucceedsSoon(t, func() error { + comps := we.GetCompactions() + if !reflect.DeepEqual(test.expCompactions, comps) { + return fmt.Errorf("expected %+v; got %+v", test.expCompactions, comps) + } + if a, e := compactor.Metrics.BytesCompacted.Count(), test.expBytesCompacted; a != e { + return fmt.Errorf("expected bytes compacted %d; got %d", e, a) + } + if a, e := compactor.Metrics.Compactions.Count(), int64(len(test.expCompactions)); a != e { + return fmt.Errorf("expected compactions %d; got %d", e, a) + } + if len(test.expCompactions) == 0 { + if cn := compactor.Metrics.CompactingNanos.Count(); cn > 0 { + return fmt.Errorf("expected compaction time to be 0; got %d", cn) + } + } else { + expNanos := int64(len(test.expCompactions)) * int64(testCompactionLatency) + if a, e := compactor.Metrics.CompactingNanos.Count(), expNanos; a < e { + return fmt.Errorf("expected compacting nanos > %d; got %d", e, a) + } + } + // Read the remaining suggestions in the queue; verify compacted + // spans have been cleared and uncompacted spans remain. + var idx int + return we.Iterate( + engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin}, + engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax}, + func(kv engine.MVCCKeyValue) (bool, error) { + start, end, err := keys.DecodeStoreSuggestedCompactionKey(kv.Key.Key) + if err != nil { + t.Fatalf("failed to decode suggested compaction key: %s", err) + } + if idx >= len(test.expUncompacted) { + return true, fmt.Errorf("found unexpected uncompacted span %s-%s", start, end) + } + if !start.Equal(test.expUncompacted[idx].Key) || !end.Equal(test.expUncompacted[idx].EndKey) { + return true, fmt.Errorf("found unexpected uncompacted span %s-%s; expected %s-%s", + start, end, test.expUncompacted[idx].Key, test.expUncompacted[idx].EndKey) + } + idx++ + return false, nil // continue iteration + }, + ) + }) + }) + } +} + +// TestCompactorProcessingInitialization verifies that a compactor gets +// started with processing if the queue is non-empty. +func TestCompactorProcessingInitialization(t *testing.T) { + defer leaktest.AfterTest(t)() + + capacityFn := func() (roachpb.StoreCapacity, error) { + return roachpb.StoreCapacity{LogicalBytes: 100 * defaultThresholdBytes}, nil + } + compactor, we, cleanup := testSetup(capacityFn) + defer cleanup() + + // Add a suggested compaction -- this won't get processed by this + // compactor for two minutes. + compactor.opts.CompactionMinInterval = time.Hour + compactor.SuggestCompaction(context.Background(), storagebase.SuggestedCompaction{ + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes, + SuggestedAtNanos: timeutil.Now().UnixNano(), + }, + }) + + // Create a new fast compactor with a short wait time for processing, + // using the same engine so that it sees a non-empty queue. + stopper := stop.NewStopper() + fastCompactor := NewCompactor(we, capacityFn) + fastCompactor.opts.CompactionMinInterval = time.Millisecond + fastCompactor.Start(context.Background(), tracing.NewTracer(), stopper) + defer stopper.Stop(context.Background()) + + testutils.SucceedsSoon(t, func() error { + comps := we.GetCompactions() + expComps := []roachpb.Span{{Key: key("a"), EndKey: key("b")}} + if !reflect.DeepEqual(expComps, comps) { + return fmt.Errorf("expected %+v; got %+v", expComps, comps) + } + return nil + }) +} + +// TestCompactorCleansUpOldRecords verifies that records which exceed +// the maximum age are deleted if they cannot be compacted. +func TestCompactorCleansUpOldRecords(t *testing.T) { + defer leaktest.AfterTest(t)() + + capacityFn := func() (roachpb.StoreCapacity, error) { + return roachpb.StoreCapacity{LogicalBytes: 100 * defaultThresholdBytes}, nil + } + compactor, we, cleanup := testSetup(capacityFn) + compactor.opts.CompactionMinInterval = time.Millisecond + compactor.opts.MaxSuggestedCompactionRecordAge = 1 * time.Millisecond + defer cleanup() + + // Add a suggested compaction that won't get processed because it's + // not over any of the thresholds. + compactor.SuggestCompaction(context.Background(), storagebase.SuggestedCompaction{ + StartKey: key("a"), EndKey: key("b"), + Compaction: storagebase.Compaction{ + Bytes: defaultThresholdBytes - 1, + SuggestedAtNanos: timeutil.Now().UnixNano(), + }, + }) + + // Verify that the record is deleted without a compaction and that the + // bytes are recorded as having been skipped. + testutils.SucceedsSoon(t, func() error { + comps := we.GetCompactions() + if !reflect.DeepEqual([]roachpb.Span(nil), comps) { + return fmt.Errorf("expected nil compactions; got %+v", comps) + } + if a, e := compactor.Metrics.BytesSkipped.Count(), compactor.opts.ThresholdBytes-1; a != e { + return fmt.Errorf("expected skipped bytes %d; got %d", e, a) + } + // Verify compaction queue is empty. + if empty, err := compactor.isSpanEmpty( + context.Background(), keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax, + ); err != nil || !empty { + return fmt.Errorf("compaction queue not empty or err: %t, %v", empty, err) + } + return nil + }) +} diff --git a/pkg/storage/compactor/metrics.go b/pkg/storage/compactor/metrics.go new file mode 100644 index 000000000000..b50b55d028c5 --- /dev/null +++ b/pkg/storage/compactor/metrics.go @@ -0,0 +1,60 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package compactor + +import "github.com/cockroachdb/cockroach/pkg/util/metric" + +// Metrics holds all metrics relating to a Compactor. +type Metrics struct { + BytesQueued *metric.Gauge + BytesSkipped *metric.Counter + BytesCompacted *metric.Counter + Compactions *metric.Counter + CompactingNanos *metric.Counter +} + +// MetricStruct implements the metrics.Struct interface. +func (Metrics) MetricStruct() {} + +var _ metric.Struct = Metrics{} + +var ( + metaBytesQueued = metric.Metadata{ + Name: "compactor.suggestionbytes.queued", + Help: "Number of logical bytes in suggested compactions in the queue"} + metaBytesSkipped = metric.Metadata{ + Name: "compactor.suggestionbytes.skipped", + Help: "Number of logical bytes in suggested compactions which were not compacted"} + metaBytesCompacted = metric.Metadata{ + Name: "compactor.suggestionbytes.compacted", + Help: "Number of logical bytes compacted from suggested compactions"} + metaCompactions = metric.Metadata{ + Name: "compactor.compactions", + Help: "Number of compaction requests sent to the storage engine"} + metaCompactingNanos = metric.Metadata{ + Name: "compactor.compactingnanos", + Help: "Number of nanoseconds spent compacting ranges"} +) + +// makeMetrics returns a Metrics struct. +func makeMetrics() Metrics { + return Metrics{ + BytesQueued: metric.NewGauge(metaBytesQueued), + BytesSkipped: metric.NewCounter(metaBytesSkipped), + BytesCompacted: metric.NewCounter(metaBytesCompacted), + Compactions: metric.NewCounter(metaCompactions), + CompactingNanos: metric.NewCounter(metaCompactingNanos), + } +} diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go index ee8aa3d52128..a80b1c270b71 100644 --- a/pkg/storage/engine/engine.go +++ b/pkg/storage/engine/engine.go @@ -233,6 +233,17 @@ type Engine interface { IngestExternalFile(ctx context.Context, path string, move bool) error // ApproximateDiskBytes returns an approximation of the on-disk size for the given key span. ApproximateDiskBytes(from, to roachpb.Key) (uint64, error) + // CompactRange ensures that the specified range of key value pairs is + // optimized for space efficiency. + CompactRange(start, end MVCCKey) error +} + +// WithSSTables extends the Engine interface with a method to get info +// on all SSTables in use. +type WithSSTables interface { + Engine + // GetSSTables retrieves metadata about this engine's live sstables. + GetSSTables() SSTableInfos } // Batch is the interface for batch specific operations. diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 1934c7e6b497..200a8e398120 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -104,7 +104,7 @@ const ( MinimumMaxOpenFiles = 1700 ) -// SSTableInfo contains metadata about a single RocksDB sstable. This mirrors +// SSTableInfo contains metadata about a single sstable. Note this mirrors // the C.DBSSTable struct contents. type SSTableInfo struct { Level int @@ -261,6 +261,131 @@ func (s SSTableInfos) ReadAmplification() int { return readAmp } +// SSTableInfosByLevel maintains slices of SSTableInfo objects, one +// per level. The slice for each level contains the SSTableInfo +// objects for SSTables at that level, sorted by start key. +type SSTableInfosByLevel struct { + // Each level is a slice of SSTableInfos. + levels [][]SSTableInfo +} + +// NewSSTableInfosByLevel returns a new SSTableInfosByLevel object +// based on the supplied SSTableInfos slice. +func NewSSTableInfosByLevel(s SSTableInfos) SSTableInfosByLevel { + var result SSTableInfosByLevel + for _, t := range s { + for i := len(result.levels); i <= t.Level; i++ { + result.levels = append(result.levels, []SSTableInfo{}) + } + result.levels[t.Level] = append(result.levels[t.Level], t) + } + // Sort each level by start key. + for _, l := range result.levels { + sort.Slice(l, func(i, j int) bool { return l[i].Start.Less(l[j].Start) }) + } + return result +} + +// MaxLevel returns the maximum level for which there are SSTables. +func (s *SSTableInfosByLevel) MaxLevel() int { + return len(s.levels) - 1 +} + +// MaxLevelSpanOverlapsContiguousSSTables returns the maximum level at +// which the specified key span overlaps either none, one, or at most +// two contiguous SSTables. Level 0 is returned if no level qualifies. +// +// This is useful when considering when to merge two compactions. In +// this case, the method is called with the "gap" between the two +// spans to be compacted. When the result is that the gap span touches +// at most two SSTables at a high level, it suggests that merging the +// two compactions is a good idea (as the up to two SSTables touched +// by the gap span, due to containing endpoints of the existing +// compactions, would be rewritten anyway). +// +// As an example, consider the following sstables in a small database: +// +// Level 0. +// {Level: 0, Size: 20, Start: key("a"), End: key("z")}, +// {Level: 0, Size: 15, Start: key("a"), End: key("k")}, +// Level 2. +// {Level: 2, Size: 200, Start: key("a"), End: key("j")}, +// {Level: 2, Size: 100, Start: key("k"), End: key("o")}, +// {Level: 2, Size: 100, Start: key("r"), End: key("t")}, +// Level 6. +// {Level: 6, Size: 201, Start: key("a"), End: key("c")}, +// {Level: 6, Size: 200, Start: key("d"), End: key("f")}, +// {Level: 6, Size: 300, Start: key("h"), End: key("r")}, +// {Level: 6, Size: 405, Start: key("s"), End: key("z")}, +// +// - The span "a"-"c" overlaps only a single SSTable at the max level +// (L6). That's great, so we definitely want to compact that. +// - The span "s"-"t" overlaps zero SSTables at the max level (L6). +// Again, great! That means we're going to compact the 3rd L2 +// SSTable and maybe push that directly to L6. +func (s *SSTableInfosByLevel) MaxLevelSpanOverlapsContiguousSSTables(span roachpb.Span) int { + // Note overlapsMoreTHanTwo should not be called on level 0, where + // the SSTables are not guaranteed disjoint. + overlapsMoreThanTwo := func(tables []SSTableInfo) bool { + // Search to find the first sstable which might overlap the span. + i := sort.Search(len(tables), func(i int) bool { return span.Key.Compare(tables[i].End.Key) < 0 }) + // If no SSTable is overlapped, return false. + if i == -1 || i == len(tables) || span.EndKey.Compare(tables[i].Start.Key) < 0 { + return false + } + // Return true if the span is not subsumed by the combination of + // this sstable and the next. This logic is complicated and is + // covered in the unittest. There are three successive conditions + // which together ensure the span doesn't overlap > 2 SSTables. + // + // - If the first overlapped SSTable is the last. + // - If the span does not exceed the end of the next SSTable. + // - If the span does not overlap the start of the next next SSTable. + if i >= len(tables)-1 { + // First overlapped SSTable is the last (right-most) SSTable. + // Span: [c-----f) + // SSTs: [a---d) + // or + // SSTs: [a-----------q) + return false + } + if span.EndKey.Compare(tables[i+1].End.Key) <= 0 { + // Span does not reach outside of this SSTable's right neighbor. + // Span: [c------f) + // SSTs: [a---d) [e-f) ... + return false + } + if i >= len(tables)-2 { + // Span reaches outside of this SSTable's right neighbor, but + // there are no more SSTables to the right. + // Span: [c-------------x) + // SSTs: [a---d) [e---q) + return false + } + if span.EndKey.Compare(tables[i+2].Start.Key) <= 0 { + // There's another SSTable two to the right, but the span doesn't + // reach into it. + // Span: [c------------x) + // SSTs: [a---d) [e---q) [x--z) ... + return false + } + + // Touching at least three SSTables. + // Span: [c-------------y) + // SSTs: [a---d) [e---q) [x--z) ... + return true + } + // Note that we never consider level 0, where SSTables can overlap. + // Level 0 is instead returned as a catch-all which means that there + // is no level where the span overlaps only two or fewer SSTables. + for i := len(s.levels) - 1; i > 0; i-- { + if !overlapsMoreThanTwo(s.levels[i]) { + return i + } + } + return 0 +} + // RocksDBCache is a wrapper around C.DBCache type RocksDBCache struct { cache *C.DBCache @@ -718,11 +843,16 @@ func (r *RocksDB) Capacity() (roachpb.StoreCapacity, error) { }, nil } -// Compact forces compaction on the database. +// Compact forces compaction over the entire database. func (r *RocksDB) Compact() error { return statusToError(C.DBCompact(r.rdb)) } +// CompactRange forces compaction over a specified range of keys in the database. +func (r *RocksDB) CompactRange(start, end MVCCKey) error { + return statusToError(C.DBCompactRange(r.rdb, goToCKey(start), goToCKey(end))) +} + // ApproximateDiskBytes returns the approximate on-disk size of the specified key range. func (r *RocksDB) ApproximateDiskBytes(from, to roachpb.Key) (uint64, error) { start := MVCCKey{Key: from} diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go index 389a5d1107af..a832ae5f1478 100644 --- a/pkg/storage/engine/rocksdb_test.go +++ b/pkg/storage/engine/rocksdb_test.go @@ -745,6 +745,10 @@ func TestRocksDBTimeBound(t *testing.T) { } } +func key(s string) MVCCKey { + return MakeMVCCMetadataKey([]byte(s)) +} + // Regression test for https://github.com/facebook/rocksdb/issues/2752. Range // deletion tombstones between different snapshot stripes are not stored in // order, so the first tombstone of each snapshot stripe should be checked as a @@ -766,9 +770,6 @@ func TestRocksDBDeleteRangeBug(t *testing.T) { } defer db.Close() - key := func(s string) MVCCKey { - return MakeMVCCMetadataKey([]byte(s)) - } if err := db.Put(key("a"), []byte("a")); err != nil { t.Fatal(err) } @@ -805,3 +806,111 @@ func TestRocksDBDeleteRangeBug(t *testing.T) { } iter.Close() } + +func createTestSSTableInfos() SSTableInfos { + ssti := SSTableInfos{ + // Level 0. + {Level: 0, Size: 20, Start: key("a"), End: key("z")}, + {Level: 0, Size: 15, Start: key("a"), End: key("k")}, + // Level 1. + {Level: 1, Size: 200, Start: key("a"), End: key("j")}, + {Level: 1, Size: 100, Start: key("k"), End: key("o")}, + {Level: 1, Size: 100, Start: key("r"), End: key("t")}, + // Level 2. + {Level: 2, Size: 201, Start: key("a"), End: key("c")}, + {Level: 2, Size: 200, Start: key("d"), End: key("f")}, + {Level: 2, Size: 300, Start: key("h"), End: key("r")}, + {Level: 2, Size: 405, Start: key("s"), End: key("z")}, + // Level 3. + {Level: 3, Size: 667, Start: key("a"), End: key("c")}, + {Level: 3, Size: 230, Start: key("d"), End: key("f")}, + {Level: 3, Size: 332, Start: key("h"), End: key("i")}, + {Level: 3, Size: 923, Start: key("k"), End: key("n")}, + {Level: 3, Size: 143, Start: key("n"), End: key("o")}, + {Level: 3, Size: 621, Start: key("p"), End: key("s")}, + {Level: 3, Size: 411, Start: key("u"), End: key("x")}, + // Level 4. + {Level: 4, Size: 215, Start: key("a"), End: key("b")}, + {Level: 4, Size: 211, Start: key("b"), End: key("d")}, + {Level: 4, Size: 632, Start: key("e"), End: key("f")}, + {Level: 4, Size: 813, Start: key("f"), End: key("h")}, + {Level: 4, Size: 346, Start: key("h"), End: key("j")}, + {Level: 4, Size: 621, Start: key("j"), End: key("l")}, + {Level: 4, Size: 681, Start: key("m"), End: key("o")}, + {Level: 4, Size: 521, Start: key("o"), End: key("r")}, + {Level: 4, Size: 135, Start: key("r"), End: key("t")}, + {Level: 4, Size: 622, Start: key("t"), End: key("v")}, + {Level: 4, Size: 672, Start: key("x"), End: key("z")}, + } + sort.Sort(ssti) + return ssti +} + +func TestSSTableInfosByLevel(t *testing.T) { + defer leaktest.AfterTest(t)() + ssti := NewSSTableInfosByLevel(createTestSSTableInfos()) + + // First, verify that each level is sorted by start key, not size. + for level, l := range ssti.levels { + if level == 0 { + continue + } + lastInfo := l[0] + for _, info := range l[1:] { + if !lastInfo.Start.Less(info.Start) { + t.Errorf("sort failed (%s >= %s) for level %d", lastInfo.Start, info.Start, level) + } + } + } + if a, e := ssti.MaxLevel(), 4; a != e { + t.Errorf("expected MaxLevel() == %d; got %d", e, a) + } + + // Next, verify various contiguous overlap scenarios. + testCases := []struct { + span roachpb.Span + expMaxLevel int + }{ + // The full a-z span overlaps more than two SSTables at all levels L1-L4 + {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}, expMaxLevel: 0}, + // The a-j span overlaps the first three SSTables in L2, so max level is L1. + {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("j")}, expMaxLevel: 1}, + // The k-o span overlaps only two adjacent L4 SSTs: j-l & m-o. + {span: roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("o")}, expMaxLevel: 4}, + // The K0-o0 span hits three SSTs in L4: j-l, m-o, & o-r. + {span: roachpb.Span{Key: roachpb.Key("k0"), EndKey: roachpb.Key("o0")}, expMaxLevel: 3}, + // The k-z span overlaps the last 4 SSTs in L3. + {span: roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("z")}, expMaxLevel: 2}, + // The c-c0 span overlaps only the second L4 SST. + {span: roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("c0")}, expMaxLevel: 4}, + // The a-f span full overlaps the first three L4 SSTs. + {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("f")}, expMaxLevel: 3}, + // The a-d0 span only overlaps the first two L4 SSTs. + {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d0")}, expMaxLevel: 4}, + // The a-e span only overlaps the first two L4 SSTs. It only is adjacent to the 3rd. + {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("e")}, expMaxLevel: 4}, + // The a-d span overlaps fully the first two L4 SSTs. + {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, expMaxLevel: 4}, + // The a-a0 span overlaps only the first L4 SST. + {span: roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("a0")}, expMaxLevel: 4}, + // The 0-1 span doesn't overlap any L4 SSTs. + {span: roachpb.Span{Key: roachpb.Key("0"), EndKey: roachpb.Key("1")}, expMaxLevel: 4}, + // The Z-a span doesn't overlap any L4 SSTs, just touches the start of the first. + {span: roachpb.Span{Key: roachpb.Key("Z"), EndKey: roachpb.Key("a")}, expMaxLevel: 4}, + // The Z-a0 span overlaps only the first L4 SST. + {span: roachpb.Span{Key: roachpb.Key("Z"), EndKey: roachpb.Key("a0")}, expMaxLevel: 4}, + // The z-z0 span doesn't overlap any L4 SSTs, just touches the end of the last. + {span: roachpb.Span{Key: roachpb.Key("z"), EndKey: roachpb.Key("z0")}, expMaxLevel: 4}, + // The y-z0 span overlaps the last L4 SST. + {span: roachpb.Span{Key: roachpb.Key("y"), EndKey: roachpb.Key("z0")}, expMaxLevel: 4}, + } + + for _, test := range testCases { + t.Run(fmt.Sprintf("%s-%s", test.span.Key, test.span.EndKey), func(t *testing.T) { + maxLevel := ssti.MaxLevelSpanOverlapsContiguousSSTables(test.span) + if test.expMaxLevel != maxLevel { + t.Errorf("expected max level %d; got %d", test.expMaxLevel, maxLevel) + } + }) + } +} diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 07e66c23ac52..0a573791d854 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -633,6 +633,11 @@ func (r *Replica) handleReplicatedEvalResult( } } + for _, sc := range rResult.SuggestedCompactions { + r.store.compactor.SuggestCompaction(ctx, sc) + } + rResult.SuggestedCompactions = nil + if !rResult.Equal(storagebase.ReplicatedEvalResult{}) { log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagebase.ReplicatedEvalResult{})) } diff --git a/pkg/storage/storagebase/proposer_kv.pb.go b/pkg/storage/storagebase/proposer_kv.pb.go index 2b9d63aed1b4..f214dd3caa60 100644 --- a/pkg/storage/storagebase/proposer_kv.pb.go +++ b/pkg/storage/storagebase/proposer_kv.pb.go @@ -12,6 +12,8 @@ Split Merge ChangeReplicas + Compaction + SuggestedCompaction ReplicatedEvalResult WriteBatch RaftCommand @@ -87,6 +89,32 @@ func (m *ChangeReplicas) Reset() { *m = ChangeReplicas{} } func (*ChangeReplicas) ProtoMessage() {} func (*ChangeReplicas) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{2} } +// Compaction holds core details about a suggested compaction. +type Compaction struct { + // bytes indicates the expected space reclamation from compaction. + Bytes int64 `protobuf:"varint,1,opt,name=bytes,proto3" json:"bytes,omitempty"` + // suggested_at is nanoseconds since the epoch. + SuggestedAtNanos int64 `protobuf:"varint,2,opt,name=suggested_at_nanos,json=suggestedAtNanos,proto3" json:"suggested_at_nanos,omitempty"` +} + +func (m *Compaction) Reset() { *m = Compaction{} } +func (m *Compaction) String() string { return proto.CompactTextString(m) } +func (*Compaction) ProtoMessage() {} +func (*Compaction) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{3} } + +// SuggestedCompaction holds start and end keys in conjunction with +// the compaction details. +type SuggestedCompaction struct { + StartKey github_com_cockroachdb_cockroach_pkg_roachpb.Key `protobuf:"bytes,1,opt,name=start_key,json=startKey,proto3,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.Key" json:"start_key,omitempty"` + EndKey github_com_cockroachdb_cockroach_pkg_roachpb.Key `protobuf:"bytes,2,opt,name=end_key,json=endKey,proto3,casttype=github.com/cockroachdb/cockroach/pkg/roachpb.Key" json:"end_key,omitempty"` + Compaction `protobuf:"bytes,3,opt,name=compaction,embedded=compaction" json:"compaction"` +} + +func (m *SuggestedCompaction) Reset() { *m = SuggestedCompaction{} } +func (m *SuggestedCompaction) String() string { return proto.CompactTextString(m) } +func (*SuggestedCompaction) ProtoMessage() {} +func (*SuggestedCompaction) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{4} } + // ReplicatedEvalResult is the structured information which together with // a RocksDB WriteBatch constitutes the proposal payload in proposer-evaluated // KV. For the majority of proposals, we expect ReplicatedEvalResult to be @@ -119,12 +147,16 @@ type ReplicatedEvalResult struct { ChangeReplicas *ChangeReplicas `protobuf:"bytes,12,opt,name=change_replicas,json=changeReplicas" json:"change_replicas,omitempty"` RaftLogDelta int64 `protobuf:"varint,13,opt,name=raft_log_delta,json=raftLogDelta,proto3" json:"raft_log_delta,omitempty"` AddSSTable *ReplicatedEvalResult_AddSSTable `protobuf:"bytes,17,opt,name=add_sstable,json=addSstable" json:"add_sstable,omitempty"` + // suggested_compactions are sent to the engine's compactor to + // reclaim storage space after garbage collection or cleared / + // rebalanced ranges. + SuggestedCompactions []SuggestedCompaction `protobuf:"bytes,19,rep,name=suggested_compactions,json=suggestedCompactions" json:"suggested_compactions"` } func (m *ReplicatedEvalResult) Reset() { *m = ReplicatedEvalResult{} } func (m *ReplicatedEvalResult) String() string { return proto.CompactTextString(m) } func (*ReplicatedEvalResult) ProtoMessage() {} -func (*ReplicatedEvalResult) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{3} } +func (*ReplicatedEvalResult) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{5} } // AddSSTable is a side effect that must execute before the Raft application // is committed. It must be idempotent to account for an ill-timed crash after @@ -145,7 +177,7 @@ func (m *ReplicatedEvalResult_AddSSTable) Reset() { *m = ReplicatedEvalR func (m *ReplicatedEvalResult_AddSSTable) String() string { return proto.CompactTextString(m) } func (*ReplicatedEvalResult_AddSSTable) ProtoMessage() {} func (*ReplicatedEvalResult_AddSSTable) Descriptor() ([]byte, []int) { - return fileDescriptorProposerKv, []int{3, 0} + return fileDescriptorProposerKv, []int{5, 0} } // WriteBatch is the serialized representation of a RocksDB write @@ -160,7 +192,7 @@ type WriteBatch struct { func (m *WriteBatch) Reset() { *m = WriteBatch{} } func (m *WriteBatch) String() string { return proto.CompactTextString(m) } func (*WriteBatch) ProtoMessage() {} -func (*WriteBatch) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{4} } +func (*WriteBatch) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{6} } // RaftCommand is the message written to the raft log. It contains // some metadata about the proposal itself, then either a BatchRequest @@ -229,12 +261,14 @@ type RaftCommand struct { func (m *RaftCommand) Reset() { *m = RaftCommand{} } func (m *RaftCommand) String() string { return proto.CompactTextString(m) } func (*RaftCommand) ProtoMessage() {} -func (*RaftCommand) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{5} } +func (*RaftCommand) Descriptor() ([]byte, []int) { return fileDescriptorProposerKv, []int{7} } func init() { proto.RegisterType((*Split)(nil), "cockroach.storage.storagebase.Split") proto.RegisterType((*Merge)(nil), "cockroach.storage.storagebase.Merge") proto.RegisterType((*ChangeReplicas)(nil), "cockroach.storage.storagebase.ChangeReplicas") + proto.RegisterType((*Compaction)(nil), "cockroach.storage.storagebase.Compaction") + proto.RegisterType((*SuggestedCompaction)(nil), "cockroach.storage.storagebase.SuggestedCompaction") proto.RegisterType((*ReplicatedEvalResult)(nil), "cockroach.storage.storagebase.ReplicatedEvalResult") proto.RegisterType((*ReplicatedEvalResult_AddSSTable)(nil), "cockroach.storage.storagebase.ReplicatedEvalResult.AddSSTable") proto.RegisterType((*WriteBatch)(nil), "cockroach.storage.storagebase.WriteBatch") @@ -333,6 +367,75 @@ func (this *ChangeReplicas) Equal(that interface{}) bool { } return true } +func (this *Compaction) Equal(that interface{}) bool { + if that == nil { + if this == nil { + return true + } + return false + } + + that1, ok := that.(*Compaction) + if !ok { + that2, ok := that.(Compaction) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + if this == nil { + return true + } + return false + } else if this == nil { + return false + } + if this.Bytes != that1.Bytes { + return false + } + if this.SuggestedAtNanos != that1.SuggestedAtNanos { + return false + } + return true +} +func (this *SuggestedCompaction) Equal(that interface{}) bool { + if that == nil { + if this == nil { + return true + } + return false + } + + that1, ok := that.(*SuggestedCompaction) + if !ok { + that2, ok := that.(SuggestedCompaction) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + if this == nil { + return true + } + return false + } else if this == nil { + return false + } + if !bytes.Equal(this.StartKey, that1.StartKey) { + return false + } + if !bytes.Equal(this.EndKey, that1.EndKey) { + return false + } + if !this.Compaction.Equal(&that1.Compaction) { + return false + } + return true +} func (this *ReplicatedEvalResult) Equal(that interface{}) bool { if that == nil { if this == nil { @@ -403,6 +506,14 @@ func (this *ReplicatedEvalResult) Equal(that interface{}) bool { if !this.AddSSTable.Equal(that1.AddSSTable) { return false } + if len(this.SuggestedCompactions) != len(that1.SuggestedCompactions) { + return false + } + for i := range this.SuggestedCompactions { + if !this.SuggestedCompactions[i].Equal(&that1.SuggestedCompactions[i]) { + return false + } + } return true } func (this *ReplicatedEvalResult_AddSSTable) Equal(that interface{}) bool { @@ -524,6 +635,72 @@ func (m *ChangeReplicas) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *Compaction) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Compaction) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Bytes != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.Bytes)) + } + if m.SuggestedAtNanos != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.SuggestedAtNanos)) + } + return i, nil +} + +func (m *SuggestedCompaction) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SuggestedCompaction) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.StartKey) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(len(m.StartKey))) + i += copy(dAtA[i:], m.StartKey) + } + if len(m.EndKey) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(len(m.EndKey))) + i += copy(dAtA[i:], m.EndKey) + } + dAtA[i] = 0x1a + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(m.Compaction.Size())) + n5, err := m.Compaction.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n5 + return i, nil +} + func (m *ReplicatedEvalResult) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -553,41 +730,41 @@ func (m *ReplicatedEvalResult) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.State.Size())) - n5, err := m.State.MarshalTo(dAtA[i:]) + n6, err := m.State.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n5 + i += n6 } if m.Split != nil { dAtA[i] = 0x1a i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.Split.Size())) - n6, err := m.Split.MarshalTo(dAtA[i:]) + n7, err := m.Split.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n7 } if m.Merge != nil { dAtA[i] = 0x22 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.Merge.Size())) - n7, err := m.Merge.MarshalTo(dAtA[i:]) + n8, err := m.Merge.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n7 + i += n8 } if m.ComputeChecksum != nil { dAtA[i] = 0x2a i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.ComputeChecksum.Size())) - n8, err := m.ComputeChecksum.MarshalTo(dAtA[i:]) + n9, err := m.ComputeChecksum.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n8 + i += n9 } if m.IsLeaseRequest { dAtA[i] = 0x30 @@ -602,11 +779,11 @@ func (m *ReplicatedEvalResult) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x42 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.Timestamp.Size())) - n9, err := m.Timestamp.MarshalTo(dAtA[i:]) + n10, err := m.Timestamp.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n9 + i += n10 if m.IsConsistencyRelated { dAtA[i] = 0x48 i++ @@ -621,21 +798,21 @@ func (m *ReplicatedEvalResult) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x52 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.DeprecatedDelta.Size())) - n10, err := m.DeprecatedDelta.MarshalTo(dAtA[i:]) + n11, err := m.DeprecatedDelta.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n10 + i += n11 } if m.ChangeReplicas != nil { dAtA[i] = 0x62 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.ChangeReplicas.Size())) - n11, err := m.ChangeReplicas.MarshalTo(dAtA[i:]) + n12, err := m.ChangeReplicas.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n11 + i += n12 } if m.RaftLogDelta != 0 { dAtA[i] = 0x68 @@ -660,22 +837,36 @@ func (m *ReplicatedEvalResult) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x1 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.AddSSTable.Size())) - n12, err := m.AddSSTable.MarshalTo(dAtA[i:]) + n13, err := m.AddSSTable.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n12 + i += n13 } dAtA[i] = 0x92 i++ dAtA[i] = 0x1 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.Delta.Size())) - n13, err := m.Delta.MarshalTo(dAtA[i:]) + n14, err := m.Delta.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n13 + i += n14 + if len(m.SuggestedCompactions) > 0 { + for _, msg := range m.SuggestedCompactions { + dAtA[i] = 0x9a + i++ + dAtA[i] = 0x1 + i++ + i = encodeVarintProposerKv(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } return i, nil } @@ -750,20 +941,20 @@ func (m *RaftCommand) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.ProposerReplica.Size())) - n14, err := m.ProposerReplica.MarshalTo(dAtA[i:]) + n15, err := m.ProposerReplica.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n14 + i += n15 if m.TestingBatchRequest != nil { dAtA[i] = 0x1a i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.TestingBatchRequest.Size())) - n15, err := m.TestingBatchRequest.MarshalTo(dAtA[i:]) + n16, err := m.TestingBatchRequest.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n15 + i += n16 } if m.MaxLeaseIndex != 0 { dAtA[i] = 0x20 @@ -773,28 +964,28 @@ func (m *RaftCommand) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x2a i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.ProposerLease.Size())) - n16, err := m.ProposerLease.MarshalTo(dAtA[i:]) + n17, err := m.ProposerLease.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n16 + i += n17 dAtA[i] = 0x6a i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.ReplicatedEvalResult.Size())) - n17, err := m.ReplicatedEvalResult.MarshalTo(dAtA[i:]) + n18, err := m.ReplicatedEvalResult.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n17 + i += n18 if m.WriteBatch != nil { dAtA[i] = 0x72 i++ i = encodeVarintProposerKv(dAtA, i, uint64(m.WriteBatch.Size())) - n18, err := m.WriteBatch.MarshalTo(dAtA[i:]) + n19, err := m.WriteBatch.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n18 + i += n19 } return i, nil } @@ -834,6 +1025,34 @@ func (m *ChangeReplicas) Size() (n int) { return n } +func (m *Compaction) Size() (n int) { + var l int + _ = l + if m.Bytes != 0 { + n += 1 + sovProposerKv(uint64(m.Bytes)) + } + if m.SuggestedAtNanos != 0 { + n += 1 + sovProposerKv(uint64(m.SuggestedAtNanos)) + } + return n +} + +func (m *SuggestedCompaction) Size() (n int) { + var l int + _ = l + l = len(m.StartKey) + if l > 0 { + n += 1 + l + sovProposerKv(uint64(l)) + } + l = len(m.EndKey) + if l > 0 { + n += 1 + l + sovProposerKv(uint64(l)) + } + l = m.Compaction.Size() + n += 1 + l + sovProposerKv(uint64(l)) + return n +} + func (m *ReplicatedEvalResult) Size() (n int) { var l int _ = l @@ -889,6 +1108,12 @@ func (m *ReplicatedEvalResult) Size() (n int) { } l = m.Delta.Size() n += 2 + l + sovProposerKv(uint64(l)) + if len(m.SuggestedCompactions) > 0 { + for _, e := range m.SuggestedCompactions { + l = e.Size() + n += 2 + l + sovProposerKv(uint64(l)) + } + } return n } @@ -1221,6 +1446,236 @@ func (m *ChangeReplicas) Unmarshal(dAtA []byte) error { } return nil } +func (m *Compaction) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Compaction: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Compaction: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Bytes", wireType) + } + m.Bytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Bytes |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SuggestedAtNanos", wireType) + } + m.SuggestedAtNanos = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SuggestedAtNanos |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipProposerKv(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProposerKv + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SuggestedCompaction) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SuggestedCompaction: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SuggestedCompaction: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StartKey", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthProposerKv + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StartKey = append(m.StartKey[:0], dAtA[iNdEx:postIndex]...) + if m.StartKey == nil { + m.StartKey = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EndKey", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthProposerKv + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.EndKey = append(m.EndKey[:0], dAtA[iNdEx:postIndex]...) + if m.EndKey == nil { + m.EndKey = []byte{} + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Compaction", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProposerKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Compaction.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipProposerKv(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthProposerKv + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *ReplicatedEvalResult) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1682,6 +2137,37 @@ func (m *ReplicatedEvalResult) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 19: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SuggestedCompactions", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProposerKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthProposerKv + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.SuggestedCompactions = append(m.SuggestedCompactions, SuggestedCompaction{}) + if err := m.SuggestedCompactions[len(m.SuggestedCompactions)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipProposerKv(dAtA[iNdEx:]) @@ -2217,70 +2703,78 @@ var ( func init() { proto.RegisterFile("storage/storagebase/proposer_kv.proto", fileDescriptorProposerKv) } var fileDescriptorProposerKv = []byte{ - // 1036 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4d, 0x6f, 0x1b, 0x45, - 0x18, 0xce, 0x36, 0x76, 0xb2, 0x1e, 0x27, 0xb6, 0x33, 0x84, 0x68, 0x15, 0xa9, 0xde, 0x28, 0xa4, - 0x28, 0x08, 0x58, 0x8b, 0x04, 0x2e, 0x3d, 0x20, 0xc5, 0x6e, 0x05, 0xa4, 0x69, 0x90, 0xc6, 0xa1, - 0x95, 0xe0, 0xb0, 0x1a, 0xcf, 0x4e, 0xd7, 0x2b, 0xef, 0x17, 0x33, 0xe3, 0x7c, 0xfc, 0x06, 0x2e, - 0x70, 0xe3, 0x04, 0xfd, 0x07, 0xfc, 0x8d, 0x1c, 0x7b, 0xe4, 0x64, 0x81, 0xb9, 0x70, 0xe7, 0xc6, - 0x09, 0xcd, 0xc7, 0xfa, 0x03, 0xdc, 0x24, 0xa8, 0x97, 0xec, 0xe4, 0xdd, 0xf7, 0x79, 0x9e, 0x79, - 0x3f, 0xd7, 0xe0, 0x01, 0x17, 0x19, 0xc3, 0x21, 0x6d, 0x99, 0x67, 0x0f, 0x73, 0xda, 0xca, 0x59, - 0x96, 0x67, 0x9c, 0x32, 0x7f, 0x70, 0xee, 0xe5, 0x2c, 0x13, 0x19, 0xbc, 0x4f, 0x32, 0x32, 0x60, - 0x19, 0x26, 0x7d, 0xcf, 0x38, 0x7a, 0x33, 0x80, 0xed, 0x0d, 0xf5, 0x2a, 0xef, 0xb5, 0x70, 0x1e, - 0x69, 0xc4, 0x36, 0x2c, 0x4c, 0x01, 0x16, 0xd8, 0xd8, 0xb6, 0x0a, 0x5b, 0x42, 0x05, 0x9e, 0xb1, - 0xef, 0x16, 0x97, 0xa0, 0x69, 0x18, 0xa5, 0xc5, 0x43, 0xfa, 0x9d, 0x13, 0x62, 0x7c, 0xde, 0xb9, - 0xc9, 0xe7, 0xd0, 0x38, 0xb9, 0x8b, 0xa2, 0xe1, 0x02, 0x0b, 0x6a, 0x1c, 0x9c, 0xa1, 0x88, 0xe2, - 0x56, 0x3f, 0x26, 0x2d, 0x11, 0x25, 0x94, 0x0b, 0x9c, 0xe4, 0xe6, 0xcd, 0x66, 0x98, 0x85, 0x99, - 0x3a, 0xb6, 0xe4, 0x49, 0x5b, 0x77, 0x7f, 0xb1, 0x40, 0xb9, 0x9b, 0xc7, 0x91, 0x80, 0x1d, 0xb0, - 0x2a, 0x58, 0x14, 0x86, 0x94, 0x39, 0xd6, 0x8e, 0xb5, 0x5f, 0x3d, 0x70, 0xbd, 0x69, 0x4e, 0x4c, - 0x5c, 0x9e, 0x72, 0x3d, 0xd3, 0x6e, 0x6d, 0xfb, 0x7a, 0xe4, 0x2e, 0xbd, 0x1a, 0xb9, 0x16, 0x2a, - 0x90, 0xf0, 0x1b, 0x50, 0x61, 0x7d, 0xee, 0x07, 0x34, 0x16, 0xd8, 0xb9, 0xa7, 0x68, 0x3e, 0xf0, - 0xfe, 0x9b, 0x5a, 0x1d, 0x9b, 0x57, 0x84, 0xe8, 0x3d, 0x7d, 0xd6, 0xe9, 0x74, 0x05, 0x16, 0xbc, - 0xdd, 0x90, 0x9c, 0xe3, 0x91, 0x6b, 0xa3, 0xcf, 0xbb, 0x8f, 0x24, 0x0b, 0xb2, 0x59, 0x9f, 0xab, - 0xd3, 0xc3, 0xd2, 0x9f, 0x2f, 0x5d, 0x6b, 0x17, 0x81, 0xf2, 0x53, 0xca, 0x42, 0x7a, 0xb7, 0x0b, - 0x2b, 0xd7, 0xd7, 0x5f, 0xd8, 0x70, 0xf6, 0x41, 0xad, 0xd3, 0xc7, 0x69, 0x48, 0x11, 0xcd, 0xe3, - 0x88, 0x60, 0x0e, 0x4f, 0xfe, 0x4d, 0xbe, 0xbf, 0x80, 0x7c, 0x1e, 0x73, 0x83, 0x8a, 0xfd, 0xe3, - 0x4b, 0x77, 0x49, 0x29, 0x7d, 0x57, 0x01, 0x9b, 0x06, 0x20, 0x68, 0xf0, 0xf8, 0x1c, 0xc7, 0x88, - 0xf2, 0x61, 0x2c, 0xa0, 0x0b, 0xaa, 0xbd, 0x38, 0x23, 0x03, 0x9f, 0x51, 0x1c, 0x70, 0x25, 0x6a, - 0x23, 0xa0, 0x4c, 0x48, 0x5a, 0xe0, 0x11, 0x28, 0xab, 0x42, 0x9b, 0xb4, 0xbe, 0xef, 0xdd, 0xd8, - 0xb1, 0x9e, 0x11, 0x91, 0x59, 0xa5, 0x48, 0x23, 0xe1, 0x43, 0x50, 0xe6, 0xb2, 0x80, 0xce, 0xb2, - 0xa2, 0xd8, 0xbb, 0x85, 0x42, 0x15, 0x1b, 0x69, 0x88, 0xc4, 0x26, 0x32, 0x97, 0x4e, 0xe9, 0x4e, - 0x58, 0x95, 0x77, 0xa4, 0x21, 0xf0, 0x0c, 0x34, 0x48, 0x96, 0xe4, 0x43, 0x41, 0x7d, 0xd2, 0xa7, - 0x64, 0xc0, 0x87, 0x89, 0x53, 0x56, 0x34, 0xef, 0x2d, 0xca, 0xaa, 0x76, 0xed, 0x18, 0x4f, 0x44, - 0xbf, 0x1d, 0x52, 0x2e, 0x50, 0x9d, 0xcc, 0xdb, 0xe1, 0x3e, 0x68, 0x44, 0xdc, 0x8f, 0x29, 0xe6, - 0xd4, 0x67, 0xda, 0xc9, 0x59, 0x51, 0x69, 0xab, 0x45, 0xfc, 0x44, 0x9a, 0x0d, 0x14, 0x1e, 0x81, - 0xca, 0x64, 0x1a, 0x1c, 0x5b, 0x09, 0xdf, 0x9f, 0x11, 0x96, 0x23, 0xe3, 0xf5, 0x63, 0xe2, 0x9d, - 0x15, 0x4e, 0xed, 0x92, 0xac, 0x21, 0x9a, 0xa2, 0xe0, 0xc7, 0x60, 0x2b, 0xe2, 0x3e, 0xc9, 0x52, - 0x1e, 0x71, 0x41, 0x53, 0x72, 0xe5, 0x33, 0x1a, 0xcb, 0x12, 0x3a, 0x15, 0x25, 0xb9, 0x19, 0xf1, - 0xce, 0xf4, 0x25, 0xd2, 0xef, 0xe0, 0x73, 0xd0, 0x08, 0x68, 0xce, 0xa8, 0x2a, 0xb6, 0x99, 0x0a, - 0xf0, 0xff, 0xa7, 0x02, 0xd5, 0xa7, 0x2c, 0x6a, 0x14, 0xe0, 0x33, 0x50, 0x27, 0xaa, 0xf9, 0x7c, - 0x66, 0xba, 0xcf, 0x59, 0x53, 0xbc, 0x1f, 0xde, 0x52, 0x97, 0xf9, 0x96, 0x45, 0x35, 0x32, 0xdf, - 0xf6, 0x7b, 0xa0, 0xc6, 0xf0, 0x0b, 0xe1, 0xc7, 0x59, 0x68, 0xae, 0xbb, 0xbe, 0x63, 0xed, 0x2f, - 0xa3, 0x35, 0x69, 0x3d, 0xc9, 0x42, 0xad, 0x8e, 0x40, 0x85, 0x0b, 0xcc, 0x84, 0x3f, 0xa0, 0x57, - 0x4e, 0x6d, 0xc7, 0xda, 0x5f, 0x6b, 0x7f, 0xf2, 0xf7, 0xc8, 0xfd, 0x28, 0x8c, 0x44, 0x7f, 0xd8, - 0xf3, 0x48, 0x96, 0xb4, 0x26, 0xb7, 0x08, 0x7a, 0xd3, 0x73, 0x2b, 0x1f, 0x84, 0xad, 0xa2, 0xcc, - 0xe8, 0x09, 0xbd, 0x42, 0xb6, 0xe2, 0x79, 0x42, 0xaf, 0xe0, 0x29, 0x58, 0xa5, 0x69, 0xa0, 0x18, - 0xeb, 0x6f, 0xc2, 0xb8, 0x42, 0xd3, 0x40, 0xf2, 0x65, 0xa0, 0x8a, 0x83, 0xc0, 0xe7, 0x5c, 0xe0, - 0x5e, 0x4c, 0x9d, 0x0d, 0x95, 0x9d, 0x4f, 0xef, 0x36, 0x34, 0x73, 0x93, 0xe9, 0x1d, 0x05, 0x41, - 0xb7, 0x7b, 0x26, 0x59, 0xda, 0xb5, 0xf1, 0xc8, 0x05, 0xd3, 0xff, 0x11, 0xc0, 0x41, 0xd0, 0xd5, - 0x0a, 0xf0, 0x4b, 0x50, 0xd6, 0x19, 0x83, 0x4a, 0xea, 0xf0, 0x8e, 0x05, 0x3e, 0xa5, 0xe2, 0x22, - 0x63, 0x03, 0xbd, 0xfd, 0x74, 0xdb, 0x69, 0x9e, 0xed, 0xcf, 0xc0, 0x8c, 0x14, 0x84, 0xa0, 0x24, - 0x3f, 0x28, 0x6a, 0x31, 0xac, 0x21, 0x75, 0x86, 0x2e, 0x28, 0x13, 0x46, 0x0e, 0x0f, 0xd4, 0x4a, - 0x58, 0x6f, 0x57, 0xc6, 0x23, 0xb7, 0xdc, 0x41, 0x9d, 0xc3, 0x03, 0xa4, 0xed, 0x7a, 0xbb, 0xe9, - 0xbf, 0xc7, 0x25, 0x7b, 0xb5, 0x61, 0x1f, 0x97, 0xec, 0x46, 0x63, 0xe3, 0x78, 0xc5, 0xfe, 0xe1, - 0xb4, 0xf1, 0xd3, 0xe9, 0xee, 0x0e, 0x00, 0xcf, 0x59, 0x24, 0x68, 0x1b, 0x0b, 0xd2, 0x5f, 0x24, - 0xb1, 0xfb, 0xd7, 0x32, 0xa8, 0x22, 0xfc, 0x42, 0x74, 0xb2, 0x24, 0xc1, 0x69, 0x00, 0xbf, 0x02, - 0x8d, 0xc9, 0xc7, 0xd3, 0xb4, 0x9e, 0x59, 0x48, 0x7b, 0x0b, 0x46, 0xd9, 0xe4, 0xf3, 0x11, 0xe5, - 0x84, 0x45, 0xb9, 0xc8, 0x98, 0x89, 0xb0, 0x5e, 0x70, 0x18, 0x07, 0xd8, 0x05, 0x6f, 0x0b, 0xca, - 0x45, 0x94, 0x86, 0x7e, 0x4f, 0xde, 0x65, 0x32, 0xd0, 0xcb, 0xaf, 0xdd, 0xec, 0xea, 0xce, 0xc5, - 0x72, 0x78, 0xcb, 0xa0, 0x67, 0x8d, 0xf0, 0x5d, 0x50, 0x4f, 0xf0, 0xa5, 0xd9, 0x10, 0x51, 0x1a, - 0xd0, 0x4b, 0xb5, 0xbc, 0x4a, 0x68, 0x3d, 0xc1, 0x97, 0x6a, 0x41, 0x7c, 0x21, 0x8d, 0xf0, 0x31, - 0xa8, 0x4d, 0x62, 0x52, 0xce, 0x66, 0x39, 0x39, 0x0b, 0x54, 0x15, 0xcc, 0x44, 0xb1, 0x5e, 0xa0, - 0x94, 0x11, 0x66, 0x60, 0x8b, 0x4d, 0xfa, 0xc7, 0xa7, 0xe7, 0x38, 0xf6, 0x99, 0xea, 0x20, 0x35, - 0x43, 0x8b, 0x3b, 0xe2, 0xb6, 0xe6, 0x33, 0x4a, 0x9b, 0x6c, 0xd1, 0x27, 0xe3, 0x18, 0x54, 0x2f, - 0x64, 0xf5, 0x74, 0xca, 0xd4, 0x20, 0xce, 0x6f, 0xd4, 0x45, 0x2a, 0xd3, 0x7a, 0x23, 0x70, 0x31, - 0x39, 0x1f, 0x97, 0x6c, 0xab, 0x71, 0x4f, 0xf7, 0xc5, 0xcf, 0xa7, 0xed, 0x07, 0xd7, 0xbf, 0x37, - 0x97, 0xae, 0xc7, 0x4d, 0xeb, 0xd5, 0xb8, 0x69, 0xfd, 0x3a, 0x6e, 0x5a, 0xbf, 0x8d, 0x9b, 0xd6, - 0xf7, 0x7f, 0x34, 0x97, 0xbe, 0xae, 0xce, 0x70, 0xf5, 0x56, 0xd4, 0x6f, 0x88, 0xc3, 0x7f, 0x02, - 0x00, 0x00, 0xff, 0xff, 0x67, 0xef, 0xc6, 0xe0, 0x64, 0x09, 0x00, 0x00, + // 1165 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x56, 0x4d, 0x6f, 0x1b, 0x37, + 0x13, 0xb6, 0x2c, 0xc9, 0x59, 0x51, 0xb1, 0xa4, 0x30, 0x8e, 0xb1, 0x30, 0x10, 0xad, 0xe1, 0xd7, + 0x79, 0xe1, 0xa2, 0xe9, 0xaa, 0xb5, 0xd3, 0x4b, 0x0e, 0x05, 0x2c, 0x25, 0x68, 0xeb, 0xd8, 0x2a, + 0x4a, 0xb9, 0x0e, 0xd0, 0x1e, 0x16, 0x14, 0x97, 0x59, 0x2d, 0xb4, 0x5f, 0x25, 0x29, 0x7f, 0xfc, + 0x8b, 0xb6, 0xa7, 0x9e, 0xda, 0xfc, 0x83, 0xfe, 0x0d, 0x1f, 0x73, 0xec, 0x49, 0x68, 0xd5, 0x4b, + 0xef, 0xbd, 0xf9, 0x54, 0x2c, 0xc9, 0xd5, 0x47, 0xad, 0x58, 0x0e, 0x72, 0x91, 0xb8, 0xc3, 0x99, + 0x67, 0x38, 0xc3, 0x79, 0x9e, 0x5d, 0xf0, 0x88, 0x8b, 0x98, 0x61, 0x8f, 0x36, 0xf4, 0x7f, 0x17, + 0x73, 0xda, 0x48, 0x58, 0x9c, 0xc4, 0x9c, 0x32, 0xa7, 0x7f, 0x6a, 0x27, 0x2c, 0x16, 0x31, 0x7c, + 0x48, 0x62, 0xd2, 0x67, 0x31, 0x26, 0x3d, 0x5b, 0x3b, 0xda, 0x53, 0x01, 0x1b, 0xf7, 0xe4, 0x56, + 0xd2, 0x6d, 0xe0, 0xc4, 0x57, 0x11, 0x1b, 0x30, 0x33, 0xb9, 0x58, 0x60, 0x6d, 0x5b, 0xcf, 0x6c, + 0x21, 0x15, 0x78, 0xca, 0xbe, 0x95, 0x1d, 0x82, 0x46, 0x9e, 0x1f, 0x65, 0x7f, 0xa9, 0xdf, 0x29, + 0x21, 0xda, 0xe7, 0x7f, 0x37, 0xf9, 0xec, 0x69, 0x27, 0x6b, 0x5e, 0x35, 0x5c, 0x60, 0x41, 0xb5, + 0x83, 0x39, 0x10, 0x7e, 0xd0, 0xe8, 0x05, 0xa4, 0x21, 0xfc, 0x90, 0x72, 0x81, 0xc3, 0x44, 0xef, + 0xac, 0x79, 0xb1, 0x17, 0xcb, 0x65, 0x23, 0x5d, 0x29, 0xeb, 0xd6, 0x6f, 0x39, 0x50, 0xec, 0x24, + 0x81, 0x2f, 0x60, 0x0b, 0xdc, 0x11, 0xcc, 0xf7, 0x3c, 0xca, 0xcc, 0xdc, 0x66, 0x6e, 0xa7, 0xbc, + 0x6b, 0xd9, 0x93, 0x9e, 0xe8, 0xba, 0x6c, 0xe9, 0x7a, 0xac, 0xdc, 0x9a, 0xc6, 0xe5, 0xd0, 0x5a, + 0x7a, 0x33, 0xb4, 0x72, 0x28, 0x8b, 0x84, 0xdf, 0x81, 0x12, 0xeb, 0x71, 0xc7, 0xa5, 0x81, 0xc0, + 0xe6, 0xb2, 0x84, 0x79, 0x6c, 0x5f, 0x6f, 0xad, 0xaa, 0xcd, 0xce, 0x4a, 0xb4, 0x8f, 0x4e, 0x5a, + 0xad, 0x8e, 0xc0, 0x82, 0x37, 0x6b, 0x29, 0xe6, 0x68, 0x68, 0x19, 0xe8, 0x8b, 0xce, 0xb3, 0x14, + 0x05, 0x19, 0xac, 0xc7, 0xe5, 0xea, 0x69, 0xe1, 0xef, 0xd7, 0x56, 0x6e, 0x0b, 0x81, 0xe2, 0x11, + 0x65, 0x1e, 0xbd, 0xdd, 0x81, 0xa5, 0xeb, 0xdb, 0x0f, 0xac, 0x31, 0x7b, 0xa0, 0xd2, 0xea, 0xe1, + 0xc8, 0xa3, 0x88, 0x26, 0x81, 0x4f, 0x30, 0x87, 0x87, 0xff, 0x05, 0xdf, 0x99, 0x03, 0x3e, 0x1b, + 0x73, 0x43, 0x16, 0xe3, 0xe7, 0xd7, 0xd6, 0x92, 0xcc, 0x74, 0x02, 0x40, 0x2b, 0x0e, 0x13, 0x4c, + 0x84, 0x1f, 0x47, 0x70, 0x0d, 0x14, 0xbb, 0x17, 0x82, 0x72, 0x99, 0x23, 0x8f, 0xd4, 0x03, 0x7c, + 0x0c, 0x20, 0x1f, 0x78, 0x1e, 0xe5, 0x82, 0xba, 0x0e, 0x16, 0x4e, 0x84, 0xa3, 0x98, 0xcb, 0x6e, + 0xe6, 0x51, 0x6d, 0xbc, 0xb3, 0x2f, 0xda, 0xa9, 0x5d, 0x57, 0xf0, 0xd3, 0x32, 0xb8, 0xdf, 0xc9, + 0xb6, 0xa6, 0x32, 0x7c, 0x0d, 0x4a, 0x5c, 0x60, 0x26, 0x9c, 0x3e, 0xbd, 0x90, 0x59, 0xee, 0x36, + 0x9f, 0x5c, 0x0d, 0xad, 0x8f, 0x3d, 0x5f, 0xf4, 0x06, 0x5d, 0x9b, 0xc4, 0x61, 0x63, 0x5c, 0x97, + 0xdb, 0x9d, 0xac, 0x1b, 0x49, 0xdf, 0x6b, 0x64, 0x75, 0xbe, 0xa0, 0x17, 0xc8, 0x90, 0x30, 0x2f, + 0xe8, 0x05, 0x3c, 0x02, 0x77, 0x68, 0xe4, 0x4a, 0xc0, 0xe5, 0xf7, 0x00, 0x5c, 0xa1, 0x91, 0x9b, + 0xc2, 0x75, 0x00, 0x20, 0xe3, 0xf3, 0x9a, 0x79, 0xd9, 0xec, 0x0f, 0xec, 0x1b, 0xe9, 0x68, 0x4f, + 0x0a, 0x9c, 0xea, 0xf6, 0x14, 0x8c, 0x6e, 0xca, 0x55, 0x09, 0xac, 0xe9, 0xdb, 0x11, 0xd4, 0x7d, + 0x7e, 0x8a, 0x03, 0x44, 0xf9, 0x20, 0x10, 0xd0, 0x02, 0xe5, 0x6e, 0x10, 0x93, 0xbe, 0xc3, 0x28, + 0x76, 0x55, 0xf7, 0x0d, 0x04, 0xa4, 0x09, 0xa5, 0x16, 0xb8, 0x0f, 0x8a, 0x92, 0x55, 0x7a, 0x86, + 0x3f, 0x5c, 0x70, 0x1e, 0x9d, 0x24, 0x1d, 0x61, 0x8a, 0x54, 0x24, 0x7c, 0x0a, 0x8a, 0x3c, 0x65, + 0x8b, 0x2e, 0x69, 0x7b, 0x01, 0x84, 0x64, 0x16, 0x52, 0x21, 0x69, 0x6c, 0x98, 0x0e, 0xae, 0x59, + 0xb8, 0x55, 0xac, 0x1c, 0x72, 0xa4, 0x42, 0xe0, 0x31, 0xa8, 0xa5, 0x8d, 0x18, 0x08, 0xea, 0x90, + 0x1e, 0x25, 0x7d, 0x3e, 0x08, 0xcd, 0xe2, 0xb5, 0xae, 0x8e, 0x47, 0x58, 0xb9, 0xb6, 0xb4, 0x27, + 0xa2, 0xdf, 0x0f, 0x28, 0x17, 0xa8, 0x4a, 0x66, 0xed, 0x70, 0x07, 0xd4, 0x7c, 0xee, 0x04, 0x14, + 0x73, 0xea, 0x30, 0xe5, 0x64, 0xae, 0xc8, 0xb6, 0x55, 0x7c, 0x7e, 0x98, 0x9a, 0x75, 0x28, 0xdc, + 0x07, 0xa5, 0xb1, 0xf4, 0x98, 0x86, 0x4c, 0xfc, 0x70, 0x2a, 0x71, 0xaa, 0x4f, 0x76, 0x2f, 0x20, + 0xf6, 0x71, 0xe6, 0xd4, 0x2c, 0xa4, 0x57, 0x88, 0x26, 0x51, 0xf0, 0x09, 0x58, 0xf7, 0xb9, 0x43, + 0xe2, 0x88, 0xfb, 0x5c, 0xd0, 0x88, 0x5c, 0x38, 0x8c, 0x06, 0xe9, 0x15, 0x9a, 0x25, 0x99, 0x72, + 0xcd, 0xe7, 0xad, 0xc9, 0x26, 0x52, 0x7b, 0xf0, 0x25, 0xa8, 0xb9, 0x34, 0x61, 0x54, 0x5e, 0xb6, + 0x96, 0x20, 0xf0, 0xee, 0x12, 0x84, 0xaa, 0x13, 0x14, 0xa9, 0x3b, 0xf0, 0x04, 0x54, 0x89, 0x64, + 0xba, 0xc3, 0x34, 0xd5, 0xcd, 0xbb, 0x12, 0xf7, 0xa3, 0x45, 0x63, 0x3a, 0xa3, 0x0f, 0xa8, 0x42, + 0x66, 0x35, 0x66, 0x1b, 0x54, 0x18, 0x7e, 0x25, 0x9c, 0x20, 0xf6, 0xf4, 0x71, 0x57, 0x25, 0xc7, + 0xef, 0xa6, 0xd6, 0xc3, 0xd8, 0x53, 0xd9, 0xd1, 0x34, 0x83, 0x2b, 0x92, 0x70, 0x9f, 0x5e, 0x0d, + 0xad, 0x4f, 0xde, 0x89, 0x70, 0x68, 0x96, 0xc2, 0xed, 0x09, 0x85, 0xab, 0xef, 0x83, 0x98, 0x71, + 0x38, 0x06, 0x65, 0xec, 0xba, 0x0e, 0xe7, 0x02, 0x77, 0x03, 0x6a, 0xde, 0x93, 0xdd, 0xf9, 0xec, + 0x76, 0xa4, 0x99, 0x61, 0xa6, 0xbd, 0xef, 0xba, 0x9d, 0xce, 0x71, 0x8a, 0xd2, 0xac, 0x8c, 0x86, + 0x16, 0x98, 0x3c, 0x23, 0x80, 0x5d, 0xb7, 0xa3, 0x32, 0xc0, 0xaf, 0x40, 0x51, 0x75, 0x0c, 0xca, + 0x54, 0x7b, 0xb7, 0xbc, 0xe0, 0x36, 0x15, 0x67, 0x31, 0xeb, 0xab, 0x57, 0x8d, 0x1a, 0x3b, 0x85, + 0x03, 0x43, 0xf0, 0x60, 0xa2, 0xb9, 0x13, 0x21, 0xe1, 0xe6, 0xfd, 0xcd, 0xfc, 0x4e, 0x79, 0x77, + 0x77, 0x11, 0x7b, 0xaf, 0x4b, 0xaf, 0xc6, 0x5f, 0xe3, 0xd7, 0xb7, 0xf8, 0xc6, 0xe7, 0x60, 0xaa, + 0x32, 0x08, 0x41, 0x21, 0xfd, 0x58, 0x50, 0xfa, 0x8c, 0xe4, 0x1a, 0x5a, 0xa0, 0x48, 0x18, 0xd9, + 0xdb, 0x95, 0x0a, 0xb4, 0xda, 0x2c, 0x8d, 0x86, 0x56, 0xb1, 0x85, 0x5a, 0x7b, 0xbb, 0x48, 0xd9, + 0x95, 0xc4, 0xa9, 0xdf, 0x83, 0x82, 0x71, 0xa7, 0x66, 0x1c, 0x14, 0x8c, 0x5a, 0xed, 0xde, 0xc1, + 0x8a, 0xf1, 0x63, 0xbb, 0xf6, 0x4b, 0x7b, 0x6b, 0x13, 0x80, 0x97, 0xcc, 0x17, 0xb4, 0x89, 0x05, + 0xe9, 0xcd, 0x4b, 0xb1, 0xf5, 0x4f, 0x1e, 0x94, 0x11, 0x7e, 0x25, 0x5a, 0x71, 0x18, 0xe2, 0xc8, + 0x85, 0xdf, 0x80, 0xda, 0xf8, 0xc3, 0x48, 0x4f, 0xba, 0xd6, 0xbf, 0xed, 0x39, 0xca, 0xa1, 0xaf, + 0xef, 0x19, 0xe5, 0x84, 0xf9, 0x89, 0x88, 0x99, 0x2e, 0xb8, 0x9a, 0x61, 0x68, 0x07, 0xd8, 0x01, + 0x0f, 0x04, 0xe5, 0xc2, 0x8f, 0x3c, 0xa7, 0x9b, 0x9e, 0x65, 0xac, 0x1f, 0xf9, 0xb7, 0xbe, 0xb5, + 0xe5, 0x99, 0x33, 0x2d, 0xba, 0xaf, 0xa3, 0xa7, 0x8d, 0xf0, 0xff, 0xa0, 0x1a, 0xe2, 0x73, 0x2d, + 0x48, 0x7e, 0xe4, 0xd2, 0x73, 0xa9, 0x95, 0x05, 0xb4, 0x1a, 0xe2, 0x73, 0xa9, 0x47, 0x5f, 0xa6, + 0x46, 0xf8, 0x1c, 0x54, 0xc6, 0x35, 0x49, 0x67, 0xad, 0x85, 0xe6, 0x9c, 0xac, 0x32, 0x4c, 0x57, + 0xb1, 0x9a, 0x45, 0x49, 0x23, 0x8c, 0xc1, 0x3a, 0x1b, 0x8f, 0xab, 0x43, 0x4f, 0x71, 0xe0, 0x30, + 0x39, 0xb0, 0x92, 0xb2, 0xf3, 0x07, 0x70, 0xd1, 0xac, 0x67, 0x03, 0xc2, 0xe6, 0xbd, 0xa1, 0x0e, + 0x40, 0xf9, 0x2c, 0xbd, 0x3d, 0xd5, 0x32, 0xc9, 0xfb, 0xc5, 0xaf, 0xc5, 0xc9, 0x7d, 0x23, 0x70, + 0x36, 0x5e, 0x1f, 0x14, 0x8c, 0x5c, 0x6d, 0x59, 0xcd, 0xc5, 0xaf, 0xed, 0xe6, 0xa3, 0xcb, 0x3f, + 0xeb, 0x4b, 0x97, 0xa3, 0x7a, 0xee, 0xcd, 0xa8, 0x9e, 0xfb, 0x7d, 0x54, 0xcf, 0xfd, 0x31, 0xaa, + 0xe7, 0x7e, 0xf8, 0xab, 0xbe, 0xf4, 0x6d, 0x79, 0x0a, 0xab, 0xbb, 0x22, 0xbf, 0x0f, 0xf7, 0xfe, + 0x0d, 0x00, 0x00, 0xff, 0xff, 0xb9, 0xb8, 0x47, 0x17, 0x40, 0x0b, 0x00, 0x00, } diff --git a/pkg/storage/storagebase/proposer_kv.proto b/pkg/storage/storagebase/proposer_kv.proto index 015cd1dbb05b..ca5675e30ded 100644 --- a/pkg/storage/storagebase/proposer_kv.proto +++ b/pkg/storage/storagebase/proposer_kv.proto @@ -62,6 +62,27 @@ message ChangeReplicas { (gogoproto.embed) = true]; } +// Compaction holds core details about a suggested compaction. +message Compaction { + option (gogoproto.equal) = true; + + // bytes indicates the expected space reclamation from compaction. + int64 bytes = 1; + // suggested_at is nanoseconds since the epoch. + int64 suggested_at_nanos = 2; +} + +// SuggestedCompaction holds start and end keys in conjunction with +// the compaction details. +message SuggestedCompaction { + option (gogoproto.equal) = true; + + bytes start_key = 1 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"]; + bytes end_key = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"]; + + Compaction compaction = 3 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; +} + // ReplicatedEvalResult is the structured information which together with // a RocksDB WriteBatch constitutes the proposal payload in proposer-evaluated // KV. For the majority of proposals, we expect ReplicatedEvalResult to be @@ -116,6 +137,11 @@ message ReplicatedEvalResult { reserved 16; AddSSTable add_sstable = 17 [(gogoproto.customname) = "AddSSTable"]; + // suggested_compactions are sent to the engine's compactor to + // reclaim storage space after garbage collection or cleared / + // rebalanced ranges. + repeated SuggestedCompaction suggested_compactions = 19 [(gogoproto.nullable) = false]; + reserved 10001 to 10013; } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index d56b316ee20b..863db7d6c460 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" + "github.com/cockroachdb/cockroach/pkg/storage/compactor" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/spanset" @@ -352,6 +353,7 @@ type Store struct { cfg StoreConfig db *client.DB engine engine.Engine // The underlying key-value store + compactor *compactor.Compactor // Schedules compaction of the engine tsCache tscache.Cache // Most recent timestamps for keys / key ranges allocator Allocator // Makes allocation decisions rangeIDAlloc *idAllocator // Range ID allocator @@ -862,6 +864,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript s.tsCache = tscache.New(cfg.Clock, cfg.TimestampCachePageSize, tsCacheMetrics) s.metrics.registry.AddMetricStruct(tsCacheMetrics) + s.compactor = compactor.NewCompactor(s.engine.(engine.WithSSTables), s.Capacity) + s.metrics.registry.AddMetricStruct(s.compactor.Metrics) + s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit) if s.cfg.Gossip != nil { @@ -1205,6 +1210,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { log.Event(ctx, "computed initial metrics") } + // Start the storage engine compactor. + if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) { + s.compactor.Start(s.AnnotateCtx(context.Background()), s.Tracer(), s.stopper) + } + // Set the started flag (for unittests). atomic.StoreInt32(&s.started, 1)