Skip to content

Commit

Permalink
Merge pull request #264 from cockroachdb/spencerkimball/local-keyspace
Browse files Browse the repository at this point in the history
Reorganized all local keys into two sections/range: by ID & by Key.
  • Loading branch information
spencerkimball committed Jan 16, 2015
2 parents 3afbfc7 + e06091d commit a69a950
Show file tree
Hide file tree
Showing 20 changed files with 462 additions and 291 deletions.
5 changes: 0 additions & 5 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,3 @@
utilized set may have rebalances in effect.

* Cleanup proto files to adhere to proto capitalization instead of go's.

* Consider moving all local keys into two sections, each prefixed by either
the Raft ID of the range or the start key of the range. This will allow
a less error-prone iteration over the data for a range, instead of having
to include each section of local data separately.
2 changes: 1 addition & 1 deletion roachlib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Author: Andrew Bonventre ([email protected])

ROACH_LIB := libroach.a
SOURCES := db.cc
SOURCES := db.cc encoding.cc
OBJECTS := $(SOURCES:.cc=.o)

CXXFLAGS += -std=c++11 -I../proto/lib -I../_vendor/rocksdb/include
Expand Down
77 changes: 53 additions & 24 deletions roachlib/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "data.pb.h"
#include "internal.pb.h"
#include "db.h"
#include "encoding.h"

