Skip to content

Commit

Permalink
storage: implement a queue for suggested compactions
Browse files Browse the repository at this point in the history
Clear range commands now come with an attendant suggested range compaction
hint. Any suggested compactions generated during command execution are now
sent via replicated result data to each replica and stored in a store-local
queue of pending compaction suggestions.

A new compactor goroutine runs periodically to process pending suggestions.
If more than an absolute number of bytes is reclaimable, or if the bytes
to reclaim exceed a threshold fraction of the total used bytes, we'll go
ahead and compact the suggested range.

Suggested compactions are allowed to remain in the queue for at most
24 hours, after which if they haven't been aggregated into a compact-able
key span, they'll be discarded, and left to RocksDB's background compaction
processing.

Release note (UX improvement): When tables are dropped, the
space will be reclaimed in a more timely fashion.
  • Loading branch information
spencerkimball committed Dec 20, 2017
1 parent 90078d2 commit b8439f3
Show file tree
Hide file tree
Showing 19 changed files with 1,588 additions and 194 deletions.
163 changes: 92 additions & 71 deletions c-deps/libroach/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1649,101 +1649,122 @@ DBStatus DBSyncWAL(DBEngine* db) {
}

DBStatus DBCompact(DBEngine* db) {
return DBCompactRange(db, DBKey(), DBKey());
}

