25789: ts: Implement new on-disk format for time series r=mrtracy a=mrtracy

The first commit is cockroachdb#25587 and can be ignored for this PR.


Implement a new *columnar* on-disk format for time series samples,
replacing the previous row-like format.

Previously, each slab of time series data contained a collection of
"samples", where each sample was a message containing a number of
fields: the timestamp (offset) and a number of value fields intended to
contain "rolled-up" data for long-term, low-resolution storage.

The new format is columnar; the top-level slab contains multiple
parallel arrays, with each array containing the ordered values for the
individual samples.

This gives us all of the advantages we were missing in the previous
layout:

+ High-resolution data can leave the aggregate fields completely empty;
only the "last" aggregate column needs to be populated. This reduces the
in-memory size of each sample from the full Sample structure down to an
int32 (offset) and a float64 (last). It also means we can add several
more aggregates without inflating the size of each sample.
+ The columnar format takes advantage of protocol buffer repeated field
packing, which should save considerable space in the encoded on-disk
format.
+ When querying, we can iterate directly over the data fields we need,
which may improve data locality (and thus cache-miss performance) or
allow us to more aggressively release memory for data that is not
needed.
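To illustrate the difference, here is a minimal C++ sketch of the two layouts. The struct and field names are simplified stand-ins for illustration, not the actual generated protobuf classes:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Row format: every sample carries every aggregate field, even when the
// aggregates are unused at high resolution.
struct RowSample {
  int32_t offset;
  double last;
  // Rolled-up aggregates: dead weight for high-resolution data.
  double sum, min, max, first, variance;
  uint32_t count;
};

// Columnar format: one parallel array per field. At high resolution the
// aggregate columns are simply left empty, so each sample costs only an
// int32 (offset) plus a float64 (last).
struct ColumnarSlab {
  std::vector<int32_t> offset;
  std::vector<double> last;
  // Populated only for rolled-up, low-resolution data:
  std::vector<uint32_t> count;
  std::vector<double> sum, min, max, first, variance;
};
```

Note that adding a new aggregate to the columnar layout only adds an empty array to high-resolution slabs, rather than a field to every sample.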

This commit does the following:

+ Define new columnar fields in roachpb/internal.proto.
+ Write new C++ merge logic for the columnar fields. This does not
replace the existing row logic; it lives alongside it.
+ Create an upgrade path in the merge logic for row-formatted slabs;
this occurs whenever columnar data is merged into an existing key with
row-formatted data. This ensures that any individual slab of data will
contain only row-formatted samples or column-formatted samples, but
never both.
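A sketch of the upgrade path, again using simplified stand-in types (the real implementation operates on the InternalTimeSeriesData protobuf and re-sorts and de-duplicates after merging):

```cpp
#include <cassert>
#include <vector>

// Simplified stand-ins for the row and columnar representations; a slab
// holds samples in exactly one of the two formats, never both.
struct Sample {
  int offset;
  double sum;
};

struct Slab {
  std::vector<Sample> samples;  // row format
  std::vector<int> offset;      // columnar format (parallel arrays)
  std::vector<double> last;
};

// Rewrite a row-format slab as columns. Only the "offset" and "last"
// columns are carried over, mirroring how the real conversion drops the
// unused row aggregates.
void ConvertToColumnar(Slab* s) {
  for (const Sample& sample : s->samples) {
    s->offset.push_back(sample.offset);
    s->last.push_back(sample.sum);
  }
  s->samples.clear();
}

// Merging columnar data into a slab first upgrades any row-format
// contents, so the result is purely columnar.
void MergeColumnar(Slab* dst, const Slab& src) {
  if (!dst->samples.empty()) {
    ConvertToColumnar(dst);  // the upgrade path
  }
  dst->offset.insert(dst->offset.end(), src.offset.begin(), src.offset.end());
  dst->last.insert(dst->last.end(), src.last.begin(), src.last.end());
  // The real merge then sorts by offset and de-duplicates, keeping the
  // most recently merged value for each offset.
}
```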

Release note: none.

Co-authored-by: Matt Tracy <[email protected]>
craig[bot] and Matt Tracy committed Jun 5, 2018
2 parents 24bc982 + b705333 commit a9498cc
Showing 12 changed files with 2,161 additions and 151 deletions.
1 change: 1 addition & 0 deletions c-deps/libroach/CMakeLists.txt
@@ -91,6 +91,7 @@ set(tests
db_test.cc
encoding_test.cc
file_registry_test.cc
merge_test.cc
ccl/db_test.cc
ccl/key_manager_test.cc
)
299 changes: 226 additions & 73 deletions c-deps/libroach/merge.cc
@@ -12,6 +12,7 @@
// implied. See the License for the specific language governing
// permissions and limitations under the License.

#include <numeric>
#include "merge.h"
#include <rocksdb/env.h>
#include "db.h"
@@ -115,58 +116,81 @@ WARN_UNUSED_RESULT bool MergeTimeSeriesValues(std::string* left, const std::stri
return true;
}

// Initialize new_ts and its primitive data fields. Values from the left and
// right collections will be merged into the new collection.
cockroach::roachpb::InternalTimeSeriesData new_ts;
new_ts.set_start_timestamp_nanos(left_ts.start_timestamp_nanos());
new_ts.set_sample_duration_nanos(left_ts.sample_duration_nanos());

// Sort values in right_ts. Assume values in left_ts have been sorted.
std::stable_sort(right_ts.mutable_samples()->pointer_begin(),
right_ts.mutable_samples()->pointer_end(), TimeSeriesSampleOrdering);

// Merge sample values of left and right into new_ts.
auto left_front = left_ts.samples().begin();
auto left_end = left_ts.samples().end();
auto right_front = right_ts.samples().begin();
auto right_end = right_ts.samples().end();

// Loop until samples from both sides have been exhausted.
while (left_front != left_end || right_front != right_end) {
// Select the lowest offset from either side.
long next_offset;
if (left_front == left_end) {
next_offset = right_front->offset();
} else if (right_front == right_end) {
next_offset = left_front->offset();
} else if (left_front->offset() <= right_front->offset()) {
next_offset = left_front->offset();
} else {
next_offset = right_front->offset();
}
// Determine if we are using row or columnar format, by checking if either
// format has a "last" column.
bool use_column_format = left_ts.last_size() > 0 || right_ts.last_size() > 0;

if (use_column_format) {
// Convert from row format to column format if necessary.
convertToColumnar(&left_ts);
convertToColumnar(&right_ts);

// Find the minimum offset of the right collection, and find the highest
// index in the left collection which is greater than or equal to that
// minimum. This determines how many elements of the left collection will
// need to be re-sorted and de-duplicated.
auto min_offset = std::min_element(right_ts.offset().begin(), right_ts.offset().end());
auto first_unsorted_index = std::distance(
left_ts.offset().begin(),
std::lower_bound(left_ts.offset().begin(), left_ts.offset().end(), *min_offset)
);
left_ts.MergeFrom(right_ts);
sortAndDeduplicateColumns(&left_ts, first_unsorted_index);
SerializeTimeSeriesToValue(left, left_ts);
} else {
// Initialize new_ts and its primitive data fields. Values from the left and
// right collections will be merged into the new collection.
cockroach::roachpb::InternalTimeSeriesData new_ts;
new_ts.set_start_timestamp_nanos(left_ts.start_timestamp_nanos());
new_ts.set_sample_duration_nanos(left_ts.sample_duration_nanos());

// Sort values in right_ts. Assume values in left_ts have been sorted.
std::stable_sort(right_ts.mutable_samples()->pointer_begin(),
right_ts.mutable_samples()->pointer_end(), TimeSeriesSampleOrdering);

// Merge sample values of left and right into new_ts.
auto left_front = left_ts.samples().begin();
auto left_end = left_ts.samples().end();
auto right_front = right_ts.samples().begin();
auto right_end = right_ts.samples().end();

// Loop until samples from both sides have been exhausted.
while (left_front != left_end || right_front != right_end) {
// Select the lowest offset from either side.
long next_offset;
if (left_front == left_end) {
next_offset = right_front->offset();
} else if (right_front == right_end) {
next_offset = left_front->offset();
} else if (left_front->offset() <= right_front->offset()) {
next_offset = left_front->offset();
} else {
next_offset = right_front->offset();
}

// Create an empty sample in the output collection.
cockroach::roachpb::InternalTimeSeriesSample* ns = new_ts.add_samples();

// Only the most recently merged value with a given sample offset is kept;
// samples merged earlier at the same offset are discarded. We will now
// parse through the left and right sample sets, finding the most recently
// merged sample at the current offset.
cockroach::roachpb::InternalTimeSeriesSample src;
while (left_front != left_end && left_front->offset() == next_offset) {
src = *left_front;
left_front++;
}
while (right_front != right_end && right_front->offset() == next_offset) {
src = *right_front;
right_front++;
// Create an empty sample in the output collection.
cockroach::roachpb::InternalTimeSeriesSample* ns = new_ts.add_samples();

// Only the most recently merged value with a given sample offset is kept;
// samples merged earlier at the same offset are discarded. We will now
// parse through the left and right sample sets, finding the most recently
// merged sample at the current offset.
cockroach::roachpb::InternalTimeSeriesSample src;
while (left_front != left_end && left_front->offset() == next_offset) {
src = *left_front;
left_front++;
}
while (right_front != right_end && right_front->offset() == next_offset) {
src = *right_front;
right_front++;
}

ns->CopyFrom(src);
}

ns->CopyFrom(src);
// Serialize the new TimeSeriesData into the left value's byte field.
SerializeTimeSeriesToValue(left, new_ts);
}

// Serialize the new TimeSeriesData into the left value's byte field.
SerializeTimeSeriesToValue(left, new_ts);
return true;
}

@@ -184,34 +208,26 @@ WARN_UNUSED_RESULT bool ConsolidateTimeSeriesValue(std::string* val, rocksdb::Lo
return false;
}

// Initialize new_ts and its primitive data fields.
cockroach::roachpb::InternalTimeSeriesData new_ts;
new_ts.set_start_timestamp_nanos(val_ts.start_timestamp_nanos());
new_ts.set_sample_duration_nanos(val_ts.sample_duration_nanos());

// Sort values in the ts value.
std::stable_sort(val_ts.mutable_samples()->pointer_begin(),
val_ts.mutable_samples()->pointer_end(), TimeSeriesSampleOrdering);

// Consolidate sample values from the ts value with duplicate offsets.
auto front = val_ts.samples().begin();
auto end = val_ts.samples().end();

// Loop until samples have been exhausted.
while (front != end) {
// Create an empty sample in the output collection.
cockroach::roachpb::InternalTimeSeriesSample* ns = new_ts.add_samples();
ns->set_offset(front->offset());
while (front != end && front->offset() == ns->offset()) {
// Only the last sample in the value's repeated samples field with a given
// offset is kept in the case of multiple samples with identical offsets.
ns->CopyFrom(*front);
++front;
}
// Detect if the value is in columnar or row format. Columnar format is
// detected by the presence of a non-zero-length offset field.
if (val_ts.offset_size() > 0) {
sortAndDeduplicateColumns(&val_ts, 0);
} else {
std::stable_sort(val_ts.mutable_samples()->pointer_begin(),
val_ts.mutable_samples()->pointer_end(), TimeSeriesSampleOrdering);

// Deduplicate values, keeping only the *last* sample merged with a given index.
using sample = cockroach::roachpb::InternalTimeSeriesSample;
auto it = std::unique(val_ts.mutable_samples()->rbegin(),
val_ts.mutable_samples()->rend(),
[](const sample& a, const sample& b) {
return a.offset() == b.offset();
});
val_ts.mutable_samples()->DeleteSubrange(0, std::distance(val_ts.mutable_samples()->begin(), it.base()));
}

// Serialize the new TimeSeriesData into the value's byte field.
SerializeTimeSeriesToValue(val, new_ts);
SerializeTimeSeriesToValue(val, val_ts);
return true;
}

@@ -293,6 +309,143 @@ class DBMergeOperator : public rocksdb::MergeOperator {

} // namespace


// convertToColumnar detects time series data which is in the old row format,
// converting the data within into the new columnar format.
void convertToColumnar(cockroach::roachpb::InternalTimeSeriesData* data) {
if (data->samples_size() > 0) {
for (auto sample : data->samples()) {
// While the row format contains other values (such as min and max), these
// were not stored in actual usage. Furthermore, the new columnar format
// has been designed to be "sparse", with high resolutions containing
// values only for the "offset" and "last" columns. Thus, the other row
// fields are ignored.
data->add_offset(sample.offset());
data->add_last(sample.sum());
}
data->clear_samples();
}
}

// sortAndDeduplicateColumns sorts all column fields of the time series data
structure according to the values in the "offset" column. At the same time,
// duplicate offset values are removed - only the last instance of an offset in
// the collection is retained.
//
// "first_unsorted" is an optimization which only sorts data rows with an index
// greater than or equal to the supplied index. This is used because the
// supplied data is often the result of merging an already-sorted collection
// with a smaller unsorted collection, and thus only a portion of the end of the
// data needs to be sorted.
void sortAndDeduplicateColumns(cockroach::roachpb::InternalTimeSeriesData* data, int first_unsorted) {
// Create an auxiliary array of array indexes, and sort that array according
// to the corresponding offset value in the data.offset() collection. This
// yields the permutation of the current array indexes that will place the
// offsets into sorted order.
auto order = std::vector<int>(data->offset_size() - first_unsorted);
std::iota(order.begin(), order.end(), first_unsorted);
std::stable_sort(order.begin(), order.end(), [&](const int a, const int b) {
return data->offset(a) < data->offset(b);
});

// Remove any duplicates from the permutation, keeping the *last* element
// merged for any given offset. Note the number of duplicates removed so that
// the columns can be resized later.
auto it = std::unique(order.rbegin(), order.rend(), [&](const int a, const int b) {
return data->offset(a) == data->offset(b);
});
int duplicates = std::distance(order.begin(), it.base());
order.erase(order.begin(), it.base());

// Apply the permutation in the auxiliary array to all of the relevant column
// arrays in the data set.
for (int i = 0; i < order.size(); i++) {
// "dest_idx" is the current index which is being operated on; for each
// column, we will be replacing the value at this index with the correct
// sorted-order value for that index.
//
// "src_idx" is the current location of the value that is being moved to
// dest_idx, found by consulting the "order" auxiliary array. Its value
// will be *swapped* with the current value at "dest_idx".
//
// Because we are swapping values, and because we iterate through
// destinations from front to back, it is possible that the value that was
// originally in "src_idx" has already been swapped to another location;
// specifically, if "src_idx" is earlier than "dest_idx", then its value is
// guaranteed to have been swapped. To find its current location, we
// "follow" the indexes in the order array until we arrive at a src_idx
// which is greater than the current dest_idx, which will be the correct
// location of the source value.
//
// An example of this situation:
//
// initial:
// data = [3 1 4 2]
// order = [1 3 0 2]
//
// dest = 0
// src = order[0] // 1
// data.swap(dest, src) // 0 <-> 1
// data == [1 3 4 2]
//
// dest = 1
// src = order[1] // 3
// data.swap(dest, src) // 1 <-> 3
// data == [1 2 4 3]
//
// dest = 2
// src = order[2] // 0
// // src < dest, so follow the trail
// src = order[src] // 1
// // src < dest, so follow the trail
// src = order[src] // 3
// data.swap(dest, src) // 2 <-> 3
// data == [1 2 3 4]
int dest_idx = i + first_unsorted;
int src_idx = order[i];
while (src_idx < dest_idx) {
src_idx = order[src_idx - first_unsorted];
}
// If the source is equal to the destination, then this value is already
// at its correct sorted location.
if (src_idx == dest_idx) {
continue;
}

data->mutable_offset()->SwapElements(src_idx, dest_idx);
data->mutable_last()->SwapElements(src_idx, dest_idx);

// These columns are only present at resolutions generated as rollups. We
// detect this by checking if there are any count columns present (the
// choice of "count" is arbitrary, all of these columns will be present or
// not).
if (data->count_size() > 0) {
data->mutable_count()->SwapElements(src_idx, dest_idx);
data->mutable_sum()->SwapElements(src_idx, dest_idx);
data->mutable_min()->SwapElements(src_idx, dest_idx);
data->mutable_max()->SwapElements(src_idx, dest_idx);
data->mutable_first()->SwapElements(src_idx, dest_idx);
data->mutable_variance()->SwapElements(src_idx, dest_idx);
}
}

// Resize each column to account for any duplicate values which were removed -
// the swapping algorithm will have moved these to the very end of the
// collection.
auto new_size = data->offset_size() - duplicates;
data->mutable_offset()->Truncate(new_size);
data->mutable_last()->Truncate(new_size);
if (data->count_size() > 0) {
data->mutable_count()->Truncate(new_size);
data->mutable_sum()->Truncate(new_size);
data->mutable_min()->Truncate(new_size);
data->mutable_max()->Truncate(new_size);
data->mutable_first()->Truncate(new_size);
data->mutable_variance()->Truncate(new_size);
}
}


WARN_UNUSED_RESULT bool MergeValues(cockroach::storage::engine::enginepb::MVCCMetadata* left,
const cockroach::storage::engine::enginepb::MVCCMetadata& right,
bool full_merge, rocksdb::Logger* logger) {
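The index-following swap used by sortAndDeduplicateColumns can be exercised on its own. Here is a standalone sketch using plain vectors instead of protobuf repeated fields, with the de-duplication step omitted for brevity:

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

// Apply a sort permutation in place by swapping. order[i] holds the
// original position of the value that belongs at index i; when that
// source slot precedes the current destination, its value has already
// been swapped away, so we "follow the trail" through order to find its
// current location, exactly as in sortAndDeduplicateColumns above.
void ApplyOrder(const std::vector<int>& order, std::vector<int>* column) {
  for (size_t i = 0; i < order.size(); i++) {
    int dest_idx = static_cast<int>(i);
    int src_idx = order[i];
    while (src_idx < dest_idx) {
      src_idx = order[src_idx];
    }
    if (src_idx != dest_idx) {
      std::swap((*column)[src_idx], (*column)[dest_idx]);
    }
  }
}

// Build the permutation with std::iota + std::stable_sort, then apply it.
void SortColumn(std::vector<int>* column) {
  std::vector<int> order(column->size());
  std::iota(order.begin(), order.end(), 0);
  std::stable_sort(order.begin(), order.end(),
                   [&](int a, int b) { return (*column)[a] < (*column)[b]; });
  ApplyOrder(order, column);
}
```

Running this on the example from the comments, `{3, 1, 4, 2}` with permutation `{1, 3, 0, 2}`, reproduces the swap sequence traced above.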
3 changes: 3 additions & 0 deletions c-deps/libroach/merge.h
@@ -17,6 +17,7 @@
#include <libroach.h>
#include <rocksdb/merge_operator.h>
#include "defines.h"
#include "protos/roachpb/internal.pb.h"
#include "protos/storage/engine/enginepb/mvcc.pb.h"

namespace cockroach {
@@ -26,5 +27,7 @@ WARN_UNUSED_RESULT bool MergeValues(cockroach::storage::engine::enginepb::MVCCMe
bool full_merge, rocksdb::Logger* logger);
DBStatus MergeResult(cockroach::storage::engine::enginepb::MVCCMetadata* meta, DBString* result);
rocksdb::MergeOperator* NewMergeOperator();
void sortAndDeduplicateColumns(roachpb::InternalTimeSeriesData* data, int first_unsorted);
void convertToColumnar(roachpb::InternalTimeSeriesData* data);

} // namespace cockroach