Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganized all local keys into two sections/range: by ID & by Key. #264

Merged
merged 2 commits into from
Jan 16, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,3 @@
utilized set may have rebalances in effect.

* Cleanup proto files to adhere to proto capitalization instead of go's.

* Consider moving all local keys into two sections, each prefixed by either
the Raft ID of the range or the start key of the range. This will allow
a less error-prone iteration over the data for a range, instead of having
to include each section of local data separately.
2 changes: 1 addition & 1 deletion roachlib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Author: Andrew Bonventre ([email protected])

ROACH_LIB := libroach.a
SOURCES := db.cc
SOURCES := db.cc encoding.cc
OBJECTS := $(SOURCES:.cc=.o)

CXXFLAGS += -std=c++11 -I../proto/lib -I../_vendor/rocksdb/include
Expand Down
77 changes: 53 additions & 24 deletions roachlib/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "data.pb.h"
#include "internal.pb.h"
#include "db.h"
#include "encoding.h"

extern "C" {

Expand All @@ -53,6 +54,13 @@ struct DBSnapshot {

namespace {

// NOTE: these constants must be kept in sync with the values
// in storage/engine/keys.go.
const rocksdb::Slice kKeyLocalRangeIDPrefix("\x00\x00\x00i", 4);
const rocksdb::Slice kKeyLocalRangeKeyPrefix("\x00\x00\x00k", 4);
const rocksdb::Slice kKeyLocalResponseCacheSuffix("res-");
const rocksdb::Slice kKeyLocalTransactionSuffix("txn-");

const DBStatus kSuccess = { NULL, 0 };

std::string ToString(DBSlice s) {
Expand Down Expand Up @@ -141,16 +149,47 @@ const proto::ResponseHeader* GetResponseHeader(const proto::ReadWriteCmdResponse
// all ranges in the map.
class DBCompactionFilter : public rocksdb::CompactionFilter {
public:
DBCompactionFilter(const std::string& txn_prefix,
const std::string& rcache_prefix,
int64_t min_txn_ts,
DBCompactionFilter(int64_t min_txn_ts,
int64_t min_rcache_ts)
: txn_prefix_(txn_prefix),
rcache_prefix_(rcache_prefix),
min_txn_ts_(min_txn_ts),
: min_txn_ts_(min_txn_ts),
min_rcache_ts_(min_rcache_ts) {
}

// IsKeyOfType determines whether key, when binary-decoded, matches
// the format: <prefix>[enc-value]\x00<suffix>[remainder].
bool IsKeyOfType(const rocksdb::Slice& key, const rocksdb::Slice& prefix, const rocksdb::Slice& suffix) const {
std::string decStr;
if (!DecodeBinary(key, &decStr, NULL)) {
return false;
}
rocksdb::Slice decKey(decStr);
if (!decKey.starts_with(prefix)) {
return false;
}
decKey.remove_prefix(prefix.size());

// Remove bytes up to including the first null byte.
int i = 0;
for (; i < decKey.size(); i++) {
if (decKey[i] == 0x0) {
break;
}
}
if (i == decKey.size()) {
return false;
}
decKey.remove_prefix(i+1);
return decKey.starts_with(suffix);
}

bool IsResponseCacheEntry(const rocksdb::Slice& key) const {
return IsKeyOfType(key, kKeyLocalRangeIDPrefix, kKeyLocalResponseCacheSuffix);
}

bool IsTransactionRecord(const rocksdb::Slice& key) const {
return IsKeyOfType(key, kKeyLocalRangeKeyPrefix, kKeyLocalTransactionSuffix);
}

virtual bool Filter(int level,
const rocksdb::Slice& key,
const rocksdb::Slice& existing_value,
Expand All @@ -159,7 +198,9 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
*value_changed = false;

// Only filter response cache entries and transaction rows.
if (!key.starts_with(rcache_prefix_) && !key.starts_with(txn_prefix_)) {
bool is_rcache = IsResponseCacheEntry(key);
bool is_txn = IsTransactionRecord(key);
if (!is_rcache && !is_txn) {
return false;
}
// Parse MVCC metadata for inlined value.
Expand All @@ -174,7 +215,7 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
}
// Response cache rows are GC'd if their timestamp is older than the
// response cache GC timeout.
if (key.starts_with(rcache_prefix_)) {
if (is_rcache) {
proto::ReadWriteCmdResponse rwResp;
if (!rwResp.ParseFromArray(meta.value().bytes().data(), meta.value().bytes().size())) {
// *error_msg = (char*)"failed to parse response cache entry";
Expand All @@ -188,7 +229,7 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
if (header->timestamp().wall_time() <= min_rcache_ts_) {
return true;
}
} else if (key.starts_with(txn_prefix_)) {
} else if (is_txn) {
// Transaction rows are GC'd if their timestamp is older than
// the system-wide minimum write intent timestamp. This
// system-wide minimum write intent is periodically computed via
Expand All @@ -210,26 +251,19 @@ class DBCompactionFilter : public rocksdb::CompactionFilter {
}

private:
const std::string txn_prefix_;
const std::string rcache_prefix_;
const int64_t min_txn_ts_;
const int64_t min_rcache_ts_;
};

class DBCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
public:
DBCompactionFilterFactory(const std::string& txn_prefix,
const std::string& rcache_prefix)
: txn_prefix_(txn_prefix),
rcache_prefix_(rcache_prefix) {
}
DBCompactionFilterFactory() {}

virtual std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
const rocksdb::CompactionFilter::Context& context) override {
google::protobuf::MutexLock l(&mu_); // Protect access to gc timeouts.
return std::unique_ptr<rocksdb::CompactionFilter>(
new DBCompactionFilter(txn_prefix_, rcache_prefix_,
min_txn_ts_, min_rcache_ts_));
new DBCompactionFilter(min_txn_ts_, min_rcache_ts_));
}

virtual const char* Name() const override {
Expand All @@ -243,9 +277,6 @@ class DBCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
}

private:
const std::string txn_prefix_;
const std::string rcache_prefix_;

google::protobuf::Mutex mu_; // Protects values below.
int64_t min_txn_ts_;
int64_t min_rcache_ts_;
Expand Down Expand Up @@ -592,9 +623,7 @@ DBStatus DBOpen(DBEngine **db, DBSlice dir, DBOptions db_opts) {
rocksdb::Options options;
options.block_cache = rocksdb::NewLRUCache(db_opts.cache_size);
options.allow_os_buffer = db_opts.allow_os_buffer;
options.compaction_filter_factory.reset(new DBCompactionFilterFactory(
ToString(db_opts.txn_prefix),
ToString(db_opts.rcache_prefix)));
options.compaction_filter_factory.reset(new DBCompactionFilterFactory());
options.create_if_missing = true;
options.info_log.reset(new DBLogger(db_opts.logger));
options.merge_operator.reset(new DBMergeOperator);
Expand Down
4 changes: 0 additions & 4 deletions roachlib/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ typedef void (*DBLoggerFunc)(void* state, const char* str, int len);
typedef struct {
int64_t cache_size;
int allow_os_buffer;
// The key prefix for transaction keys.
DBSlice txn_prefix;
// The key prefix for response cache keys.
DBSlice rcache_prefix;
// A function pointer to direct log messages to.
DBLoggerFunc logger;
} DBOptions;
Expand Down
75 changes: 75 additions & 0 deletions roachlib/encoding.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball ([email protected])

#include "rocksdb/slice.h"

namespace {

const unsigned char kOrderedEncodingBinary = 0x25;
const unsigned char kOrderedEncodingTerminator = 0x00;

}

bool DecodeBinary(const rocksdb::Slice& buf, std::string* decoded, std::string* remainder) {
if (buf[0] != kOrderedEncodingBinary) {
fprintf(stderr, "%s doesn't begin with binary encoding byte\n", buf.ToString().c_str());
return false;
}
decoded->clear();
int s = 6;
int i = 1;
if (buf[i] == kOrderedEncodingTerminator) {
if (remainder != NULL) {
rocksdb::Slice remSlice(buf);
remSlice.remove_prefix(2);
*remainder = remSlice.ToString();
}
return true;
}

int t = (buf[i] << 1) & 0xff;
for (i = 2; buf[i] != kOrderedEncodingTerminator; i++) {
if (s == 7) {
decoded->push_back(t | (buf[i] & 0x7f));
i++;
} else {
decoded->push_back(t | ((buf[i] & 0x7f) >> s));
}

t = (buf[i] << (8 - s)) & 0xff;

if (buf[i] == kOrderedEncodingTerminator) {
break;
}

if (s == 1) {
s = 7;
} else {
s--;
}
}
if (t != 0) {
fprintf(stderr, "%s doesn't begin with binary encoding byte\n", buf.ToString().c_str());
return false;
}
if (remainder != NULL) {
rocksdb::Slice remSlice(buf);
remSlice.remove_prefix(i+1);
*remainder = remSlice.ToString();
}
return true;
}
33 changes: 33 additions & 0 deletions roachlib/encoding.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball ([email protected])

#ifndef ROACHLIB_ENCODING_H
#define ROACHLIB_ENCODING_H

#include <stdint.h>

// DecodeBinary decodes the given key-encoded buf slice, returning
// true on a successful decode. The the unencoded bytes are returned
// in *decoded, and if not NULL, any remaining bytes are returned in
// *remainder.
bool DecodeBinary(const rocksdb::Slice& buf, std::string* decoded, std::string* remainder);

#endif // ROACHLIB_ENCODING_H

// local variables:
// mode: c++
// end:
Loading