From 287f8f781cb5564d86a3d55d3c9eb879374d3217 Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Fri, 16 Jan 2015 16:08:02 -0500 Subject: [PATCH 1/2] Reorganized all local keys into two sections/range: by ID & by Key. This groups all of the range-local data contiguously into two sections, making range traversals require less seeks and more importantly, less maintenance as new range-local data is added going forward. The trickiest part of this was upgrading the C++ code to properly account for encoded keys, since it now has to decode the key and skip the range ID or range Key to determine whether the tuple being compacted is a transaction record or response cache entry. --- TODO.md | 5 - roachlib/Makefile | 2 +- roachlib/db.cc | 77 ++++++--- roachlib/db.h | 4 - roachlib/encoding.cc | 75 ++++++++ roachlib/encoding.h | 33 ++++ storage/engine/keys.go | 294 ++++++++++++++++++++------------ storage/engine/keys_test.go | 29 +++- storage/engine/rocksdb.go | 10 -- storage/engine/rocksdb_test.go | 20 +-- storage/engine/stat.go | 30 ++-- storage/range.go | 12 +- storage/range_data_iter.go | 20 +-- storage/range_data_iter_test.go | 11 +- storage/response_cache.go | 59 +++---- storage/response_cache_test.go | 4 +- storage/store.go | 30 ++-- storage/store_test.go | 19 ++- util/encoding/key_encoding.go | 2 +- 19 files changed, 455 insertions(+), 281 deletions(-) create mode 100644 roachlib/encoding.cc create mode 100644 roachlib/encoding.h diff --git a/TODO.md b/TODO.md index 16db3d6651da..c4eb9f23123f 100644 --- a/TODO.md +++ b/TODO.md @@ -65,8 +65,3 @@ utilized set may have rebalances in effect. * Cleanup proto files to adhere to proto capitalization instead of go's. - -* Consider moving all local keys into two sections, each prefixed by either - the Raft ID of the range or the start key of the range. This will allow - a less error-prone iteration over the data for a range, instead of having - to include each section of local data separately. \ No newline at end of file diff --git a/roachlib/Makefile b/roachlib/Makefile index 49b5f6532439..d9352a3c59d4 100644 --- a/roachlib/Makefile +++ b/roachlib/Makefile @@ -17,7 +17,7 @@ # Author: Andrew Bonventre (andybons@gmail.com) ROACH_LIB := libroach.a -SOURCES := db.cc +SOURCES := db.cc encoding.cc OBJECTS := $(SOURCES:.cc=.o) CXXFLAGS += -std=c++11 -I../proto/lib -I../_vendor/rocksdb/include diff --git a/roachlib/db.cc b/roachlib/db.cc index c34ddc58717e..9024d3e73043 100644 --- a/roachlib/db.cc +++ b/roachlib/db.cc @@ -29,6 +29,7 @@ #include "data.pb.h" #include "internal.pb.h" #include "db.h" +#include "encoding.h" extern "C" { @@ -53,6 +54,13 @@ struct DBSnapshot { namespace { +// NOTE: these constants must be kept in sync with the values +// in storage/engine/keys.go. +const rocksdb::Slice kKeyLocalRangeIDPrefix("\x00\x00\x00i", 4); +const rocksdb::Slice kKeyLocalRangeKeyPrefix("\x00\x00\x00k", 4); +const rocksdb::Slice kKeyLocalResponseCacheSuffix("res-"); +const rocksdb::Slice kKeyLocalTransactionSuffix("txn-"); + const DBStatus kSuccess = { NULL, 0 }; std::string ToString(DBSlice s) { @@ -141,16 +149,47 @@ const proto::ResponseHeader* GetResponseHeader(const proto::ReadWriteCmdResponse // all ranges in the map. class DBCompactionFilter : public rocksdb::CompactionFilter { public: - DBCompactionFilter(const std::string& txn_prefix, - const std::string& rcache_prefix, - int64_t min_txn_ts, + DBCompactionFilter(int64_t min_txn_ts, int64_t min_rcache_ts) - : txn_prefix_(txn_prefix), - rcache_prefix_(rcache_prefix), - min_txn_ts_(min_txn_ts), + : min_txn_ts_(min_txn_ts), min_rcache_ts_(min_rcache_ts) { } + // IsKeyOfType determines whether key, when binary-decoded, matches + // the format: [enc-value]\x00[remainder]. + bool IsKeyOfType(const rocksdb::Slice& key, const rocksdb::Slice& prefix, const rocksdb::Slice& suffix) const { + std::string decStr; + if (!DecodeBinary(key, &decStr, NULL)) { + return false; + } + rocksdb::Slice decKey(decStr); + if (!decKey.starts_with(prefix)) { + return false; + } + decKey.remove_prefix(prefix.size()); + + // Remove bytes up to including the first null byte. + int i = 0; + for (; i < decKey.size(); i++) { + if (decKey[i] == 0x0) { + break; + } + } + if (i == decKey.size()) { + return false; + } + decKey.remove_prefix(i+1); + return decKey.starts_with(suffix); + } + + bool IsResponseCacheEntry(const rocksdb::Slice& key) const { + return IsKeyOfType(key, kKeyLocalRangeIDPrefix, kKeyLocalResponseCacheSuffix); + } + + bool IsTransactionRecord(const rocksdb::Slice& key) const { + return IsKeyOfType(key, kKeyLocalRangeKeyPrefix, kKeyLocalTransactionSuffix); + } + virtual bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& existing_value, @@ -159,7 +198,9 @@ class DBCompactionFilter : public rocksdb::CompactionFilter { *value_changed = false; // Only filter response cache entries and transaction rows. - if (!key.starts_with(rcache_prefix_) && !key.starts_with(txn_prefix_)) { + bool is_rcache = IsResponseCacheEntry(key); + bool is_txn = IsTransactionRecord(key); + if (!is_rcache && !is_txn) { return false; } // Parse MVCC metadata for inlined value. @@ -174,7 +215,7 @@ class DBCompactionFilter : public rocksdb::CompactionFilter { } // Response cache rows are GC'd if their timestamp is older than the // response cache GC timeout. - if (key.starts_with(rcache_prefix_)) { + if (is_rcache) { proto::ReadWriteCmdResponse rwResp; if (!rwResp.ParseFromArray(meta.value().bytes().data(), meta.value().bytes().size())) { // *error_msg = (char*)"failed to parse response cache entry"; @@ -188,7 +229,7 @@ class DBCompactionFilter : public rocksdb::CompactionFilter { if (header->timestamp().wall_time() <= min_rcache_ts_) { return true; } - } else if (key.starts_with(txn_prefix_)) { + } else if (is_txn) { // Transaction rows are GC'd if their timestamp is older than // the system-wide minimum write intent timestamp. This // system-wide minimum write intent is periodically computed via @@ -210,26 +251,19 @@ class DBCompactionFilter : public rocksdb::CompactionFilter { } private: - const std::string txn_prefix_; - const std::string rcache_prefix_; const int64_t min_txn_ts_; const int64_t min_rcache_ts_; }; class DBCompactionFilterFactory : public rocksdb::CompactionFilterFactory { public: - DBCompactionFilterFactory(const std::string& txn_prefix, - const std::string& rcache_prefix) - : txn_prefix_(txn_prefix), - rcache_prefix_(rcache_prefix) { - } + DBCompactionFilterFactory() {} virtual std::unique_ptr CreateCompactionFilter( const rocksdb::CompactionFilter::Context& context) override { google::protobuf::MutexLock l(&mu_); // Protect access to gc timeouts. return std::unique_ptr( - new DBCompactionFilter(txn_prefix_, rcache_prefix_, - min_txn_ts_, min_rcache_ts_)); + new DBCompactionFilter(min_txn_ts_, min_rcache_ts_)); } virtual const char* Name() const override { @@ -243,9 +277,6 @@ class DBCompactionFilterFactory : public rocksdb::CompactionFilterFactory { } private: - const std::string txn_prefix_; - const std::string rcache_prefix_; - google::protobuf::Mutex mu_; // Protects values below. int64_t min_txn_ts_; int64_t min_rcache_ts_; @@ -592,9 +623,7 @@ DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions db_opts) { rocksdb::Options options; options.block_cache = rocksdb::NewLRUCache(db_opts.cache_size); options.allow_os_buffer = db_opts.allow_os_buffer; - options.compaction_filter_factory.reset(new DBCompactionFilterFactory( - ToString(db_opts.txn_prefix), - ToString(db_opts.rcache_prefix))); + options.compaction_filter_factory.reset(new DBCompactionFilterFactory()); options.create_if_missing = true; options.info_log.reset(new DBLogger(db_opts.logger)); options.merge_operator.reset(new DBMergeOperator); diff --git a/roachlib/db.h b/roachlib/db.h index 155cf439f1ce..ab2596991218 100644 --- a/roachlib/db.h +++ b/roachlib/db.h @@ -53,10 +53,6 @@ typedef void (*DBLoggerFunc)(void* state, const char* str, int len); typedef struct { int64_t cache_size; int allow_os_buffer; - // The key prefix for transaction keys. - DBSlice txn_prefix; - // The key prefix for response cache keys. - DBSlice rcache_prefix; // A function pointer to direct log messages to. DBLoggerFunc logger; } DBOptions; diff --git a/roachlib/encoding.cc b/roachlib/encoding.cc new file mode 100644 index 000000000000..e45b31ce4ae6 --- /dev/null +++ b/roachlib/encoding.cc @@ -0,0 +1,75 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Spencer Kimball (spencer.kimball@gmail.com) + +#include "rocksdb/slice.h" + +namespace { + +const unsigned char kOrderedEncodingBinary = 0x25; +const unsigned char kOrderedEncodingTerminator = 0x00; + +} + +bool DecodeBinary(const rocksdb::Slice& buf, std::string* decoded, std::string* remainder) { + if (buf[0] != kOrderedEncodingBinary) { + fprintf(stderr, "%s doesn't begin with binary encoding byte\n", buf.ToString().c_str()); + return false; + } + decoded->clear(); + int s = 6; + int i = 1; + if (buf[i] == kOrderedEncodingTerminator) { + if (remainder != NULL) { + rocksdb::Slice remSlice(buf); + remSlice.remove_prefix(2); + *remainder = remSlice.ToString(); + } + return true; + } + + int t = (buf[i] << 1) & 0xff; + for (i = 2; buf[i] != kOrderedEncodingTerminator; i++) { + if (s == 7) { + decoded->push_back(t | (buf[i] & 0x7f)); + i++; + } else { + decoded->push_back(t | ((buf[i] & 0x7f) >> s)); + } + + t = (buf[i] << (8 - s)) & 0xff; + + if (buf[i] == kOrderedEncodingTerminator) { + break; + } + + if (s == 1) { + s = 7; + } else { + s--; + } + } + if (t != 0) { + fprintf(stderr, "%s doesn't begin with binary encoding byte\n", buf.ToString().c_str()); + return false; + } + if (remainder != NULL) { + rocksdb::Slice remSlice(buf); + remSlice.remove_prefix(i+1); + *remainder = remSlice.ToString(); + } + return true; +} diff --git a/roachlib/encoding.h b/roachlib/encoding.h new file mode 100644 index 000000000000..d331b4b0e463 --- /dev/null +++ b/roachlib/encoding.h @@ -0,0 +1,33 @@ +// Copyright 2015 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Spencer Kimball (spencer.kimball@gmail.com) + +#ifndef ROACHLIB_ENCODING_H +#define ROACHLIB_ENCODING_H + +#include + +// DecodeBinary decodes the given key-encoded buf slice, returning +// true on a successful decode. The the unencoded bytes are returned +// in *decoded, and if not NULL, any remaining bytes are returned in +// *remainder. +bool DecodeBinary(const rocksdb::Slice& buf, std::string* decoded, std::string* remainder); + +#endif // ROACHLIB_ENCODING_H + +// local variables: +// mode: c++ +// end: diff --git a/storage/engine/keys.go b/storage/engine/keys.go index 08c912baf542..07e6a9830029 100644 --- a/storage/engine/keys.go +++ b/storage/engine/keys.go @@ -33,16 +33,111 @@ func MakeKey(keys ...proto.Key) proto.Key { return proto.MakeKey(keys...) } -// MakeLocalKey is a simple passthrough to MakeKey, with verification -// that the first key has length KeyLocalPrefixLength. -func MakeLocalKey(keys ...proto.Key) proto.Key { - if len(keys) == 0 { - log.Fatal("no key components specified in call to MakeLocalKey") +// MakeStoreKey creates a store-local key based on the metadata key +// suffix, and optional detail. +func MakeStoreKey(suffix, detail proto.Key) proto.Key { + return MakeKey(KeyLocalStorePrefix, suffix, detail) +} + +// StoreIdentKey returns a store-local key for the store metadata. +func StoreIdentKey() proto.Key { + return MakeStoreKey(KeyLocalStoreIdentSuffix, proto.Key{}) +} + +// MakeRangeIDKey creates a range-local key based on the range's +// Raft ID, metadata key suffix, and optional detail (e.g. the +// encoded command ID for a response cache entry, etc.). +func MakeRangeIDKey(raftID int64, suffix, detail proto.Key) proto.Key { + if len(suffix) != KeyLocalSuffixLength { + panic(fmt.Sprintf("suffix len(%q) != %d", suffix, KeyLocalSuffixLength)) } - if len(keys[0]) != KeyLocalPrefixLength { - log.Fatalf("local key prefix length must be %d: %q", KeyLocalPrefixLength, keys[0]) + return MakeKey(KeyLocalRangeIDPrefix, encoding.EncodeInt(nil, raftID), suffix, detail) +} + +// RaftLogKey returns a system-local key for a Raft log entry. +func RaftLogKey(raftID int64, logIndex uint64) proto.Key { + // The log is stored "backwards" so we can easily find the highest index stored. + return MakeRangeIDKey(raftID, KeyLocalRaftLogSuffix, encoding.EncodeUint64Decreasing(nil, logIndex)) +} + +// RaftLogPrefix returns the system-local prefix shared by all entries in a Raft log. +func RaftLogPrefix(raftID int64) proto.Key { + return MakeKey(KeyLocalRangeIDPrefix, encoding.EncodeInt(nil, raftID), KeyLocalRaftLogSuffix) +} + +// RaftStateKey returns a system-local key for a Raft HardState. +func RaftStateKey(raftID int64) proto.Key { + return MakeRangeIDKey(raftID, KeyLocalRaftStateSuffix, proto.Key{}) +} + +// DecodeRaftStateKey extracts the Raft ID from a RaftStateKey. +func DecodeRaftStateKey(key proto.Key) int64 { + if !bytes.HasPrefix(key, KeyLocalRangeIDPrefix) { + panic(fmt.Sprintf("key %q does not have %q prefix", key, KeyLocalRangeIDPrefix)) } - return proto.MakeKey(keys...) + // Cut the prefix and the Raft ID. + b := key[len(KeyLocalRangeIDPrefix):] + _, raftID := encoding.DecodeInt(b) + return raftID +} + +// ResponseCacheKey returns a range-local key by Raft ID for a +// response cache entry, with detail specified by encoding the +// supplied client command ID. +func ResponseCacheKey(raftID int64, cmdID *proto.ClientCmdID) proto.Key { + detail := proto.Key{} + if cmdID != nil { + detail = encoding.EncodeInt(nil, cmdID.WallTime) // wall time helps sort for locality + detail = encoding.EncodeInt(detail, cmdID.Random) // TODO(spencer): encode as Fixed64 + } + return MakeRangeIDKey(raftID, KeyLocalResponseCacheSuffix, detail) +} + +// MakeRangeKey creates a range-local key based on the range +// start key, metadata key suffix, and optional detail (e.g. the +// transaction UUID for a txn record, etc.). +func MakeRangeKey(key, suffix, detail proto.Key) proto.Key { + if len(suffix) != KeyLocalSuffixLength { + panic(fmt.Sprintf("suffix len(%q) != %d", suffix, KeyLocalSuffixLength)) + } + return MakeKey(KeyLocalRangeKeyPrefix, encoding.EncodeBinary(nil, key), suffix, detail) +} + +// DecodeRangeKey decodes the range key into range start key, +// suffix and optional detail (may be nil). +func DecodeRangeKey(key proto.Key) (startKey, suffix, detail proto.Key) { + if !bytes.HasPrefix(key, KeyLocalRangeKeyPrefix) { + panic(fmt.Sprintf("key %q does not have %q prefix", key, KeyLocalRangeKeyPrefix)) + } + // Cut the prefix and the Raft ID. + b := key[len(KeyLocalRangeKeyPrefix):] + b, startKey = encoding.DecodeBinary(b) + if len(b) < KeyLocalSuffixLength { + panic(fmt.Sprintf("key %q does not have suffix of length %d", key, KeyLocalSuffixLength)) + } + // Cut the response cache suffix. + suffix = b[:KeyLocalSuffixLength] + detail = b[KeyLocalSuffixLength:] + return +} + +// RangeScanMetadataKey returns a range-local key for range scan +// metadata. +func RangeScanMetadataKey(key proto.Key) proto.Key { + return MakeRangeKey(key, KeyLocalRangeScanMetadataSuffix, proto.Key{}) +} + +// RangeDescriptorKey returns a range-local key for the descriptor +// for the range with specified key. +func RangeDescriptorKey(key proto.Key) proto.Key { + return MakeRangeKey(key, KeyLocalRangeDescriptorSuffix, proto.Key{}) +} + +// TransactionKey returns a transaction key based on the provided +// transaction key and ID. The base key is encoded in order to +// guarantee that all transaction records for a range sort together. +func TransactionKey(key proto.Key, id []byte) proto.Key { + return MakeRangeKey(key, KeyLocalTransactionSuffix, proto.Key(id)) } // KeyAddress returns the address for the key, used to lookup the @@ -54,31 +149,22 @@ func MakeLocalKey(keys ...proto.Key) proto.Key { // as non-local keys, but are stored separately so that they don't // collide with user-space or global system keys. // -// However, not all local keys are addressable in the global map. -// Range metadata, response cache entries, and various other keys are -// strictly local, as the non-local suffix is not itself a key -// (e.g. in the case of range metadata, it's the encoded range ID) and -// so are not globally addressable. +// However, not all local keys are addressable in the global map. Only +// range local keys incorporating a range key (start key or transaction +// key) are addressable (e.g. range metadata and txn records). Range +// local keys incorporating the Raft ID are not (e.g. response cache +// entries, and range stats). func KeyAddress(k proto.Key) proto.Key { if !bytes.HasPrefix(k, KeyLocalPrefix) { return k } - if len(k) < KeyLocalPrefixLength { - log.Fatalf("local key %q malformed; should contain prefix %q and four-character designation", k, KeyLocalPrefix) + if bytes.HasPrefix(k, KeyLocalRangeKeyPrefix) { + k = k[len(KeyLocalRangeKeyPrefix):] + _, k = encoding.DecodeBinary(k) + return k } - return k[KeyLocalPrefixLength:] -} - -// RangeScanMetadataKey returns a system-local key for range scan -// metadata. -func RangeScanMetadataKey(startKey proto.Key) proto.Key { - return MakeLocalKey(KeyLocalRangeScanMetadataPrefix, startKey) -} - -// RangeDescriptorKey returns a system-local key for the descriptor -// for the range with specified start key. -func RangeDescriptorKey(startKey proto.Key) proto.Key { - return MakeLocalKey(KeyLocalRangeDescriptorPrefix, startKey) + log.Fatalf("local key %q malformed; should contain prefix %q", k, KeyLocalRangeKeyPrefix) + return nil } // RangeMetaKey returns a range metadata (meta1, meta2) indexing key @@ -106,13 +192,6 @@ func RangeMetaLookupKey(r *proto.RangeDescriptor) proto.Key { return RangeMetaKey(r.EndKey) } -// TransactionKey returns a transaction key based on the provided -// transaction key and ID. The base key is encoded in order to -// guarantee that all transaction records for a range sort together. -func TransactionKey(key proto.Key, id []byte) proto.Key { - return MakeKey(KeyLocalTransactionPrefix, encoding.EncodeBinary(nil, key), id) -} - // ValidateRangeMetaKey validates that the given key is a valid Range Metadata // key. It must have an appropriate metadata range prefix, and the original key // value must be less than KeyMax. As a special case, KeyMin is considered a @@ -144,40 +223,6 @@ func ValidateRangeMetaKey(key proto.Key) error { return nil } -// RaftLogKey returns a system-local key for a raft log entry. -func RaftLogKey(raftID, logIndex uint64) proto.Key { - b := RaftLogPrefix(raftID) - // The log is stored "backwards" so we can easily find the highest index stored. - b = encoding.EncodeUint64Decreasing(b, logIndex) - return b -} - -// RaftLogPrefix returns the system-local prefix shared by all entries in a raft log. -func RaftLogPrefix(raftID uint64) proto.Key { - b := MakeLocalKey(KeyLocalRaftLogPrefix) - b = encoding.EncodeUint64(b, raftID) - return b -} - -// RaftStateKey returns a system-local key for a raft HardState. -func RaftStateKey(raftID uint64) proto.Key { - b := MakeLocalKey(KeyLocalRaftStatePrefix) - b = encoding.EncodeUint64(b, raftID) - return b -} - -// DecodeRaftStateKey extracts the raft ID from a RaftStateKey. -func DecodeRaftStateKey(k proto.Key) uint64 { - _, raftID := encoding.DecodeUint64(k[len(KeyLocalRaftStatePrefix):]) - return raftID -} - -func init() { - if KeyLocalPrefixLength%7 != 0 { - log.Fatalf("local key prefix is not a multiple of 7: %d", KeyLocalPrefixLength) - } -} - // Constants for system-reserved keys in the KV map. var ( // KeyMaxLength is the maximum key length in bytes. This value is @@ -194,56 +239,81 @@ var ( KeyMax = proto.KeyMax // KeyLocalPrefix is the prefix for keys which hold data local to a - // RocksDB instance, such as range accounting information - // (e.g. range metadata, range-spanning binary tree node pointers), - // response cache values, transaction records, and message - // queues. Some local data are replicated, such as transaction rows, - // but are located in the local area so that they remain in - // proximity to one or more keys which they affect, but without - // unnecessarily polluting the key space. Further, some local data - // are stored with MVCC and contribute to distributed transactions, - // such as range metadata, range-spanning binary tree node pointers, - // and message queues. + // RocksDB instance, such as store and range-specific metadata which + // must not pollute the user key space, but must be colocated with + // the store and/or ranges which they refer to. Storing this + // information in the normal system keyspace would place the data on + // an arbitrary set of stores, with no guarantee of colocation. + // Local data includes store metadata, range metadata, response + // cache values, transaction records, range-spanning binary tree + // node pointers, and message queues. // // The local key prefix has been deliberately chosen to sort before // the KeySystemPrefix, because these local keys are not addressable // via the meta range addressing indexes. + // + // Some local data are not replicated, such as the store's 'ident' + // record. Most local data are replicated, such as response cache + // entries and transaction rows, but are not addressable as normal + // MVCC values as part of transactions. Finally, some local data are + // stored as MVCC values and are addressable as part of distributed + // transactions, such as range metadata, range-spanning binary tree + // node pointers, and message queues. KeyLocalPrefix = proto.Key("\x00\x00\x00") - // KeyLocalPrefixLength is the maximum length of the local prefix. - // It includes both the standard prefix and an additional four - // characters to designate the type of local data. - // - // NOTE: this is very important! In order to support prefix matches - // (e.g. for garbage collection of transaction and response cache - // rows), the number of bytes in the key local prefix must be a - // multiple of 7. This provides an encoded binary string with no - // leftover bits to "bleed" into the next byte in the non-prefix - // part of the local key. - KeyLocalPrefixLength = len(KeyLocalPrefix) + 4 - - // KeyLocalIdent stores an immutable identifier for this store, - // created when the store is first bootstrapped. - KeyLocalIdent = MakeKey(KeyLocalPrefix, proto.Key("iden")) - // KeyLocalRangeDescriptorPrefix is the prefix for keys storing + // KeyLocalSuffixLength specifies the length in bytes of all local + // key suffixes. + KeyLocalSuffixLength = 4 + + // There are three types of local key data enumerated below: + // store-local, range-local by ID, and range-local by key. + + // KeyLocalStorePrefix is the prefix identifying per-store data. + KeyLocalStorePrefix = MakeKey(KeyLocalPrefix, proto.Key("s")) + // KeyLocalStoreIdentSuffix stores an immutable identifier for this + // store, created when the store is first bootstrapped. + KeyLocalStoreIdentSuffix = proto.Key("iden") + // KeyLocalStoreStatSuffix is the suffix for store statistics. + KeyLocalStoreStatSuffix = proto.Key("sst-") + + // KeyLocalRangeIDPrefix is the prefix identifying per-range data + // indexed by Raft ID. The Raft ID is appended to this prefix, + // encoded using EncodeInt. The specific sort of per-range metadata + // is identified by one of the suffixes listed below, along with + // potentially additional encoded key info, such as a command ID in + // the case of response cache entry. + KeyLocalRangeIDPrefix = MakeKey(KeyLocalPrefix, proto.Key("i")) + // KeyLocalRaftLogSuffix is the suffix for the raft log. + KeyLocalRaftLogSuffix = proto.Key("rftl") + // KeyLocalRaftStateSuffix is the Suffix for the raft HardState. + KeyLocalRaftStateSuffix = proto.Key("rfts") + // KeyLocalRangeStatSuffix is the suffix for range statistics. + KeyLocalRangeStatSuffix = proto.Key("rst-") + // KeyLocalResponseCacheSuffix is the suffix for keys storing + // command responses used to guarantee idempotency (see + // ResponseCache). + // NOTE: if this value changes, it must be updated in C++ + // (roachlib/db.cc). + KeyLocalResponseCacheSuffix = proto.Key("res-") + + // KeyLocalRangeKeyPrefix is the prefix identifying per-range data + // indexed by range key (either start key, or some key in the + // range). The key is appended to this prefix, encoded using + // EncodeBinary. The specific sort of per-range metadata is + // identified by one of the suffixes listed below, along with + // potentially additional encoded key info, such as the txn UUID in + // the case of a transaction record. + KeyLocalRangeKeyPrefix = MakeKey(KeyLocalPrefix, proto.Key("k")) + // KeyLocalRangeDescriptorSuffix is the suffix for keys storing // range descriptors. The value is a struct of type RangeDescriptor. - KeyLocalRangeDescriptorPrefix = MakeKey(KeyLocalPrefix, proto.Key("rng-")) - // KeyLocalRangeScanMetadataPrefix is the prefix for a range's scan metadata. - KeyLocalRangeScanMetadataPrefix = MakeKey(KeyLocalPrefix, proto.Key("rsm-")) - // KeyLocalRangeStatPrefix is the prefix for range statistics. - KeyLocalRangeStatPrefix = MakeKey(KeyLocalPrefix, proto.Key("rst-")) - // KeyLocalResponseCachePrefix is the prefix for keys storing command - // responses used to guarantee idempotency (see ResponseCache). - KeyLocalResponseCachePrefix = MakeKey(KeyLocalPrefix, proto.Key("res-")) - // KeyLocalStoreStatPrefix is the prefix for store statistics. - KeyLocalStoreStatPrefix = MakeKey(KeyLocalPrefix, proto.Key("sst-")) - // KeyLocalTransactionPrefix specifies the key prefix for - // transaction records. The suffix is the transaction id. - KeyLocalTransactionPrefix = MakeKey(KeyLocalPrefix, proto.Key("txn-")) - // KeyLocalRaftLogPrefix is the prefix for the raft log. - KeyLocalRaftLogPrefix = MakeKey(KeyLocalPrefix, proto.Key("rftl")) - // KeyLocalRaftStatePrefix is the prefix for the raft HardState. - KeyLocalRaftStatePrefix = MakeKey(KeyLocalPrefix, proto.Key("rfts")) + KeyLocalRangeDescriptorSuffix = proto.Key("rdsc") + // KeyLocalRangeScanMetadataSuffix is the suffix for a range's scan metadata. + KeyLocalRangeScanMetadataSuffix = proto.Key("rscm") + // KeyLocalTransactionSuffix specifies the key suffix for + // transaction records. The additional detail is the transaction id. + // NOTE: if this value changes, it must be updated in C++ + // (roachlib/db.cc). + KeyLocalTransactionSuffix = proto.Key("txn-") // KeyLocalMax is the end of the local key range. KeyLocalMax = KeyLocalPrefix.PrefixEnd() diff --git a/storage/engine/keys_test.go b/storage/engine/keys_test.go index c0ef046c08ab..c8392718970a 100644 --- a/storage/engine/keys_test.go +++ b/storage/engine/keys_test.go @@ -21,6 +21,7 @@ import ( "bytes" "testing" + "code.google.com/p/go-uuid/uuid" "github.com/cockroachdb/cockroach/proto" ) @@ -48,6 +49,26 @@ func TestMakeKey(t *testing.T) { } } +func TestKeyAddress(t *testing.T) { + testCases := []struct { + key, expAddress proto.Key + }{ + {proto.Key{}, KeyMin}, + {proto.Key("123"), proto.Key("123")}, + {MakeKey(KeyConfigAccountingPrefix, proto.Key("foo")), proto.Key("\x00acctfoo")}, + {RangeDescriptorKey(proto.Key("foo")), proto.Key("foo")}, + {RangeScanMetadataKey(proto.Key("bar")), proto.Key("bar")}, + {TransactionKey(proto.Key("baz"), proto.Key(uuid.New())), proto.Key("baz")}, + {TransactionKey(KeyMax, proto.Key(uuid.New())), KeyMax}, + } + for i, test := range testCases { + result := KeyAddress(test.key) + if !result.Equal(test.expAddress) { + t.Errorf("%d: expected address for key %q doesn't match %q", i, test.key, test.expAddress) + } + } +} + func TestRangeMetaKey(t *testing.T) { testCases := []struct { key, expKey proto.Key @@ -56,14 +77,6 @@ func TestRangeMetaKey(t *testing.T) { key: proto.Key{}, expKey: KeyMin, }, - { - key: MakeLocalKey(KeyLocalTransactionPrefix, proto.Key("foo")), - expKey: proto.Key("\x00\x00meta2foo"), - }, - { - key: MakeLocalKey(KeyLocalResponseCachePrefix, proto.Key("bar")), - expKey: proto.Key("\x00\x00meta2bar"), - }, { key: MakeKey(KeyConfigAccountingPrefix, proto.Key("foo")), expKey: proto.Key("\x00\x00meta2\x00acctfoo"), diff --git a/storage/engine/rocksdb.go b/storage/engine/rocksdb.go index 1b42b0e62ec3..b2701249ee73 100644 --- a/storage/engine/rocksdb.go +++ b/storage/engine/rocksdb.go @@ -73,20 +73,10 @@ func (r *RocksDB) Start() error { return nil } - // Encoded keys have a nul-byte suffix as part of their encoding. We - // need to trim this suffix in order to get the prefix that is - // common to transaction and response cache keys. - txnPrefix := goToCSlice(MVCCEncodeKey(KeyLocalTransactionPrefix)) - txnPrefix.len-- // Trim nul-byte suffix - rcachePrefix := goToCSlice(MVCCEncodeKey(KeyLocalResponseCachePrefix)) - rcachePrefix.len-- // Trim nul-byte suffix - status := C.DBOpen(&r.rdb, goToCSlice([]byte(r.dir)), C.DBOptions{ cache_size: C.int64_t(*cacheSize), allow_os_buffer: C.int(1), - txn_prefix: txnPrefix, - rcache_prefix: rcachePrefix, logger: C.DBLoggerFunc(nil), }) err := statusToError(status) diff --git a/storage/engine/rocksdb_test.go b/storage/engine/rocksdb_test.go index a4f0456d840a..c912206c0ff3 100644 --- a/storage/engine/rocksdb_test.go +++ b/storage/engine/rocksdb_test.go @@ -25,6 +25,7 @@ import ( "reflect" "testing" + "code.google.com/p/go-uuid/uuid" "github.com/cockroachdb/cockroach/proto" "github.com/cockroachdb/cockroach/util" "github.com/cockroachdb/cockroach/util/encoding" @@ -63,8 +64,8 @@ func encodeTransaction(timestamp proto.Timestamp, t *testing.T) []byte { } // TestRocksDBCompaction verifies that a garbage collector can be -// installed on a RocksDB engine and will properly compact entries -// response cache and transaction entries. +// installed on a RocksDB engine and will properly compact response +// cache and transaction entries. func TestRocksDBCompaction(t *testing.T) { gob.Register(proto.Timestamp{}) loc := util.CreateTempDirectory() @@ -81,26 +82,25 @@ func TestRocksDBCompaction(t *testing.T) { } }(t) - rcPre := KeyLocalResponseCachePrefix - txnPre := KeyLocalTransactionPrefix + cmdID := &proto.ClientCmdID{WallTime: 1, Random: 1} // Write two transaction values and two response cache values such // that exactly one of each should be GC'd based on our GC timeouts. kvs := []proto.KeyValue{ proto.KeyValue{ - Key: MakeLocalKey(rcPre, proto.Key("a")), + Key: ResponseCacheKey(1, cmdID), Value: proto.Value{Bytes: encodePutResponse(makeTS(2, 0), t)}, }, proto.KeyValue{ - Key: MakeLocalKey(rcPre, proto.Key("b")), + Key: ResponseCacheKey(2, cmdID), Value: proto.Value{Bytes: encodePutResponse(makeTS(3, 0), t)}, }, proto.KeyValue{ - Key: MakeLocalKey(txnPre, proto.Key("a")), + Key: TransactionKey(proto.Key("a"), proto.Key(uuid.New())), Value: proto.Value{Bytes: encodeTransaction(makeTS(1, 0), t)}, }, proto.KeyValue{ - Key: MakeLocalKey(txnPre, proto.Key("b")), + Key: TransactionKey(proto.Key("b"), proto.Key(uuid.New())), Value: proto.Value{Bytes: encodeTransaction(makeTS(2, 0), t)}, }, } @@ -121,8 +121,8 @@ func TestRocksDBCompaction(t *testing.T) { keys = append(keys, kv.Key) } expKeys := []proto.Key{ - MakeLocalKey(rcPre, proto.Key("b")), - MakeLocalKey(txnPre, proto.Key("b")), + kvs[1].Key, + kvs[3].Key, } if !reflect.DeepEqual(expKeys, keys) { t.Errorf("expected keys %+v, got keys %+v", expKeys, keys) diff --git a/storage/engine/stat.go b/storage/engine/stat.go index 22745196c0fd..b5d4d757bf36 100644 --- a/storage/engine/stat.go +++ b/storage/engine/stat.go @@ -19,7 +19,6 @@ package engine import ( "github.com/cockroachdb/cockroach/proto" - "github.com/cockroachdb/cockroach/util/encoding" gogoproto "github.com/gogo/protobuf/proto" ) @@ -53,25 +52,23 @@ var ( StatIntentCount = proto.Key("intent-count") ) -// MakeRangeStatKey returns the key for accessing the named stat +// RangeStatKey returns the key for accessing the named stat // for the specified Raft ID. -func MakeRangeStatKey(raftID int64, stat proto.Key) proto.Key { - encRaftID := encoding.EncodeInt(nil, raftID) - return MakeKey(KeyLocalRangeStatPrefix, encRaftID, stat) +func RangeStatKey(raftID int64, stat proto.Key) proto.Key { + return MakeRangeIDKey(raftID, KeyLocalRangeStatSuffix, stat) } -// MakeStoreStatKey returns the key for accessing the named stat +// StoreStatKey returns the key for accessing the named stat // for the specified store ID. -func MakeStoreStatKey(storeID int32, stat proto.Key) proto.Key { - encStoreID := encoding.EncodeInt(nil, int64(storeID)) - return MakeKey(KeyLocalStoreStatPrefix, encStoreID, stat) +func StoreStatKey(storeID int32, stat proto.Key) proto.Key { + return MakeStoreKey(KeyLocalStoreStatSuffix, stat) } // GetRangeStat fetches the specified stat from the provided engine. // If the stat could not be found, returns 0. An error is returned // on stat decode error. func GetRangeStat(engine Engine, raftID int64, stat proto.Key) (int64, error) { - val, err := MVCCGet(engine, MakeRangeStatKey(raftID, stat), proto.ZeroTimestamp, nil) + val, err := MVCCGet(engine, RangeStatKey(raftID, stat), proto.ZeroTimestamp, nil) if err != nil || val == nil { return 0, err } @@ -87,12 +84,12 @@ func MergeStat(engine Engine, raftID int64, storeID int32, stat proto.Key, statV } value := proto.Value{Integer: gogoproto.Int64(statVal)} if raftID != 0 { - if err := MVCCMerge(engine, nil, MakeRangeStatKey(raftID, stat), value); err != nil { + if err := MVCCMerge(engine, nil, RangeStatKey(raftID, stat), value); err != nil { return err } } if storeID != 0 { - if err := MVCCMerge(engine, nil, MakeStoreStatKey(storeID, stat), value); err != nil { + if err := MVCCMerge(engine, nil, StoreStatKey(storeID, stat), value); err != nil { return err } } @@ -105,12 +102,12 @@ func MergeStat(engine Engine, raftID int64, storeID int32, stat proto.Key, statV func SetStat(engine Engine, raftID int64, storeID int32, stat proto.Key, statVal int64) error { value := proto.Value{Integer: gogoproto.Int64(statVal)} if raftID != 0 { - if err := MVCCPut(engine, nil, MakeRangeStatKey(raftID, stat), proto.ZeroTimestamp, value, nil); err != nil { + if err := MVCCPut(engine, nil, RangeStatKey(raftID, stat), proto.ZeroTimestamp, value, nil); err != nil { return err } } if storeID != 0 { - if err := MVCCPut(engine, nil, MakeStoreStatKey(storeID, stat), proto.ZeroTimestamp, value, nil); err != nil { + if err := MVCCPut(engine, nil, StoreStatKey(storeID, stat), proto.ZeroTimestamp, value, nil); err != nil { return err } } @@ -133,7 +130,8 @@ func GetRangeSize(engine Engine, raftID int64) (int64, error) { // ClearRangeStats clears stats for the specified range. func ClearRangeStats(engine Engine, raftID int64) error { - statStartKey := MakeKey(KeyLocalRangeStatPrefix, encoding.EncodeInt(nil, raftID)) - _, err := ClearRange(engine, MVCCEncodeKey(statStartKey), MVCCEncodeKey(statStartKey.PrefixEnd())) + statStartKey := RangeStatKey(raftID, proto.Key{}) + statEndKey := RangeStatKey(raftID+1, proto.Key{}) + _, err := ClearRange(engine, MVCCEncodeKey(statStartKey), MVCCEncodeKey(statEndKey)) return err } diff --git a/storage/range.go b/storage/range.go index 55fcd28efd99..520cb6f5b0d1 100644 --- a/storage/range.go +++ b/storage/range.go @@ -1401,7 +1401,7 @@ func (r *Range) InitialState() (raftpb.HardState, raftpb.ConfState, error) { cs := raftpb.ConfState{ Nodes: []uint64{1}, } - _, err := engine.MVCCGetProto(r.rm.Engine(), engine.RaftStateKey(uint64(r.Desc.RaftID)), + _, err := engine.MVCCGetProto(r.rm.Engine(), engine.RaftStateKey(r.Desc.RaftID), proto.ZeroTimestamp, nil, &hs) if err != nil { return hs, cs, err @@ -1412,7 +1412,7 @@ func (r *Range) InitialState() (raftpb.HardState, raftpb.ConfState, error) { // loadLastIndex looks in the engine to find the last log index. func (r *Range) loadLastIndex() error { - logKey := engine.RaftLogPrefix(uint64(r.Desc.RaftID)) + logKey := engine.RaftLogPrefix(r.Desc.RaftID) kvs, err := engine.MVCCScan(r.rm.Engine(), logKey, logKey.PrefixEnd(), 1, // only the first (i.e. newest) result) @@ -1440,8 +1440,8 @@ func (r *Range) Entries(lo, hi uint64) ([]raftpb.Entry, error) { // MVCCScan is inclusive in the other direction we must increment both the // start and end keys. kvs, err := engine.MVCCScan(r.rm.Engine(), - engine.RaftLogKey(uint64(r.Desc.RaftID), hi).Next(), - engine.RaftLogKey(uint64(r.Desc.RaftID), lo).Next(), + engine.RaftLogKey(r.Desc.RaftID, hi).Next(), + engine.RaftLogKey(r.Desc.RaftID, lo).Next(), 0, proto.ZeroTimestamp, nil) if err != nil { return nil, err @@ -1501,7 +1501,7 @@ func (r *Range) Snapshot() (raftpb.Snapshot, error) { func (r *Range) Append(entries []raftpb.Entry) error { batch := r.rm.Engine().NewBatch() for _, ent := range entries { - err := engine.MVCCPutProto(batch, nil, engine.RaftLogKey(uint64(r.Desc.RaftID), ent.Index), + err := engine.MVCCPutProto(batch, nil, engine.RaftLogKey(r.Desc.RaftID, ent.Index), proto.ZeroTimestamp, nil, &ent) if err != nil { return err @@ -1518,6 +1518,6 @@ func (r *Range) Append(entries []raftpb.Entry) error { // SetHardState implements the multiraft.WriteableGroupStorage interface. func (r *Range) SetHardState(st raftpb.HardState) error { - return engine.MVCCPutProto(r.rm.Engine(), nil, engine.RaftStateKey(uint64(r.Desc.RaftID)), + return engine.MVCCPutProto(r.rm.Engine(), nil, engine.RaftStateKey(r.Desc.RaftID), proto.ZeroTimestamp, nil, &st) } diff --git a/storage/range_data_iter.go b/storage/range_data_iter.go index a7cd28a4b126..8239ae52a042 100644 --- a/storage/range_data_iter.go +++ b/storage/range_data_iter.go @@ -49,24 +49,12 @@ func newRangeDataIterator(r *Range, e engine.Engine) *rangeDataIterator { ri := &rangeDataIterator{ ranges: []keyRange{ { - start: engine.MVCCEncodeKey(responseCacheKeyPrefix(r.Desc.RaftID)), - end: engine.MVCCEncodeKey(responseCacheKeyPrefix(r.Desc.RaftID + 1)), + start: engine.MVCCEncodeKey(engine.MakeKey(engine.KeyLocalRangeIDPrefix, encoding.EncodeInt(nil, r.Desc.RaftID))), + end: engine.MVCCEncodeKey(engine.MakeKey(engine.KeyLocalRangeIDPrefix, encoding.EncodeInt(nil, r.Desc.RaftID+1))), }, { - start: engine.MVCCEncodeKey(engine.RangeDescriptorKey(r.Desc.StartKey)), - end: engine.MVCCEncodeKey(engine.RangeDescriptorKey(r.Desc.StartKey).Next()), - }, - { - start: engine.MVCCEncodeKey(engine.RangeScanMetadataKey(r.Desc.StartKey)), - end: engine.MVCCEncodeKey(engine.RangeScanMetadataKey(r.Desc.StartKey).Next()), - }, - { - start: engine.MVCCEncodeKey(engine.MakeKey(engine.KeyLocalRangeStatPrefix, encoding.EncodeInt(nil, r.Desc.RaftID))), - end: engine.MVCCEncodeKey(engine.MakeKey(engine.KeyLocalRangeStatPrefix, encoding.EncodeInt(nil, r.Desc.RaftID+1))), - }, - { - start: engine.MVCCEncodeKey(engine.TransactionKey(r.Desc.StartKey, []byte(nil))), - end: engine.MVCCEncodeKey(engine.TransactionKey(r.Desc.EndKey, []byte(nil))), + start: engine.MVCCEncodeKey(engine.MakeKey(engine.KeyLocalRangeKeyPrefix, encoding.EncodeBinary(nil, r.Desc.StartKey))), + end: engine.MVCCEncodeKey(engine.MakeKey(engine.KeyLocalRangeKeyPrefix, encoding.EncodeBinary(nil, r.Desc.EndKey))), }, { start: engine.MVCCEncodeKey(startKey), diff --git a/storage/range_data_iter_test.go b/storage/range_data_iter_test.go index 84dea5395c2d..660831841073 100644 --- a/storage/range_data_iter_test.go +++ b/storage/range_data_iter_test.go @@ -34,12 +34,15 @@ func createRangeData(r *Range, t *testing.T) []proto.EncodedKey { key proto.Key ts proto.Timestamp }{ - {responseCacheKey(r.Desc.RaftID, proto.ClientCmdID{WallTime: 1, Random: 1}), ts0}, - {responseCacheKey(r.Desc.RaftID, proto.ClientCmdID{WallTime: 2, Random: 2}), ts0}, + {engine.ResponseCacheKey(r.Desc.RaftID, &proto.ClientCmdID{WallTime: 1, Random: 1}), ts0}, + {engine.ResponseCacheKey(r.Desc.RaftID, &proto.ClientCmdID{WallTime: 2, Random: 2}), ts0}, + {engine.RaftLogKey(r.Desc.RaftID, 2), ts0}, + {engine.RaftLogKey(r.Desc.RaftID, 1), ts0}, + {engine.RaftStateKey(r.Desc.RaftID), ts0}, + {engine.RangeStatKey(r.Desc.RaftID, engine.StatKeyBytes), ts0}, + {engine.RangeStatKey(r.Desc.RaftID, engine.StatKeyCount), ts0}, {engine.RangeDescriptorKey(r.Desc.StartKey), ts}, {engine.RangeScanMetadataKey(r.Desc.StartKey), ts0}, - {engine.MakeRangeStatKey(r.Desc.RaftID, engine.StatKeyBytes), ts0}, - {engine.MakeRangeStatKey(r.Desc.RaftID, engine.StatKeyCount), ts0}, {engine.TransactionKey(r.Desc.StartKey, []byte("1234")), ts0}, {engine.TransactionKey(r.Desc.StartKey.Next(), []byte("5678")), ts0}, {engine.TransactionKey(r.Desc.EndKey.Prev(), []byte("2468")), ts0}, diff --git a/storage/response_cache.go b/storage/response_cache.go index 8fe7775f7c51..1fc367ca38ce 100644 --- a/storage/response_cache.go +++ b/storage/response_cache.go @@ -18,6 +18,7 @@ package storage import ( + "bytes" "fmt" "sync" @@ -45,8 +46,7 @@ func makeCmdIDKey(cmdID proto.ClientCmdID) cmdIDKey { // machine and the results are stored in the ResponseCache. // // The ResponseCache stores responses in the underlying engine, using -// keys derived from KeyLocalResponseCachePrefix, Raft ID and the -// ClientCmdID. +// keys derived from the Raft ID and the ClientCmdID. // // A ResponseCache is safe for concurrent access. type ResponseCache struct { @@ -82,7 +82,7 @@ func (rc *ResponseCache) ClearInflight() { // ClearData removes all items stored in the persistent cache. It does not alter // the inflight map. func (rc *ResponseCache) ClearData() error { - p := responseCacheKeyPrefix(rc.raftID) + p := engine.ResponseCacheKey(rc.raftID, nil) // prefix for all response cache entries with this raft ID end := p.PrefixEnd() _, err := engine.ClearRange(rc.engine, engine.MVCCEncodeKey(p), engine.MVCCEncodeKey(end)) return err @@ -119,7 +119,7 @@ func (rc *ResponseCache) GetResponse(cmdID proto.ClientCmdID, reply proto.Respon // If the response is in the cache or we experienced an error, return. rwResp := proto.ReadWriteCmdResponse{} - key := responseCacheKey(rc.raftID, cmdID) + key := engine.ResponseCacheKey(rc.raftID, &cmdID) if ok, err := engine.MVCCGetProto(rc.engine, key, proto.ZeroTimestamp, nil, &rwResp); ok || err != nil { rc.Lock() // Take lock after fetching response from cache. defer rc.Unlock() @@ -142,18 +142,18 @@ func (rc *ResponseCache) CopyInto(e engine.Engine, destRaftID int64) error { rc.Lock() defer rc.Unlock() - prefix := responseCacheKeyPrefix(rc.raftID) + prefix := engine.ResponseCacheKey(rc.raftID, nil) // response cache prefix start := engine.MVCCEncodeKey(prefix) end := engine.MVCCEncodeKey(prefix.PrefixEnd()) return rc.engine.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) { // Decode the key into a cmd, skipping on error. Otherwise, // write it to the corresponding key in the new cache. - cmdID, err := rc.decodeKey(kv.Key) + cmdID, err := rc.decodeResponseCacheKey(kv.Key) if err != nil { return false, util.Errorf("could not decode a response cache key %q: %s", kv.Key, err) } - encKey := engine.MVCCEncodeKey(responseCacheKey(destRaftID, cmdID)) + encKey := engine.MVCCEncodeKey(engine.ResponseCacheKey(destRaftID, &cmdID)) return false, e.Put(encKey, kv.Value) }) } @@ -164,18 +164,18 @@ func (rc *ResponseCache) CopyInto(e engine.Engine, destRaftID int64) error { // error. The copy is done directly using the engine instead of interpreting // values through MVCC for efficiency. func (rc *ResponseCache) CopyFrom(e engine.Engine, originRaftID int64) error { - prefix := responseCacheKeyPrefix(originRaftID) + prefix := engine.ResponseCacheKey(originRaftID, nil) // response cache prefix start := engine.MVCCEncodeKey(prefix) end := engine.MVCCEncodeKey(prefix.PrefixEnd()) return e.Iterate(start, end, func(kv proto.RawKeyValue) (bool, error) { // Decode the key into a cmd, skipping on error. Otherwise, // write it to the corresponding key in the new cache. - cmdID, err := rc.decodeKey(kv.Key) + cmdID, err := rc.decodeResponseCacheKey(kv.Key) if err != nil { return false, util.Errorf("could not decode a response cache key %q: %s", kv.Key, err) } - encKey := engine.MVCCEncodeKey(responseCacheKey(rc.raftID, cmdID)) + encKey := engine.MVCCEncodeKey(engine.ResponseCacheKey(rc.raftID, &cmdID)) return false, rc.engine.Put(encKey, kv.Value) }) } @@ -193,7 +193,7 @@ func (rc *ResponseCache) PutResponse(cmdID proto.ClientCmdID, reply proto.Respon // Write the response value to the engine. var err error if rc.shouldCacheResponse(reply) { - key := responseCacheKey(rc.raftID, cmdID) + key := engine.ResponseCacheKey(rc.raftID, &cmdID) rwResp := &proto.ReadWriteCmdResponse{} rwResp.SetValue(reply) err = engine.MVCCPutProto(rc.engine, nil, key, proto.ZeroTimestamp, nil, rwResp) @@ -243,40 +243,25 @@ func (rc *ResponseCache) removeInflightLocked(cmdID proto.ClientCmdID) { } } -// responseCacheKeyPrefix generates the prefix under which all entries -// for the given range are stored in the engine. -func responseCacheKeyPrefix(raftID int64) proto.Key { - b := append([]byte(nil), engine.KeyLocalResponseCachePrefix...) - return encoding.EncodeInt(b, raftID) -} - -// responseCacheKey encodes the Raft ID and client command ID into a -// key for storage in the underlying engine. Note that the prefix for -// response cache keys sorts them at the very top of the engine's -// keyspace. -func responseCacheKey(raftID int64, cmdID proto.ClientCmdID) proto.Key { - b := responseCacheKeyPrefix(raftID) - b = encoding.EncodeInt(b, cmdID.WallTime) // wall time helps sort for locality - b = encoding.EncodeInt(b, cmdID.Random) // TODO(spencer): encode as Fixed64 - return b -} - -func (rc *ResponseCache) decodeKey(encKey []byte) (proto.ClientCmdID, error) { +func (rc *ResponseCache) decodeResponseCacheKey(encKey proto.EncodedKey) (proto.ClientCmdID, error) { ret := proto.ClientCmdID{} key, _, isValue := engine.MVCCDecodeKey(encKey) if isValue { return ret, util.Errorf("key %q is not a raw MVCC value", encKey) } - minLen := len(engine.KeyLocalResponseCachePrefix) - if len(key) < minLen { - return ret, util.Errorf("key not long enough to be decoded: %q", key) + if !bytes.HasPrefix(key, engine.KeyLocalRangeIDPrefix) { + return ret, util.Errorf("key %q does not have %q prefix", key, engine.KeyLocalRangeIDPrefix) } - // First, Cut the prefix and the Raft ID. - b := key[minLen:] + // Cut the prefix and the Raft ID. + b := key[len(engine.KeyLocalRangeIDPrefix):] b, _ = encoding.DecodeInt(b) - // Second, read the wall time. + if !bytes.HasPrefix(b, engine.KeyLocalResponseCacheSuffix) { + return ret, util.Errorf("key %q does not contain the response cache suffix %q", key, engine.KeyLocalResponseCacheSuffix) + } + // Cut the response cache suffix. + b = b[len(engine.KeyLocalResponseCacheSuffix):] + // Now, decode the command ID. b, wt := encoding.DecodeInt(b) - // Third, read the Random component. b, rd := encoding.DecodeInt(b) if len(b) > 0 { return ret, util.Errorf("key %q has leftover bytes after decode: %q; indicates corrupt key", encKey, b) diff --git a/storage/response_cache_test.go b/storage/response_cache_test.go index d5c2344c9958..2933fcb8a0eb 100644 --- a/storage/response_cache_test.go +++ b/storage/response_cache_test.go @@ -92,7 +92,7 @@ func TestResponseCacheEmptyCmdID(t *testing.T) { // TestResponseCacheCopyInto tests that responses cached in one cache get // transferred correctly to another cache using CopyInto(). -func TestResposeCacheCopyInto(t *testing.T) { +func TestResponseCacheCopyInto(t *testing.T) { rc1, rc2 := createTestResponseCache(t, 1), createTestResponseCache(t, 2) cmdID := makeCmdID(1, 1) // Store an increment with new value one in the first cache. @@ -115,7 +115,7 @@ func TestResposeCacheCopyInto(t *testing.T) { // TestResponseCacheCopyFrom tests that responses cached in one cache get // transferred correctly to another cache using CopyFrom(). -func TestResposeCacheCopyFrom(t *testing.T) { +func TestResponseCacheCopyFrom(t *testing.T) { rc1, rc2 := createTestResponseCache(t, 1), createTestResponseCache(t, 2) cmdID := makeCmdID(1, 1) // Store an increment with new value one in the first cache. diff --git a/storage/store.go b/storage/store.go index e29945cfd062..b33524c5cf7d 100644 --- a/storage/store.go +++ b/storage/store.go @@ -46,10 +46,6 @@ const ( GCResponseCacheExpiration = 1 * time.Hour // raftIDAllocCount is the number of Raft IDs to allocate per allocation. raftIDAllocCount = 10 - // uuidLength is the length of a UUID string, used to allot extra - // key length to transaction records, which have a UUID appended. - // UUID has the format "759b7562-d2c8-4977-a949-22d8084dade2". - uuidLength = 36 // defaultScanInterval is the default value for the scan interval // command line flag. defaultScanInterval = 10 * time.Minute @@ -76,15 +72,9 @@ var ( // special case for both key-local AND meta1 or meta2 addressing prefixes. func verifyKeyLength(key proto.Key) error { maxLength := engine.KeyMaxLength - if bytes.HasPrefix(key, engine.KeyLocalTransactionPrefix) { - key = key[engine.KeyLocalPrefixLength:] - var remaining []byte - remaining, key = encoding.DecodeBinary(key) - if len(remaining) > uuidLength { - return util.Errorf("maximum uuid length in txn key exceeded: len(%s) > %d", remaining, uuidLength) - } - } else if bytes.HasPrefix(key, engine.KeyLocalPrefix) { - key = key[engine.KeyLocalPrefixLength:] + if bytes.HasPrefix(key, engine.KeyLocalRangeKeyPrefix) { + key = key[len(engine.KeyLocalRangeKeyPrefix):] + _, key = encoding.DecodeBinary(key) } if bytes.HasPrefix(key, engine.KeyMetaPrefix) { key = key[len(engine.KeyMeta1Prefix):] @@ -307,15 +297,16 @@ func (s *Store) Start() error { s.engine.SetGCTimeouts(minTxnTS, minRCacheTS) // Read store ident and return a not-bootstrapped error if necessary. - ok, err := engine.MVCCGetProto(s.engine, engine.KeyLocalIdent, proto.ZeroTimestamp, nil, &s.Ident) + ok, err := engine.MVCCGetProto(s.engine, engine.StoreIdentKey(), proto.ZeroTimestamp, nil, &s.Ident) if err != nil { return err } else if !ok { return &NotBootstrappedError{} } - start := engine.KeyLocalRangeDescriptorPrefix - end := start.PrefixEnd() + // Iterator over all range-local key-based data. + start := engine.RangeDescriptorKey(engine.KeyMin) + end := engine.RangeDescriptorKey(engine.KeyMax) s.raft = newSingleNodeRaft(s) // Start Raft processing goroutine. @@ -326,6 +317,11 @@ func (s *Store) Start() error { // split crashing halfway will simply be resolved on the next split // attempt. They can otherwise be ignored. if err := engine.MVCCIterateCommitted(s.engine, start, end, func(kv proto.KeyValue) (bool, error) { + // Only consider range metadata entries; ignore others. + _, suffix, _ := engine.DecodeRangeKey(kv.Key) + if !suffix.Equal(engine.KeyLocalRangeDescriptorSuffix) { + return false, nil + } var desc proto.RangeDescriptor if err := gogoproto.Unmarshal(kv.Value.Bytes, &desc); err != nil { return false, err @@ -458,7 +454,7 @@ func (s *Store) Bootstrap(ident proto.StoreIdent) error { } else if len(kvs) > 0 { return util.Errorf("non-empty engine %s (first key: %q)", s.engine, kvs[0].Key) } - err = engine.MVCCPutProto(s.engine, nil, engine.KeyLocalIdent, proto.ZeroTimestamp, nil, &s.Ident) + err = engine.MVCCPutProto(s.engine, nil, engine.StoreIdentKey(), proto.ZeroTimestamp, nil, &s.Ident) return err } diff --git a/storage/store_test.go b/storage/store_test.go index 1f09d3baf6cf..e633afe35d06 100644 --- a/storage/store_test.go +++ b/storage/store_test.go @@ -25,7 +25,6 @@ import ( "log" "math" "sort" - "strings" "testing" "time" @@ -401,18 +400,22 @@ func TestStoreVerifyKeys(t *testing.T) { if err := store.ExecuteCmd(proto.Put, pArgs, pReply); err != nil { t.Fatalf("unexpected error on put to meta2 value: %s", err) } - // Try a put to txn record for a meta2 key. + // Try to put a range descriptor record for a start key which is + // maximum length. + key := append([]byte{}, engine.KeyMax...) + key[len(key)-1] = 0x01 + pArgs, pReply = putArgs(engine.RangeDescriptorKey(key), []byte("value"), 1, store.StoreID()) + if err := store.ExecuteCmd(proto.Put, pArgs, pReply); err != nil { + t.Fatalf("unexpected error on put to range descriptor for KeyMax value: %s", err) + } + // Try a put to txn record for a meta2 key (note that this doesn't + // actually happen in practice, as txn records are not put directly, + // but are instead manipulated only through txn methods). pArgs, pReply = putArgs(engine.TransactionKey(meta2KeyMax, []byte(uuid.New())), []byte("value"), 1, store.StoreID()) if err := store.ExecuteCmd(proto.Put, pArgs, pReply); err != nil { t.Fatalf("unexpected error on put to txn meta2 value: %s", err) } - // Verify a UUID on txn key which is too long. - pArgs, pReply = putArgs(engine.TransactionKey(engine.KeyMax, []byte(strings.Repeat("x", 37))), - []byte("value"), 1, store.StoreID()) - if err := store.ExecuteCmd(proto.Put, pArgs, pReply); err == nil { - t.Fatalf("expected error on put to txn key with extra character in uuid") - } } // TestStoreExecuteCmdUpdateTime verifies that the node clock is updated. diff --git a/util/encoding/key_encoding.go b/util/encoding/key_encoding.go index cc49bddbba06..1af57772145b 100644 --- a/util/encoding/key_encoding.go +++ b/util/encoding/key_encoding.go @@ -173,7 +173,7 @@ func DecodeBinary(buf []byte) ([]byte, []byte) { } t = (buf[i] << (8 - s)) & 0xff - + if buf[i] == orderedEncodingTerminator { break } From e06091d13f8239165b91617bb4d507f30c17b246 Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Fri, 16 Jan 2015 17:14:50 -0500 Subject: [PATCH 2/2] Respond to Ben's comments. --- storage/engine/keys.go | 2 +- storage/store_split_test.go | 17 +++++++---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/storage/engine/keys.go b/storage/engine/keys.go index 07e6a9830029..4bf5e785a2d6 100644 --- a/storage/engine/keys.go +++ b/storage/engine/keys.go @@ -62,7 +62,7 @@ func RaftLogKey(raftID int64, logIndex uint64) proto.Key { // RaftLogPrefix returns the system-local prefix shared by all entries in a Raft log. func RaftLogPrefix(raftID int64) proto.Key { - return MakeKey(KeyLocalRangeIDPrefix, encoding.EncodeInt(nil, raftID), KeyLocalRaftLogSuffix) + return MakeRangeIDKey(raftID, KeyLocalRaftLogSuffix, proto.Key{}) } // RaftStateKey returns a system-local key for a Raft HardState. diff --git a/storage/store_split_test.go b/storage/store_split_test.go index 59aabe95a618..95f41477ec7a 100644 --- a/storage/store_split_test.go +++ b/storage/store_split_test.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/cockroach/storage/engine" "github.com/cockroachdb/cockroach/util" "github.com/cockroachdb/cockroach/util/log" - gogoproto "github.com/gogo/protobuf/proto" ) func adminSplitArgs(key, splitKey []byte, raftID int64, storeID int32) (*proto.AdminSplitRequest, *proto.AdminSplitResponse) { @@ -405,15 +404,13 @@ func TestStoreRangeSplitOnConfigs(t *testing.T) { acctConfig := &proto.AcctConfig{} zoneConfig := &proto.ZoneConfig{} - // Write accounting configs for db1 & db2 and zone configs for db3 & db4. - for _, k := range []string{"db4", "db3", "db2", "db1"} { - prefix := engine.KeyConfigAccountingPrefix - var config gogoproto.Message = acctConfig - if k == "db3" || k == "db4" { - prefix = engine.KeyConfigZonePrefix - config = zoneConfig - } - store.DB().PreparePutProto(engine.MakeKey(prefix, proto.Key(k)), config) + // Write zone configs for db3 & db4. + for _, k := range []string{"db4", "db3"} { + store.DB().PreparePutProto(engine.MakeKey(engine.KeyConfigZonePrefix, proto.Key(k)), zoneConfig) + } + // Write accounting configs for db1 & db2. + for _, k := range []string{"db2", "db1"} { + store.DB().PreparePutProto(engine.MakeKey(engine.KeyConfigAccountingPrefix, proto.Key(k)), acctConfig) } if err := store.DB().Flush(); err != nil { t.Fatal(err)