extern "C" {

Expand All @@ -53,6 +54,13 @@ struct DBSnapshot {

namespace {

// NOTE: these constants must be kept in sync with the values
// in storage/engine/keys.go.
const rocksdb::Slice kKeyLocalRangeIDPrefix("\x00\x00\x00i", 4);
const rocksdb::Slice kKeyLocalRangeKeyPrefix("\x00\x00\x00k", 4);
const rocksdb::Slice kKeyLocalResponseCacheSuffix("res-");
const rocksdb::Slice kKeyLocalTransactionSuffix("txn-");

const DBStatus kSuccess = { NULL, 0 };

std::string ToString(DBSlice s) {
Expand Down Expand Up @@ -141,16 +149,47 @@ const proto::ResponseHeader* GetResponseHeader(const proto::ReadWriteCmdResponse
// all ranges in the map.
class DBCompactionFilter : public rocksdb::CompactionFilter {
public:
DBCompactionFilter(const std::string& txn_prefix,
const std::string& rcache_prefix,
int64_t min_txn_ts,
DBCompactionFilter(int64_t min_txn_ts,
int64_t min_rcache_ts)
: txn_prefix_(txn_prefix),
rcache_prefix_(rcache_prefix),
min_txn_ts_(min_txn_ts),
: min_txn_ts_(min_txn_ts),
min_rcache_ts_(min_rcache_ts) {
}

// IsKeyOfType determines whether key, when binary-decoded, matches
// the format: <prefix>[enc-value]\x00<suffix>[remainder].
bool IsKeyOfType(const rocksdb::Slice& key, const rocksdb::Slice& prefix, const rocksdb::Slice& suffix) const {
std::string decStr;
if (!DecodeBinary(key, &decStr, NULL)) {
return false;
}
rocksdb::Slice decKey(decStr);
if (!decKey.starts_with(prefix)) {
return false;
}
decKey.remove_prefix(prefix.size());

// Remove bytes up to including the first null byte.
int i = 0;
for (; i < decKey.size(); i++) {
if (decKey[i] == 0x0) {
break;
}
}
if (i == decKey.size()) {
return false;
}
decKey.remove_prefix(i+1);
return decKey.starts_with(suffix);
}

bool IsResponseCacheEntry(const rocksdb::Slice& key) const {
return IsKeyOfType(key, kKeyLocalRangeIDPrefix, kKeyLocalResponseCacheSuffix);
}

bool IsTransactionRecord(const rocksdb::Slice& key) const {
return IsKeyOfType(key, kKeyLocalRangeKeyPrefix, kKeyLocalTransactionSuffix);
}

virtual bool Filter(int level,
const rocksdb::Slice& key,
const rocksdb::Slice& existing_value,
Expand All @@ -159,7 +198,9 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
*value_changed = false;

// Only filter response cache entries and transaction rows.
if (!key.starts_with(rcache_prefix_) && !key.starts_with(txn_prefix_)) {
bool is_rcache = IsResponseCacheEntry(key);
bool is_txn = IsTransactionRecord(key);
if (!is_rcache && !is_txn) {
return false;
}
// Parse MVCC metadata for inlined value.
Expand All @@ -174,7 +215,7 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
}
// Response cache rows are GC'd if their timestamp is older than the
// response cache GC timeout.
if (key.starts_with(rcache_prefix_)) {
if (is_rcache) {
proto::ReadWriteCmdResponse rwResp;
if (!rwResp.ParseFromArray(meta.value().bytes().data(), meta.value().bytes().size())) {
// *error_msg = (char*)"failed to parse response cache entry";
Expand All @@ -188,7 +229,7 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
if (header->timestamp().wall_time() <= min_rcache_ts_) {
return true;
}
} else if (key.starts_with(txn_prefix_)) {
} else if (is_txn) {
// Transaction rows are GC'd if their timestamp is older than
// the system-wide minimum write intent timestamp. This
// system-wide minimum write intent is periodically computed via
Expand All @@ -210,26 +251,19 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
}

private:
const std::string txn_prefix_;
const std::string rcache_prefix_;
const int64_t min_txn_ts_;
const int64_t min_rcache_ts_;
};

class DBCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
public:
DBCompactionFilterFactory(const std::string& txn_prefix,
const std::string& rcache_prefix)
: txn_prefix_(txn_prefix),
rcache_prefix_(rcache_prefix) {
}
DBCompactionFilterFactory() {}

virtual std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
const rocksdb::CompactionFilter::Context& context) override {
google::protobuf::MutexLock l(&mu_); // Protect access to gc timeouts.
return std::unique_ptr<rocksdb::CompactionFilter>(
new DBCompactionFilter(txn_prefix_, rcache_prefix_,
min_txn_ts_, min_rcache_ts_));
new DBCompactionFilter(min_txn_ts_, min_rcache_ts_));
}

virtual const char* Name() const override {
Expand All @@ -243,9 +277,6 @@ class DBCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
}

private:
const std::string txn_prefix_;
const std::string rcache_prefix_;

google::protobuf::Mutex mu_; // Protects values below.
int64_t min_txn_ts_;
int64_t min_rcache_ts_;
Expand Down Expand Up @@ -592,9 +623,7 @@ DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions db_opts) {
rocksdb::Options options;
options.block_cache = rocksdb::NewLRUCache(db_opts.cache_size);
options.allow_os_buffer = db_opts.allow_os_buffer;
options.compaction_filter_factory.reset(new DBCompactionFilterFactory(
ToString(db_opts.txn_prefix),
ToString(db_opts.rcache_prefix)));
options.compaction_filter_factory.reset(new DBCompactionFilterFactory());
options.create_if_missing = true;
options.info_log.reset(new DBLogger(db_opts.logger));
options.merge_operator.reset(new DBMergeOperator);
Expand Down
4 changes: 0 additions & 4 deletions roachlib/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ typedef void (*DBLoggerFunc)(void* state, const char* str, int len);
typedef struct {
int64_t cache_size;
int allow_os_buffer;
// The key prefix for transaction keys.
DBSlice txn_prefix;
// The key prefix for response cache keys.
DBSlice rcache_prefix;
// A function pointer to direct log messages to.
DBLoggerFunc logger;
} DBOptions;
Expand Down
75 changes: 75 additions & 0 deletions roachlib/encoding.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball ([email protected])

#include "rocksdb/slice.h"

namespace {

const unsigned char kOrderedEncodingBinary = 0x25;
const unsigned char kOrderedEncodingTerminator = 0x00;

}

bool DecodeBinary(const rocksdb::Slice& buf, std::string* decoded, std::string* remainder) {
if (buf[0] != kOrderedEncodingBinary) {
fprintf(stderr, "%s doesn't begin with binary encoding byte\n", buf.ToString().c_str());
return false;
}
decoded->clear();
int s = 6;
int i = 1;
if (buf[i] == kOrderedEncodingTerminator) {
if (remainder != NULL) {
rocksdb::Slice remSlice(buf);
remSlice.remove_prefix(2);
*remainder = remSlice.ToString();
}
return true;
}

int t = (buf[i] << 1) & 0xff;
for (i = 2; buf[i] != kOrderedEncodingTerminator; i++) {
if (s == 7) {
decoded->push_back(t | (buf[i] & 0x7f));
i++;
} else {
decoded->push_back(t | ((buf[i] & 0x7f) >> s));
}

t = (buf[i] << (8 - s)) & 0xff;

if (buf[i] == kOrderedEncodingTerminator) {
break;
}

if (s == 1) {
s = 7;
} else {
s--;
}
}
if (t != 0) {
fprintf(stderr, "%s doesn't begin with binary encoding byte\n", buf.ToString().c_str());
return false;
}
if (remainder != NULL) {
rocksdb::Slice remSlice(buf);
remSlice.remove_prefix(i+1);
*remainder = remSlice.ToString();
}
return true;
}
33 changes: 33 additions & 0 deletions roachlib/encoding.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball ([email protected])

#ifndef ROACHLIB_ENCODING_H
#define ROACHLIB_ENCODING_H

#include <stdint.h>

// DecodeBinary decodes the given key-encoded buf slice, returning
// true on a successful decode. The the unencoded bytes are returned
// in *decoded, and if not NULL, any remaining bytes are returned in
// *remainder.
bool DecodeBinary(const rocksdb::Slice& buf, std::string* decoded, std::string* remainder);

#endif // ROACHLIB_ENCODING_H

// local variables:
// mode: c++
// end:
Loading

0 comments on commit a69a950

Please sign in to comment.