// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
#include "db.h"
#include <algorithm>
#include <google/protobuf/stubs/stringprintf.h>
#include <mutex>
#include <rocksdb/cache.h>
#include <rocksdb/convenience.h>
#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/ldb_tool.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/options.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/sst_file_writer.h>
#include <rocksdb/statistics.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/write_batch_with_index.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "encoding.h"
#include "env_switching.h"
#include "eventlistener.h"
#include "keys.h"
#include "protos/roachpb/data.pb.h"
#include "protos/roachpb/internal.pb.h"
#include "protos/storage/engine/enginepb/mvcc.pb.h"
#include "protos/storage/engine/enginepb/rocksdb.pb.h"
extern "C" {
static void __attribute__((noreturn)) die_missing_symbol(const char* name) {
fprintf(stderr, "%s symbol missing; expected to be supplied by Go\n", name);
abort();
}
// These are Go functions exported by storage/engine. We provide these stubs,
// which simply panic if called, to allow intermediate build products to link
// successfully. Otherwise, when building ccl/storageccl/engineccl, Go will
// complain that these symbols are undefined. Because these stubs are marked
// "weak", they will be replaced by their proper implementation in
// storage/engine when the final cockroach binary is linked.
void __attribute__((weak)) rocksDBLog(char*, int) { die_missing_symbol(__func__); }
char* __attribute__((weak)) prettyPrintKey(DBKey) { die_missing_symbol(__func__); }
} // extern "C"
#if defined(COMPILER_GCC) || defined(__clang__)
#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#else
#define WARN_UNUSED_RESULT
#endif
struct DBCache {
std::mutex mu;
std::shared_ptr<rocksdb::Cache> rep;
};
struct DBEngine {
rocksdb::DB* const rep;
DBEngine(rocksdb::DB* r) : rep(r) {}
virtual ~DBEngine() {}
virtual DBStatus Put(DBKey key, DBSlice value) = 0;
virtual DBStatus Merge(DBKey key, DBSlice value) = 0;
virtual DBStatus Delete(DBKey key) = 0;
virtual DBStatus DeleteRange(DBKey start, DBKey end) = 0;
virtual DBStatus CommitBatch(bool sync) = 0;
virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync) = 0;
virtual DBSlice BatchRepr() = 0;
virtual DBStatus Get(DBKey key, DBString* value) = 0;
virtual DBIterator* NewIter(rocksdb::ReadOptions*) = 0;
virtual DBStatus GetStats(DBStatsResult* stats) = 0;
virtual DBString GetCompactionStats() = 0;
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents) = 0;
DBSSTable* GetSSTables(int* n);
DBString GetUserProperties();
};
struct DBImpl : public DBEngine {
std::unique_ptr<rocksdb::Env> switching_env;
std::unique_ptr<rocksdb::Env> memenv;
std::unique_ptr<rocksdb::DB> rep_deleter;
std::shared_ptr<rocksdb::Cache> block_cache;
std::shared_ptr<DBEventListener> event_listener;
// Construct a new DBImpl from the specified DB.
// The DB and passed Envs will be deleted when the DBImpl is deleted.
// Either env can be NULL.
DBImpl(rocksdb::DB* r, rocksdb::Env* m, std::shared_ptr<rocksdb::Cache> bc,
std::shared_ptr<DBEventListener> event_listener, rocksdb::Env* s_env)
: DBEngine(r), switching_env(s_env), memenv(m), rep_deleter(r), block_cache(bc), event_listener(event_listener) {}
virtual ~DBImpl() {
const rocksdb::Options& opts = rep->GetOptions();
const std::shared_ptr<rocksdb::Statistics>& s = opts.statistics;
rocksdb::Info(opts.info_log, "bloom filter utility: %0.1f%%",
(100.0 * s->getTickerCount(rocksdb::BLOOM_FILTER_PREFIX_USEFUL)) /
s->getTickerCount(rocksdb::BLOOM_FILTER_PREFIX_CHECKED));
}
virtual DBStatus Put(DBKey key, DBSlice value);
virtual DBStatus Merge(DBKey key, DBSlice value);
virtual DBStatus Delete(DBKey key);
virtual DBStatus DeleteRange(DBKey start, DBKey end);
virtual DBStatus CommitBatch(bool sync);
virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync);
virtual DBSlice BatchRepr();
virtual DBStatus Get(DBKey key, DBString* value);
virtual DBIterator* NewIter(rocksdb::ReadOptions*);
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
};
struct DBBatch : public DBEngine {
int updates;
bool has_delete_range;
rocksdb::WriteBatchWithIndex batch;
DBBatch(DBEngine* db);
virtual ~DBBatch() {}
virtual DBStatus Put(DBKey key, DBSlice value);
virtual DBStatus Merge(DBKey key, DBSlice value);
virtual DBStatus Delete(DBKey key);
virtual DBStatus DeleteRange(DBKey start, DBKey end);
virtual DBStatus CommitBatch(bool sync);
virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync);
virtual DBSlice BatchRepr();
virtual DBStatus Get(DBKey key, DBString* value);
virtual DBIterator* NewIter(rocksdb::ReadOptions*);
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
};
struct DBWriteOnlyBatch : public DBEngine {
int updates;
rocksdb::WriteBatch batch;
DBWriteOnlyBatch(DBEngine* db);
virtual ~DBWriteOnlyBatch() {}
virtual DBStatus Put(DBKey key, DBSlice value);
virtual DBStatus Merge(DBKey key, DBSlice value);
virtual DBStatus Delete(DBKey key);
virtual DBStatus DeleteRange(DBKey start, DBKey end);
virtual DBStatus CommitBatch(bool sync);
virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync);
virtual DBSlice BatchRepr();
virtual DBStatus Get(DBKey key, DBString* value);
virtual DBIterator* NewIter(rocksdb::ReadOptions*);
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
};
struct DBSnapshot : public DBEngine {
const rocksdb::Snapshot* snapshot;
DBSnapshot(DBEngine* db) : DBEngine(db->rep), snapshot(db->rep->GetSnapshot()) {}
virtual ~DBSnapshot() { rep->ReleaseSnapshot(snapshot); }
virtual DBStatus Put(DBKey key, DBSlice value);
virtual DBStatus Merge(DBKey key, DBSlice value);
virtual DBStatus Delete(DBKey key);
virtual DBStatus DeleteRange(DBKey start, DBKey end);
virtual DBStatus CommitBatch(bool sync);
virtual DBStatus ApplyBatchRepr(DBSlice repr, bool sync);
virtual DBSlice BatchRepr();
virtual DBStatus Get(DBKey key, DBString* value);
virtual DBIterator* NewIter(rocksdb::ReadOptions*);
virtual DBStatus GetStats(DBStatsResult* stats);
virtual DBString GetCompactionStats();
virtual DBStatus EnvWriteFile(DBSlice path, DBSlice contents);
};
struct DBIterator {
std::unique_ptr<rocksdb::Iterator> rep;
};
const DBStatus kSuccess = {NULL, 0};
std::string ToString(DBSlice s) { return std::string(s.data, s.len); }
rocksdb::Slice ToSlice(DBSlice s) { return rocksdb::Slice(s.data, s.len); }
rocksdb::Slice ToSlice(DBString s) { return rocksdb::Slice(s.data, s.len); }
const int kMVCCVersionTimestampSize = 12;
void EncodeTimestamp(std::string& s, int64_t wall_time, int32_t logical) {
EncodeUint64(&s, uint64_t(wall_time));
if (logical != 0) {
EncodeUint32(&s, uint32_t(logical));
}
}
std::string EncodeTimestamp(DBTimestamp ts) {
std::string s;
s.reserve(kMVCCVersionTimestampSize);
EncodeTimestamp(s, ts.wall_time, ts.logical);
return s;
}
// MVCC keys are encoded as <key>[<wall_time>[<logical>]]<#timestamp-bytes>. A
// custom RocksDB comparator (DBComparator) is used to maintain the desired
// ordering as these keys do not sort lexicographically correctly.
std::string EncodeKey(DBKey k) {
std::string s;
const bool ts = k.wall_time != 0 || k.logical != 0;
s.reserve(k.key.len + 1 + (ts ? 1 + kMVCCVersionTimestampSize : 0));
s.append(k.key.data, k.key.len);
if (ts) {
// Add a NUL prefix to the timestamp data. See DBPrefixExtractor.Transform
// for more details.
s.push_back(0);
EncodeTimestamp(s, k.wall_time, k.logical);
}
s.push_back(char(s.size() - k.key.len));
return s;
}
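// Worked example (illustrative; assumes EncodeUint64/EncodeUint32 append
// big-endian bytes, which the lexicographic timestamp ordering relies on):
// EncodeKey of user key "foo" at wall_time=5, logical=1 produces 17 bytes:
//
//   'f' 'o' 'o'                  user key
//   0x00                         NUL separator (see DBPrefixExtractor)
//   0x00 ... 0x05 (8 bytes)      big-endian wall_time
//   0x00 0x00 0x00 0x01          big-endian logical
//   0x0d                         suffix: 13 timestamp bytes, including the NUL
//
// The same key with a zero timestamp encodes as 'f' 'o' 'o' 0x00, where the
// trailing byte is the zero timestamp-length suffix.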
// When we're performing a prefix scan, we want to limit the scan to
// the keys that have the matching prefix. Prefix in this case refers
// to an exact match on the non-timestamp portion of a key. We do this
// by constructing an encoded mvcc key which has a zero timestamp
// (hence the trailing 0) and is the "next" key (thus the additional
// 0). See EncodeKey and SplitKey for more details on the encoded key
// format.
std::string EncodePrefixNextKey(DBSlice k) {
std::string s;
if (k.len > 0) {
s.reserve(k.len + 2);
s.append(k.data, k.len);
s.push_back(0);
s.push_back(0);
}
return s;
}
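// Illustrative example: EncodePrefixNextKey("foo") returns
// 'f' 'o' 'o' 0x00 0x00, which is exactly EncodeKey of the user key
// "foo\x00" with a zero timestamp. Under DBComparator it sorts immediately
// after every version of "foo", making it a tight exclusive upper bound for
// a prefix scan over "foo".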
WARN_UNUSED_RESULT bool SplitKey(rocksdb::Slice buf, rocksdb::Slice* key, rocksdb::Slice* timestamp) {
if (buf.empty()) {
return false;
}
const char ts_size = buf[buf.size() - 1];
if (ts_size >= buf.size()) {
return false;
}
*key = rocksdb::Slice(buf.data(), buf.size() - ts_size - 1);
*timestamp = rocksdb::Slice(key->data() + key->size(), ts_size);
return true;
}
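// Continuing the EncodeKey example above: the 17-byte encoding of "foo" at
// wall_time=5/logical=1 ends in the suffix byte 13, so SplitKey sets *key to
// the 3-byte user key and *timestamp to the following 13 bytes (the NUL
// separator plus 12 bytes of timestamp data), dropping the suffix byte
// itself. For 'f' 'o' 'o' 0x00 (no timestamp) the suffix is 0 and
// *timestamp is empty.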
WARN_UNUSED_RESULT bool DecodeTimestamp(rocksdb::Slice* timestamp, int64_t* wall_time, int32_t* logical) {
uint64_t w;
if (!DecodeUint64(timestamp, &w)) {
return false;
}
*wall_time = int64_t(w);
*logical = 0;
if (timestamp->size() > 0) {
// TODO(peter): Use varint decoding here.
uint32_t l;
if (!DecodeUint32(timestamp, &l)) {
return false;
}
*logical = int32_t(l);
}
return true;
}
WARN_UNUSED_RESULT bool DecodeHLCTimestamp(rocksdb::Slice buf, cockroach::util::hlc::Timestamp* timestamp) {
int64_t wall_time;
int32_t logical;
if (!DecodeTimestamp(&buf, &wall_time, &logical)) {
return false;
}
timestamp->set_wall_time(wall_time);
timestamp->set_logical(logical);
return true;
}
WARN_UNUSED_RESULT bool DecodeKey(rocksdb::Slice buf, rocksdb::Slice* key, int64_t* wall_time, int32_t* logical) {
key->clear();
rocksdb::Slice timestamp;
if (!SplitKey(buf, key, &timestamp)) {
return false;
}
if (timestamp.size() > 0) {
timestamp.remove_prefix(1); // The NUL prefix.
if (!DecodeTimestamp(&timestamp, wall_time, logical)) {
return false;
}
}
return timestamp.empty();
}
rocksdb::Slice KeyPrefix(const rocksdb::Slice& src) {
rocksdb::Slice key;
rocksdb::Slice ts;
if (!SplitKey(src, &key, &ts)) {
return src;
}
// RocksDB requires that keys generated via Transform be comparable with
// normal encoded MVCC keys. Encoded MVCC keys have a suffix indicating the
// number of bytes of timestamp data. MVCC keys without a timestamp have a
// suffix of 0. We're careful in EncodeKey to make sure that the user-key
// always has a trailing 0. If there is no timestamp this falls out
// naturally. If there is a timestamp we prepend a 0 to the encoded
// timestamp data.
assert(src.size() > key.size() && src[key.size()] == 0);
return rocksdb::Slice(key.data(), key.size() + 1);
}
DBSlice ToDBSlice(const rocksdb::Slice& s) {
DBSlice result;
result.data = const_cast<char*>(s.data());
result.len = s.size();
return result;
}
DBSlice ToDBSlice(const DBString& s) {
DBSlice result;
result.data = s.data;
result.len = s.len;
return result;
}
DBString ToDBString(const rocksdb::Slice& s) {
DBString result;
result.len = s.size();
result.data = static_cast<char*>(malloc(result.len));
memcpy(result.data, s.data(), s.size());
return result;
}
DBKey ToDBKey(const rocksdb::Slice& s) {
DBKey key;
memset(&key, 0, sizeof(key));
rocksdb::Slice tmp;
if (DecodeKey(s, &tmp, &key.wall_time, &key.logical)) {
key.key = ToDBSlice(tmp);
}
return key;
}
DBStatus ToDBStatus(const rocksdb::Status& status) {
if (status.ok()) {
return kSuccess;
}
return ToDBString(status.ToString());
}
DBStatus FmtStatus(const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
std::string str;
google::protobuf::StringAppendV(&str, fmt, ap);
va_end(ap);
return ToDBString(str);
}
namespace {
DBIterState DBIterGetState(DBIterator* iter) {
DBIterState state = {};
state.valid = iter->rep->Valid();
state.status = ToDBStatus(iter->rep->status());
if (state.valid) {
rocksdb::Slice key;
state.valid = DecodeKey(iter->rep->key(), &key, &state.key.wall_time, &state.key.logical);
if (state.valid) {
state.key.key = ToDBSlice(key);
state.value = ToDBSlice(iter->rep->value());
}
}
return state;
}
const int kChecksumSize = 4;
const int kTagPos = kChecksumSize;
const int kHeaderSize = kTagPos + 1;
rocksdb::Slice ValueDataBytes(const std::string& val) {
if (val.size() < kHeaderSize) {
return rocksdb::Slice();
}
return rocksdb::Slice(val.data() + kHeaderSize, val.size() - kHeaderSize);
}
cockroach::roachpb::ValueType GetTag(const std::string& val) {
if (val.size() < kHeaderSize) {
return cockroach::roachpb::UNKNOWN;
}
return cockroach::roachpb::ValueType(val[kTagPos]);
}
void SetTag(std::string* val, cockroach::roachpb::ValueType tag) { (*val)[kTagPos] = tag; }
WARN_UNUSED_RESULT bool ParseProtoFromValue(const std::string& val, google::protobuf::MessageLite* msg) {
if (val.size() < kHeaderSize) {
return false;
}
const rocksdb::Slice d = ValueDataBytes(val);
return msg->ParseFromArray(d.data(), d.size());
}
void SerializeProtoToValue(std::string* val, const google::protobuf::MessageLite& msg) {
val->resize(kHeaderSize);
std::fill(val->begin(), val->end(), 0);
SetTag(val, cockroach::roachpb::BYTES);
msg.AppendToString(val);
}
bool IsValidSplitKey(const rocksdb::Slice& key, bool allow_meta2_splits) {
if (key == kMeta2KeyMax) {
// We do not allow splits at Meta2KeyMax. The reason for this is that the
// last range in the keyspace will always end at KeyMax, which will be
// stored at Meta2KeyMax because RangeMetaKey(KeyMax) = Meta2KeyMax. If we
// allowed splits at this key then the last descriptor would be stored on a
// non-meta range since the meta ranges would span from [KeyMin,Meta2KeyMax)
// and the first non-meta range would span [Meta2KeyMax,...).
return false;
}
const auto& no_split_spans = allow_meta2_splits ? kSortedNoSplitSpans : kSortedNoSplitSpansWithoutMeta2Splits;
for (auto span : no_split_spans) {
// kSortedNoSplitSpans and kSortedNoSplitSpansWithoutMeta2Splits are
// both reverse sorted (largest to smallest) on the span end key which
// allows us to early exit if our key to check is above the end of the
// last no-split span.
if (key.compare(span.second) >= 0) {
return true;
}
if (key.compare(span.first) > 0) {
return false;
}
}
return true;
}
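// Illustrative walk through the early-exit logic, assuming two no-split
// spans [m, n) and [c, f), reverse sorted by end key:
//   key "p": "p" >= "n", so it is above every remaining span -> splittable.
//   key "d": "d" < "n" and "d" <= "m", so check [c, f) next; "d" > "c" and
//            "d" < "f" places it inside that span -> not splittable.
//   key "b": below the start of both spans -> loop ends, splittable.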
class DBComparator : public rocksdb::Comparator {
public:
DBComparator() {}
virtual const char* Name() const override { return "cockroach_comparator"; }
virtual int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const override {
rocksdb::Slice key_a, key_b;
rocksdb::Slice ts_a, ts_b;
if (!SplitKey(a, &key_a, &ts_a) || !SplitKey(b, &key_b, &ts_b)) {
// This should never happen unless there is some sort of corruption of
// the keys.
return a.compare(b);
}
const int c = key_a.compare(key_b);
if (c != 0) {
return c;
}
if (ts_a.empty()) {
if (ts_b.empty()) {
return 0;
}
return -1;
} else if (ts_b.empty()) {
return +1;
}
return ts_b.compare(ts_a);
}
virtual bool Equal(const rocksdb::Slice& a, const rocksdb::Slice& b) const override { return a == b; }
// The RocksDB docs say it is safe to leave these two methods unimplemented.
virtual void FindShortestSeparator(std::string* start, const rocksdb::Slice& limit) const override {}
virtual void FindShortSuccessor(std::string* key) const override {}
};
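// The resulting order groups all versions of a user key together: the bare
// key with no timestamp sorts first (empty ts_a returns -1), followed by the
// versioned keys from newest to oldest. Illustrative ordering:
//   k, k@wall_time=10, k@wall_time=5, k@wall_time=1, <next user key>, ...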
const DBComparator kComparator;
class DBPrefixExtractor : public rocksdb::SliceTransform {
public:
DBPrefixExtractor() {}
virtual const char* Name() const { return "cockroach_prefix_extractor"; }
// MVCC keys are encoded as <user-key>/<timestamp>. Extract the <user-key>
// prefix which will allow for more efficient iteration over the keys
// matching a particular <user-key>. Specifically, the <user-key> will be
// added to the per table bloom filters and will be used to skip tables
// which do not contain the <user-key>.
virtual rocksdb::Slice Transform(const rocksdb::Slice& src) const { return KeyPrefix(src); }
virtual bool InDomain(const rocksdb::Slice& src) const { return true; }
virtual bool InRange(const rocksdb::Slice& dst) const { return Transform(dst) == dst; }
};
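// Illustrative behavior, using the encoding described above EncodeKey:
//   Transform('f' 'o' 'o' 0x00 <12 timestamp bytes> 0x0d) -> 'f' 'o' 'o' 0x00
//   Transform('f' 'o' 'o' 0x00)                           -> 'f' 'o' 'o' 0x00
// Every version of a user key maps to the same prefix, so the per-table
// bloom filters can rule out tables for all versions of the key at once.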
class DBBatchInserter : public rocksdb::WriteBatch::Handler {
public:
DBBatchInserter(rocksdb::WriteBatchBase* batch) : batch_(batch) {}
virtual void Put(const rocksdb::Slice& key, const rocksdb::Slice& value) { batch_->Put(key, value); }
virtual void Delete(const rocksdb::Slice& key) { batch_->Delete(key); }
virtual void Merge(const rocksdb::Slice& key, const rocksdb::Slice& value) { batch_->Merge(key, value); }
virtual rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice& begin_key,
const rocksdb::Slice& end_key) {
if (column_family_id == 0) {
batch_->DeleteRange(begin_key, end_key);
return rocksdb::Status::OK();
}
return rocksdb::Status::InvalidArgument("DeleteRangeCF not implemented");
}
private:
rocksdb::WriteBatchBase* const batch_;
};
// Method used to sort InternalTimeSeriesSamples.
bool TimeSeriesSampleOrdering(const cockroach::roachpb::InternalTimeSeriesSample* a,
const cockroach::roachpb::InternalTimeSeriesSample* b) {
return a->offset() < b->offset();
}
// IsTimeSeriesData returns true if the given protobuf Value contains a
// TimeSeriesData message.
bool IsTimeSeriesData(const std::string& val) { return GetTag(val) == cockroach::roachpb::TIMESERIES; }
void SerializeTimeSeriesToValue(std::string* val, const cockroach::roachpb::InternalTimeSeriesData& ts) {
SerializeProtoToValue(val, ts);
SetTag(val, cockroach::roachpb::TIMESERIES);
}
// MergeTimeSeriesValues attempts to merge two Values which contain
// InternalTimeSeriesData messages. The messages cannot be merged if they have
// different start timestamps or sample durations. Returns true if the merge is
// successful.
WARN_UNUSED_RESULT bool MergeTimeSeriesValues(std::string* left, const std::string& right, bool full_merge,
rocksdb::Logger* logger) {
// Attempt to parse TimeSeriesData from both Values.
cockroach::roachpb::InternalTimeSeriesData left_ts;
cockroach::roachpb::InternalTimeSeriesData right_ts;
if (!ParseProtoFromValue(*left, &left_ts)) {
rocksdb::Warn(logger, "left InternalTimeSeriesData could not be parsed from bytes.");
return false;
}
if (!ParseProtoFromValue(right, &right_ts)) {
rocksdb::Warn(logger, "right InternalTimeSeriesData could not be parsed from bytes.");
return false;
}
// Ensure that both InternalTimeSeriesData have the same timestamp and
// sample_duration.
if (left_ts.start_timestamp_nanos() != right_ts.start_timestamp_nanos()) {
rocksdb::Warn(logger, "TimeSeries merge failed due to mismatched start timestamps");
return false;
}
if (left_ts.sample_duration_nanos() != right_ts.sample_duration_nanos()) {
rocksdb::Warn(logger, "TimeSeries merge failed due to mismatched sample durations.");
return false;
}
// If only a partial merge, do not sort and combine - instead, just quickly
// merge the two values together. Values will be processed later after a
// full merge.
if (!full_merge) {
left_ts.MergeFrom(right_ts);
SerializeTimeSeriesToValue(left, left_ts);
return true;
}
// Initialize new_ts and its primitive data fields. Values from the left and
// right collections will be merged into the new collection.
cockroach::roachpb::InternalTimeSeriesData new_ts;
new_ts.set_start_timestamp_nanos(left_ts.start_timestamp_nanos());
new_ts.set_sample_duration_nanos(left_ts.sample_duration_nanos());
// Sort values in right_ts. Assume values in left_ts have been sorted.
std::stable_sort(right_ts.mutable_samples()->pointer_begin(), right_ts.mutable_samples()->pointer_end(),
TimeSeriesSampleOrdering);
// Merge sample values of left and right into new_ts.
auto left_front = left_ts.samples().begin();
auto left_end = left_ts.samples().end();
auto right_front = right_ts.samples().begin();
auto right_end = right_ts.samples().end();
// Loop until samples from both sides have been exhausted.
while (left_front != left_end || right_front != right_end) {
// Select the lowest offset from either side.
long next_offset;
if (left_front == left_end) {
next_offset = right_front->offset();
} else if (right_front == right_end) {
next_offset = left_front->offset();
} else if (left_front->offset() <= right_front->offset()) {
next_offset = left_front->offset();
} else {
next_offset = right_front->offset();
}
// Create an empty sample in the output collection.
cockroach::roachpb::InternalTimeSeriesSample* ns = new_ts.add_samples();
// Only the most recently merged value with a given sample offset is kept;
// samples merged earlier at the same offset are discarded. We will now
// parse through the left and right sample sets, finding the most recently
// merged sample at the current offset.
cockroach::roachpb::InternalTimeSeriesSample src;
while (left_front != left_end && left_front->offset() == next_offset) {
src = *left_front;
left_front++;
}
while (right_front != right_end && right_front->offset() == next_offset) {
src = *right_front;
right_front++;
}
ns->CopyFrom(src);
}
// Serialize the new TimeSeriesData into the left value's byte field.
SerializeTimeSeriesToValue(left, new_ts);
return true;
}
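// Illustrative full merge: if left has samples at offsets [1, 2] and right
// has samples at offsets [2, 3], the merged result has samples at offsets
// [1, 2, 3], where the sample at offset 2 is taken from right (the more
// recently merged operand) because the right-side scan runs last.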
// ConsolidateTimeSeriesValue processes a single value which contains
// InternalTimeSeriesData messages. This method will sort the sample collection
// of the value, keeping only the last of samples with duplicate offsets.
// This method is the single-value equivalent of MergeTimeSeriesValues, and is
// used in the case where the first value is merged into the key. Returns true
// if the merge is successful.
WARN_UNUSED_RESULT bool ConsolidateTimeSeriesValue(std::string* val, rocksdb::Logger* logger) {
// Attempt to parse TimeSeriesData from the value.
cockroach::roachpb::InternalTimeSeriesData val_ts;
if (!ParseProtoFromValue(*val, &val_ts)) {
rocksdb::Warn(logger, "InternalTimeSeriesData could not be parsed from bytes.");
return false;
}
// Initialize new_ts and its primitive data fields.
cockroach::roachpb::InternalTimeSeriesData new_ts;
new_ts.set_start_timestamp_nanos(val_ts.start_timestamp_nanos());
new_ts.set_sample_duration_nanos(val_ts.sample_duration_nanos());
// Sort values in the ts value.
std::stable_sort(val_ts.mutable_samples()->pointer_begin(), val_ts.mutable_samples()->pointer_end(),
TimeSeriesSampleOrdering);
// Consolidate sample values from the ts value with duplicate offsets.
auto front = val_ts.samples().begin();
auto end = val_ts.samples().end();
// Loop until samples have been exhausted.
while (front != end) {
// Create an empty sample in the output collection.
cockroach::roachpb::InternalTimeSeriesSample* ns = new_ts.add_samples();
ns->set_offset(front->offset());
while (front != end && front->offset() == ns->offset()) {
// Only the last sample in the value's repeated samples field with a given
// offset is kept in the case of multiple samples with identical offsets.
ns->CopyFrom(*front);
++front;
}
}
// Serialize the new TimeSeriesData into the value's byte field.
SerializeTimeSeriesToValue(val, new_ts);
return true;
}
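// Illustrative consolidation: a value with samples at offsets [2, 1, 2] is
// sorted to [1, 2, 2] and reduced to [1, 2], keeping the later of the two
// samples at offset 2 (stable_sort preserves their original order).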
WARN_UNUSED_RESULT bool MergeValues(cockroach::storage::engine::enginepb::MVCCMetadata* left,
const cockroach::storage::engine::enginepb::MVCCMetadata& right, bool full_merge,
rocksdb::Logger* logger) {
if (left->has_raw_bytes()) {
if (!right.has_raw_bytes()) {
rocksdb::Warn(logger, "inconsistent value types for merge (left = bytes, right = ?)");
return false;
}
// Replay Advisory: Because merge commands pass through raft, it is possible
// for merging values to be "replayed". Currently, the only actual use of
// the merge system is for time series data, which is safe against replay;
// however, this property is not general for all potential mergeable types.
// If a future need arises to merge another type of data, replay protection
// will likely need to be a consideration.
if (IsTimeSeriesData(left->raw_bytes()) || IsTimeSeriesData(right.raw_bytes())) {
// The right operand must also be a time series.
if (!IsTimeSeriesData(left->raw_bytes()) || !IsTimeSeriesData(right.raw_bytes())) {
rocksdb::Warn(logger, "inconsistent value types for merging time "
"series data (type(left) != type(right))");
return false;
}
return MergeTimeSeriesValues(left->mutable_raw_bytes(), right.raw_bytes(), full_merge, logger);
} else {
const rocksdb::Slice rdata = ValueDataBytes(right.raw_bytes());
left->mutable_raw_bytes()->append(rdata.data(), rdata.size());
}
return true;
} else {
left->mutable_raw_bytes()->assign(right.raw_bytes());
if (right.has_merge_timestamp()) {
left->mutable_merge_timestamp()->CopyFrom(right.merge_timestamp());
}
if (full_merge && IsTimeSeriesData(left->raw_bytes())) {
if (!ConsolidateTimeSeriesValue(left->mutable_raw_bytes(), logger)) {
return false;
}
}
return true;
}
}
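// Illustrative non-timeseries merge: if left's raw_bytes is a value header
// followed by "ab" and right's is a header followed by "cd", the result is
// left's original header followed by "abcd"; ValueDataBytes strips right's
// header before the append.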
// MergeResult serializes the result MVCCMetadata value into a byte slice.
DBStatus MergeResult(cockroach::storage::engine::enginepb::MVCCMetadata* meta, DBString* result) {
// TODO(pmattis): Should recompute checksum here. Need a crc32
// implementation and need to verify the checksumming is identical
// to what is being done in Go. Zlib's crc32 should be sufficient.
result->len = meta->ByteSize();
result->data = static_cast<char*>(malloc(result->len));
if (!meta->SerializeToArray(result->data, result->len)) {
return ToDBString("serialization error");
}
return kSuccess;
}
class DBMergeOperator : public rocksdb::MergeOperator {
virtual const char* Name() const { return "cockroach_merge_operator"; }
virtual bool FullMerge(const rocksdb::Slice& key, const rocksdb::Slice* existing_value,
const std::deque<std::string>& operand_list, std::string* new_value,
rocksdb::Logger* logger) const WARN_UNUSED_RESULT {
// TODO(pmattis): Taken from the old merger code, below are some
// details about how errors returned by the merge operator are
// handled. Need to test various error scenarios and decide on
// desired behavior. Clear the key and it's gone. Corrupt it
// properly and RocksDB might refuse to work with it at all until
// you clear it manually, which may also not be what we want. The
// problem with merges is that RocksDB won't really carry them out
// while we have a chance to talk back to clients.
//
// If we indicate failure (*success = false), then the call to the
// merger via rocksdb_merge will not return an error, but simply
// remove or truncate the offending key (at least when the settings
// specify that missing keys should be created; otherwise a
// corruption error will be returned, but likely only after the next
// read of the key). In effect, there is no propagation of error
// information to the client.
cockroach::storage::engine::enginepb::MVCCMetadata meta;
if (existing_value != NULL) {
if (!meta.ParseFromArray(existing_value->data(), existing_value->size())) {
// Corrupted existing value.
rocksdb::Warn(logger, "corrupted existing value");
return false;
}
}
for (int i = 0; i < operand_list.size(); i++) {
if (!MergeOne(&meta, operand_list[i], true, logger)) {
return false;
}
}
if (!meta.SerializeToString(new_value)) {
rocksdb::Warn(logger, "serialization error");
return false;
}
return true;
}
virtual bool PartialMergeMulti(const rocksdb::Slice& key, const std::deque<rocksdb::Slice>& operand_list,
std::string* new_value, rocksdb::Logger* logger) const WARN_UNUSED_RESULT {
cockroach::storage::engine::enginepb::MVCCMetadata meta;
for (int i = 0; i < operand_list.size(); i++) {
if (!MergeOne(&meta, operand_list[i], false, logger)) {
return false;
}
}
if (!meta.SerializeToString(new_value)) {
rocksdb::Warn(logger, "serialization error");
return false;
}
return true;
}
private:
bool MergeOne(cockroach::storage::engine::enginepb::MVCCMetadata* meta, const rocksdb::Slice& operand,
bool full_merge, rocksdb::Logger* logger) const WARN_UNUSED_RESULT {
cockroach::storage::engine::enginepb::MVCCMetadata operand_meta;
if (!operand_meta.ParseFromArray(operand.data(), operand.size())) {
rocksdb::Warn(logger, "corrupted operand value");
return false;
}
return MergeValues(meta, operand_meta, full_merge, logger);
}
};
class DBLogger : public rocksdb::Logger {
public:
DBLogger(bool enabled) : enabled_(enabled) {}
virtual void Logv(const char* format, va_list ap) {
// TODO(pmattis): Benchmark calling Go exported methods from C++
// to determine if this is too slow.
if (!enabled_) {
return;
}
// First try with a small fixed size buffer.
char space[1024];
// It's possible for methods that use a va_list to invalidate the data in
// it upon use. The fix is to make a copy of the structure before using it
// and use that copy instead.
va_list backup_ap;
va_copy(backup_ap, ap);
int result = vsnprintf(space, sizeof(space), format, backup_ap);
va_end(backup_ap);
if ((result >= 0) && (result < sizeof(space))) {
rocksDBLog(space, result);
return;
}
// Repeatedly increase buffer size until it fits.
int length = sizeof(space);
while (true) {
if (result < 0) {
// Older behavior: just try doubling the buffer size.
length *= 2;
} else {
// We need exactly "result+1" characters.
length = result + 1;
}
char* buf = new char[length];
// Restore the va_list before we use it again
va_copy(backup_ap, ap);
result = vsnprintf(buf, length, format, backup_ap);
va_end(backup_ap);
if ((result >= 0) && (result < length)) {
// It fit
rocksDBLog(buf, result);
delete[] buf;
return;
}
delete[] buf;
}
}
private:
const bool enabled_;
};
// Getter defines an interface for retrieving a value from either an
// iterator or an engine. It is used by ProcessDeltaKey to abstract
// whether the "base" layer is an iterator or an engine.
struct Getter {
virtual DBStatus Get(DBString* value) = 0;
};
// IteratorGetter is an implementation of the Getter interface which
// retrieves the value currently pointed to by the supplied
// iterator. It is ok for the supplied iterator to be NULL in which
// case no value will be retrieved.
struct IteratorGetter : public Getter {
rocksdb::Iterator* const base;
IteratorGetter(rocksdb::Iterator* iter) : base(iter) {}
virtual DBStatus Get(DBString* value) {
if (base == NULL) {
value->data = NULL;
value->len = 0;
} else {
*value = ToDBString(base->value());
}
return kSuccess;
}
};
// DBGetter is an implementation of the Getter interface which
// retrieves the value for the supplied key from a rocksdb::DB.
struct DBGetter : public Getter {
rocksdb::DB* const rep;
rocksdb::ReadOptions const options;
std::string const key;
DBGetter(rocksdb::DB* const r, rocksdb::ReadOptions opts, std::string&& k)
: rep(r), options(opts), key(std::move(k)) {}
virtual DBStatus Get(DBString* value) {
std::string tmp;
rocksdb::Status s = rep->Get(options, key, &tmp);
if (!s.ok()) {
if (s.IsNotFound()) {
// This mirrors the logic in rocksdb_get(). It doesn't seem like
// a good idea, but some code in engine_test.go depends on it.
value->data = NULL;
value->len = 0;
return kSuccess;
}
return ToDBStatus(s);
}
*value = ToDBString(tmp);
return kSuccess;
}
};
// ProcessDeltaKey performs the heavy lifting of processing the deltas
// for "key" contained in a batch and determining what the resulting
// value is. "delta" should have been seeked to "key", but may not be
// pointing to "key" if no updates existing for that key in the batch.
//
// Note that RocksDB WriteBatches append updates
// internally. WBWIIterator maintains an index for these updates on
// <key, seq-num>. Looping over the entries in WBWIIterator will
// return the keys in sorted order and, for each key, the updates as
// they were added to the batch.
//
// Upon return, the delta iterator will point to the next entry past
// key. The delta iterator may not be valid if the end of iteration
// was reached.
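//
// For example (illustrative): if the base holds value B for "key" and the
// batch contains Merge(M1), Put(P), Merge(M2) for "key", the loop below
// first computes merge(B, M1), then replaces that result with P, and
// finally returns merge(P, M2).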
DBStatus ProcessDeltaKey(Getter* base, rocksdb::WBWIIterator* delta, rocksdb::Slice key, DBString* value) {
if (value->data != NULL) {
free(value->data);
}
value->data = NULL;
value->len = 0;
int count = 0;
for (; delta->Valid() && delta->Entry().key == key; ++count, delta->Next()) {
rocksdb::WriteEntry entry = delta->Entry();
switch (entry.type) {
case rocksdb::kPutRecord:
if (value->data != NULL) {
free(value->data);
}
*value = ToDBString(entry.value);
break;
case rocksdb::kMergeRecord: {
DBString existing;
if (count == 0) {
// If this is the first record for the key, then we need to
// merge with the record in base.
DBStatus status = base->Get(&existing);
if (status.data != NULL) {
if (value->data != NULL) {
free(value->data);
value->data = NULL;
value->len = 0;
}
return status;
}
} else {
// Merge with the value we've built up so far.
existing = *value;
value->data = NULL;
value->len = 0;
}
if (existing.data != NULL) {
DBStatus status = DBMergeOne(ToDBSlice(existing), ToDBSlice(entry.value), value);
free(existing.data);
if (status.data != NULL) {
return status;
}
} else {
*value = ToDBString(entry.value);
}
break;
}
case rocksdb::kDeleteRecord:
if (value->data != NULL) {
free(value->data);
}
// This mirrors the logic in DBGet(): a deleted entry is
// indicated by a value with NULL data.
value->data = NULL;
value->len = 0;
break;
default:
break;
}