DBStatus DBCompactRange(DBEngine* db, DBKey start, DBKey end) {
rocksdb::CompactRangeOptions options;
// By default, RocksDB doesn't recompact the bottom level (unless
// there is a compaction filter, which we don't use). However,
// recompacting the bottom layer is necessary to pick up changes to
// settings like bloom filter configurations (which is the biggest
// reason we currently have to use this function).
// settings like bloom filter configurations, and to fully reclaim
// space after dropping, truncating, or migrating tables.
options.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;

// Compacting the entire database in a single-shot can use a
// significant amount of additional (temporary) disk space. Instead,
// we loop over the sstables in the lowest level and initiate
// compactions on smaller ranges of keys. The resulting compacted
// database is the same size, but the temporary disk space needed
// for the compaction is dramatically reduced
// for the compaction is dramatically reduced.
std::vector<rocksdb::LiveFileMetaData> all_metadata;
std::vector<rocksdb::LiveFileMetaData> metadata;
db->rep->GetLiveFilesMetaData(&metadata);
db->rep->GetLiveFilesMetaData(&all_metadata);

const std::string start_key(EncodeKey(start));
const std::string end_key(EncodeKey(end));

int max_level = 0;
for (int i = 0; i < all_metadata.size(); i++) {
// Skip any SSTables which fall outside the specified range, if a
// range was specified.
if ((!start_key.empty() > 0 && all_metadata[i].largestkey < start_key) ||
(!end_key.empty() && all_metadata[i].smallestkey >= end_key)) {
continue;
}
if (max_level < all_metadata[i].level) {
max_level = all_metadata[i].level;
}
// Gather the set of SSTables to compact.
metadata.push_back(all_metadata[i]);
}
all_metadata.clear();

if (max_level != db->rep->NumberLevels() - 1) {
// There are no sstables at the lowest level, so just compact the
// specified key span, wholesale. Due to the
// level_compaction_dynamic_level_bytes setting, this will only
// happen on spans containing very little data.
const rocksdb::Slice start_slice(start_key);
const rocksdb::Slice end_slice(end_key);
return ToDBStatus(db->rep->CompactRange(options, !start_key.empty() ? &start_slice : nullptr,
!end_key.empty() ? &end_slice : nullptr));
}

// A naive approach to selecting ranges to compact would be to
// compact the ranges specified by the smallest and largest key in
// each sstable of the bottom-most level. Unfortunately, the
// sstables in the bottom-most level have vastly different
// sizes. For example, starting with the following set of bottom-most
// sstables:
//
// 100M[16] 89M 70M 66M 56M 54M 38M[2] 36M 23M 20M 17M 8M 6M 5M 2M 2K[4]
//
// If we compact the entire database in one call we can end up with:
//
// 100M[22] 77M 76M 50M
//
// If we use the naive approach (compact the range specified by
// the smallest and largest keys):
//
// 100M[18] 92M 68M 62M 61M 50M 45M 39M 31M 29M[2] 24M 23M 18M 9M 8M[2] 7M
// 2K[4]
//
// With the approach below:
//
// 100M[19] 80M 68M[2] 62M 61M 53M 45M 36M 31M
//
// The approach below is to loop over the bottom-most sstables in
// sorted order and initiate a compact range every 128MB of data.

// Gather up the bottom-most sstable metadata.
std::vector<rocksdb::SstFileMetaData> sst;
for (int i = 0; i < metadata.size(); i++) {
if (max_level < metadata[i].level) {
max_level = metadata[i].level;
if (metadata[i].level != max_level) {
continue;
}
sst.push_back(metadata[i]);
}

if (max_level == db->rep->NumberLevels() - 1) {
// A naive approach to selecting ranges to compact would be to
// compact the ranges specified by the smallest and largest key in
// each sstable of the bottom-most level. Unfortunately, the
// sstables in the bottom-most level have vastly different
// sizes. For example, starting with the following set of bottom-most
// sstables:
//
// 100M[16] 89M 70M 66M 56M 54M 38M[2] 36M 23M 20M 17M 8M 6M 5M 2M 2K[4]
//
// If we compact the entire database in one call we can end up with:
//
// 100M[22] 77M 76M 50M
//
// If we use the naive approach (compact the range specified by
// the smallest and largest keys):
//
// 100M[18] 92M 68M 62M 61M 50M 45M 39M 31M 29M[2] 24M 23M 18M 9M 8M[2] 7M
// 2K[4]
//
// With the approach below:
//
// 100M[19] 80M 68M[2] 62M 61M 53M 45M 36M 31M
//
// The approach below is to loop over the bottom-most sstables in
// sorted order and initiate a compact range every 128MB of data.

// Gather up the bottom-most sstable metadata.
std::vector<rocksdb::SstFileMetaData> sst;
for (int i = 0; i < metadata.size(); i++) {
if (metadata[i].level != max_level) {
continue;
}
sst.push_back(metadata[i]);
}
// Sort the metadata by smallest key.
std::sort(sst.begin(), sst.end(), [](const rocksdb::SstFileMetaData& a, const rocksdb::SstFileMetaData& b) -> bool {
// Sort the metadata by smallest key.
std::sort(sst.begin(), sst.end(), [](const rocksdb::SstFileMetaData& a, const rocksdb::SstFileMetaData& b) -> bool {
return a.smallestkey < b.smallestkey;
});

// Walk over the bottom-most sstables in order and perform
// compactions every 128MB.
rocksdb::Slice last;
rocksdb::Slice* last_ptr = nullptr;
uint64_t size = 0;
const uint64_t target_size = 128 << 20;
for (int i = 0; i < sst.size(); ++i) {
size += sst[i].size;
if (size < target_size) {
continue;
}
rocksdb::Slice cur(sst[i].largestkey);
rocksdb::Status status = db->rep->CompactRange(options, last_ptr, &cur);
if (!status.ok()) {
return ToDBStatus(status);
}
last = cur;
last_ptr = &last;
size = 0;
// Walk over the bottom-most sstables in order and perform
// compactions every 128MB.
rocksdb::Slice last;
rocksdb::Slice* last_ptr = nullptr;
uint64_t size = 0;
const uint64_t target_size = 128 << 20;
for (int i = 0; i < sst.size(); ++i) {
size += sst[i].size;
if (size < target_size) {
continue;
}

if (size > 0) {
return ToDBStatus(db->rep->CompactRange(options, last_ptr, nullptr));
rocksdb::Slice cur(sst[i].largestkey);
rocksdb::Status status = db->rep->CompactRange(options, last_ptr, &cur);
if (!status.ok()) {
return ToDBStatus(status);
}
return kSuccess;
last = cur;
last_ptr = &last;
size = 0;
}

// There are no sstables at the lowest level, so just compact the
// entire database. Due to the level_compaction_dynamic_level_bytes
// setting, this will only happen on very small databases.
return ToDBStatus(db->rep->CompactRange(options, NULL, NULL));
if (size > 0) {
return ToDBStatus(db->rep->CompactRange(options, last_ptr, nullptr));
}
return kSuccess;
}

DBStatus DBApproximateDiskBytes(DBEngine* db, DBKey start, DBKey end, uint64_t* size) {
Expand Down
5 changes: 5 additions & 0 deletions c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ DBStatus DBSyncWAL(DBEngine* db);
// Forces an immediate compaction over all keys.
DBStatus DBCompact(DBEngine* db);

// Forces an immediate compaction over keys in the specified range.
// Note that if start is empty, it indicates the start of the database.
// If end is empty, it indicates the end of the database.
DBStatus DBCompactRange(DBEngine* db, DBKey start, DBKey end);

// Stores the approximate on-disk size of the given key range into the
// supplied uint64.
DBStatus DBApproximateDiskBytes(DBEngine* db, DBKey start, DBKey end, uint64_t *size);
Expand Down
10 changes: 10 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ var (
// is to allow a restarting node to discover approximately how long it has
// been down without needing to retrieve liveness records from the cluster.
localStoreLastUpSuffix = []byte("uptm")
// localStoreSuggestedCompactionSuffix stores suggested compactions to
// be aggregated and processed on the store.
localStoreSuggestedCompactionSuffix = []byte("comp")

// LocalStoreSuggestedCompactionsMin is the start of the span of
// possible suggested compaction keys for a store.
LocalStoreSuggestedCompactionsMin = MakeStoreKey(localStoreSuggestedCompactionSuffix, nil)
// LocalStoreSuggestedCompactionsMax is the end of the span of
// possible suggested compaction keys for a store.
LocalStoreSuggestedCompactionsMax = LocalStoreSuggestedCompactionsMin.PrefixEnd()

// LocalRangeIDPrefix is the prefix identifying per-range data
// indexed by Range ID. The Range ID is appended to this prefix,
Expand Down
50 changes: 50 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ func MakeStoreKey(suffix, detail roachpb.RKey) roachpb.Key {
return key
}

// DecodeStoreKey returns the suffix and detail portions of a local
// store key.
func DecodeStoreKey(key roachpb.Key) (suffix, detail roachpb.RKey, err error) {
if !bytes.HasPrefix(key, localStorePrefix) {
return nil, nil, errors.Errorf("key %s does not have %s prefix", key, localStorePrefix)
}
// Cut the prefix, the Range ID, and the infix specifier.
key = key[len(localStorePrefix):]
if len(key) < localSuffixLength {
return nil, nil, errors.Errorf("malformed key does not contain local store suffix")
}
suffix = roachpb.RKey(key[:localSuffixLength])
detail = roachpb.RKey(key[localSuffixLength:])
return suffix, detail, nil
}

// StoreIdentKey returns a store-local key for the store metadata.
func StoreIdentKey() roachpb.Key {
return MakeStoreKey(localStoreIdentSuffix, nil)
Expand All @@ -59,6 +75,40 @@ func StoreLastUpKey() roachpb.Key {
return MakeStoreKey(localStoreLastUpSuffix, nil)
}

// StoreSuggestedCompactionKey returns a store-local key for a
// suggested compaction. It combines the specified start and end keys.
func StoreSuggestedCompactionKey(start, end roachpb.RKey) roachpb.Key {
var detail roachpb.RKey
detail = encoding.EncodeBytesAscending(detail, start)
detail = encoding.EncodeBytesAscending(detail, end)
return MakeStoreKey(localStoreSuggestedCompactionSuffix, detail)
}

// DecodeStoreSuggestedCompactionKey returns the start and end keys of
// the suggested compaction's span.
func DecodeStoreSuggestedCompactionKey(key roachpb.Key) (start, end roachpb.RKey, err error) {
var suffix, detail roachpb.RKey
suffix, detail, err = DecodeStoreKey(key)
if err != nil {
return nil, nil, err
}
if !suffix.Equal(localStoreSuggestedCompactionSuffix) {
return nil, nil, errors.Errorf("key with suffix %q != %q", suffix, localStoreSuggestedCompactionSuffix)
}
detail, start, err = encoding.DecodeBytesAscending(detail, nil)
if err != nil {
return nil, nil, err
}
detail, end, err = encoding.DecodeBytesAscending(detail, nil)
if err != nil {
return nil, nil, err
}
if len(detail) != 0 {
return nil, nil, errors.Errorf("invalid key has trailing garbage: %q", detail)
}
return start, end, nil
}

// NodeLivenessKey returns the key for the node liveness record.
func NodeLivenessKey(nodeID roachpb.NodeID) roachpb.Key {
key := make(roachpb.Key, 0, len(NodeLivenessPrefix)+9)
Expand Down
45 changes: 45 additions & 0 deletions pkg/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,51 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func TestStoreKeyEncodeDecode(t *testing.T) {
testCases := []struct {
key roachpb.Key
expSuffix roachpb.RKey
expDetail roachpb.RKey
}{
{key: StoreIdentKey(), expSuffix: localStoreIdentSuffix, expDetail: nil},
{key: StoreGossipKey(), expSuffix: localStoreGossipSuffix, expDetail: nil},
{key: StoreClusterVersionKey(), expSuffix: localStoreClusterVersionSuffix, expDetail: nil},
{key: StoreLastUpKey(), expSuffix: localStoreLastUpSuffix, expDetail: nil},
{
key: StoreSuggestedCompactionKey(roachpb.RKey("a"), roachpb.RKey("z")),
expSuffix: localStoreSuggestedCompactionSuffix,
expDetail: encoding.EncodeBytesAscending(encoding.EncodeBytesAscending(nil, roachpb.Key("a")), roachpb.Key("z")),
},
}
for _, test := range testCases {
t.Run("", func(t *testing.T) {
if suffix, detail, err := DecodeStoreKey(test.key); err != nil {
t.Error(err)
} else if !suffix.Equal(test.expSuffix) {
t.Errorf("expected %s; got %s", test.expSuffix, suffix)
} else if !detail.Equal(test.expDetail) {
t.Errorf("expected %s; got %s", test.expDetail, detail)
}
})
}
}

func TestStoreSuggestedCompactionKeyDecode(t *testing.T) {
origStart := roachpb.RKey("a")
origEnd := roachpb.RKey("z")
key := StoreSuggestedCompactionKey(origStart, origEnd)
start, end, err := DecodeStoreSuggestedCompactionKey(key)
if err != nil {
t.Fatal(err)
}
if !start.Equal(origStart) {
t.Errorf("expected %s == %s", start, origStart)
}
if !end.Equal(origEnd) {
t.Errorf("expected %s == %s", end, origEnd)
}
}

// TestLocalKeySorting is a sanity check to make sure that
// the non-replicated part of a store sorts before the meta.
func TestKeySorting(t *testing.T) {
Expand Down
18 changes: 17 additions & 1 deletion pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,25 @@ var constSubKeyDict = []struct {
{"/storeIdent", localStoreIdentSuffix},
{"/gossipBootstrap", localStoreGossipSuffix},
{"/clusterVersion", localStoreClusterVersionSuffix},
{"/suggestedCompaction", localStoreSuggestedCompactionSuffix},
}

func suggestedCompactionKeyPrint(key roachpb.Key) string {
start, end, err := DecodeStoreSuggestedCompactionKey(key)
if err != nil {
return fmt.Sprintf("<invalid: %s>", err)
}
return fmt.Sprintf("{%s-%s}", start, end)
}

func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string {
for _, v := range constSubKeyDict {
if bytes.HasPrefix(key, v.key) {
if v.key.Equal(localStoreSuggestedCompactionSuffix) {
return v.name + "/" + suggestedCompactionKeyPrint(
append(roachpb.Key(nil), append(localStorePrefix, key...)...),
)
}
return v.name
}
}
Expand All @@ -195,7 +209,9 @@ func localStoreKeyPrint(_ []encoding.Direction, key roachpb.Key) string {
func localStoreKeyParse(input string) (remainder string, output roachpb.Key) {
for _, s := range constSubKeyDict {
if strings.HasPrefix(input, s.name) {
remainder = input[len(s.name):]
if s.key.Equal(localStoreSuggestedCompactionSuffix) {
panic(&errUglifyUnsupported{errors.New("cannot parse suggested compaction key")})
}
output = MakeStoreKey(s.key, nil)
return
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func TestPrettyPrint(t *testing.T) {
{StoreIdentKey(), "/Local/Store/storeIdent"},
{StoreGossipKey(), "/Local/Store/gossipBootstrap"},
{StoreClusterVersionKey(), "/Local/Store/clusterVersion"},
{StoreSuggestedCompactionKey(roachpb.RKey(MinKey), roachpb.RKey("b")), `/Local/Store/suggestedCompaction/{/Min-"b"}`},
{StoreSuggestedCompactionKey(roachpb.RKey("a"), roachpb.RKey("b")), `/Local/Store/suggestedCompaction/{"a"-"b"}`},
{StoreSuggestedCompactionKey(roachpb.RKey("a"), roachpb.RKey(MaxKey)), `/Local/Store/suggestedCompaction/{"a"-/Max}`},

{AbortSpanKey(roachpb.RangeID(1000001), txnID), fmt.Sprintf(`/Local/RangeID/1000001/r/AbortSpan/%q`, txnID)},
{RaftTombstoneKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RaftTombstone"},
Expand Down
Loading

0 comments on commit b8439f3

Please sign in to comment.