Skip to content

Commit

Permalink
XORFilter output datatype and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shaunrd0 committed Jul 12, 2023
1 parent 1345f00 commit f8268b6
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 24 deletions.
32 changes: 28 additions & 4 deletions test/src/unit-filter-pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3428,6 +3428,7 @@ void testing_float_scaling_filter() {
CHECK(tile.filtered_buffer().size() != 0);

auto unfiltered_tile = create_tile_for_unfiltering(nelts, tile);
unfiltered_tile.set_datatype(t);
run_reverse(config, tp, unfiltered_tile, pipeline);
for (uint64_t i = 0; i < nelts; i++) {
FloatingType elt = 0.0f;
Expand Down Expand Up @@ -3527,6 +3528,11 @@ TEST_CASE("Filter: Test XOR", "[filter][xor]") {

TEST_CASE("Filter: Pipeline filtered output types", "[filter][pipeline]") {
FilterPipeline pipeline;
SECTION("- DoubleDelta filter reinterprets float->int32") {
pipeline.add_filter(CompressionFilter(
tiledb::sm::Compressor::DOUBLE_DELTA, 0, Datatype::INT32));
pipeline.add_filter(BitWidthReductionFilter());
}
SECTION("- Delta filter reinterprets float->int32") {
pipeline.add_filter(
CompressionFilter(tiledb::sm::Compressor::DELTA, 0, Datatype::INT32));
Expand All @@ -3541,7 +3547,22 @@ TEST_CASE("Filter: Pipeline filtered output types", "[filter][pipeline]") {
pipeline.add_filter(ByteshuffleFilter());
pipeline.add_filter(BitWidthReductionFilter());
}

size_t byte_width = 0;
SECTION("- XOR filter expected output types") {
byte_width = GENERATE(
sizeof(int8_t), sizeof(int16_t), sizeof(int32_t), sizeof(int64_t));
pipeline.add_filter(FloatScalingFilter(byte_width, 1.0f, 0.0f));
pipeline.add_filter(XORFilter());
}
SECTION("- XOR filter expected output types large pipeline") {
byte_width = GENERATE(
sizeof(int8_t), sizeof(int16_t), sizeof(int32_t), sizeof(int64_t));
pipeline.add_filter(FloatScalingFilter(byte_width, 1.0f, 0.0f));
pipeline.add_filter(PositiveDeltaFilter());
pipeline.add_filter(BitshuffleFilter());
pipeline.add_filter(ByteshuffleFilter());
pipeline.add_filter(XORFilter());
}
// Initial type of tile is float.
std::vector<float> data = {
1.0f, 2.1f, 3.2f, 4.3f, 5.4f, 6.5f, 7.6f, 8.7f, 9.8f, 10.9f};
Expand All @@ -3559,9 +3580,12 @@ TEST_CASE("Filter: Pipeline filtered output types", "[filter][pipeline]") {
pipeline.run_forward(&test::g_helper_stats, &tile, nullptr, &tp).ok());
CHECK(tile.size() == 0);
CHECK(tile.filtered_buffer().size() != 0);
// TODO: Remove before merge.
std::cout << datatype_str(tile.type()) << std::endl;
// CHECK(tile.type() == Datatype::FLOAT32);
if (pipeline.has_filter(tiledb::sm::FilterType::FILTER_XOR)) {
// Test the final tile type is expected for XORFilter byte_width.
CHECK(datatype_size(tile.type()) == byte_width);
} else {
CHECK(tile.type() == Datatype::INT32);
}

auto unfiltered_tile = create_tile_for_unfiltering(data.size(), tile);
unfiltered_tile.set_datatype(Datatype::FLOAT32);
Expand Down
4 changes: 2 additions & 2 deletions tiledb/sm/compressors/delta_compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class Delta {
* @param output_buffer Output buffer to write to the compressed data.
*
* @note The function will fail with an error if:
* Unsupported datatype is used.
* Failure to write / read from allocated buffers. TODO: Finish
* Float or otherwise unsupported datatype is used.
* Failure to write / read from allocated buffers.
*/

static void compress(
Expand Down
15 changes: 14 additions & 1 deletion tiledb/sm/filter/compression_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,10 @@ Status CompressionFilter::decompress_part(
&output_buffer);
break;
case Compressor::DOUBLE_DELTA:
st = DoubleDelta::decompress(type, &input_buffer, &output_buffer);
st = DoubleDelta::decompress(
reinterpret_datatype_ == Datatype::ANY ? type : reinterpret_datatype_,
&input_buffer,
&output_buffer);
break;
case Compressor::DICTIONARY_ENCODING:
return LOG_STATUS(
Expand Down Expand Up @@ -716,5 +719,15 @@ void CompressionFilter::init_decompression_resource_pool(uint64_t size) {
}
}

/**
 * Reports the datatype this filter emits.
 *
 * The DELTA and DOUBLE_DELTA compressors report the configured reinterpret
 * datatype; every other compressor reports Datatype::ANY. The input type
 * argument is not consulted.
 */
Datatype CompressionFilter::output_datatype(Datatype) const {
  const bool is_delta_family = compressor_ == Compressor::DELTA ||
                               compressor_ == Compressor::DOUBLE_DELTA;
  if (is_delta_family) {
    return reinterpret_datatype_;
  }
  return Datatype::ANY;
}

} // namespace sm
} // namespace tiledb
9 changes: 9 additions & 0 deletions tiledb/sm/filter/compression_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,15 @@ class CompressionFilter : public Filter {
* @return Number of bytes required to store the input number
*/
static uint8_t compute_bytesize(uint64_t param_length);

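/**
* @brief Returns the filter output type
*
* For the DELTA and DOUBLE_DELTA compressors this is the configured
* reinterpret datatype; for every other compressor it is Datatype::ANY.
*
* @param input_type Expected type used for input. Used for filters which
* change output type based on input data. e.g. XORFilter output type is
* based on byte width of input type. This filter ignores the argument.
*/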
Datatype output_datatype(Datatype input_type = Datatype::ANY) const override;
};

} // namespace sm
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/filter/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void Filter::ensure_compatible_output(const Filter& filter) const {
this->ensure_accepts_datatype(filter.output_datatype());
}

Datatype Filter::output_datatype() const {
Datatype Filter::output_datatype(Datatype) const {
return Datatype::ANY;
}

Expand Down
7 changes: 5 additions & 2 deletions tiledb/sm/filter/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "tiledb/common/common.h"
#include "tiledb/common/status.h"
#include "tiledb/sm/config/config.h"
#include "tiledb/sm/enums/datatype.h"
#include "tiledb/storage_format/serialization/serializers.h"

using namespace tiledb::common;
Expand All @@ -51,7 +52,6 @@ class WriterTile;

enum class FilterOption : uint8_t;
enum class FilterType : uint8_t;
enum class Datatype : uint8_t;

/**
* A Filter processes or modifies a byte region, modifying it in place, or
Expand Down Expand Up @@ -89,8 +89,11 @@ class Filter {
/**
* @brief Returns the filter output type
*
* @param input_type Expected type used for input. Used for filters which
* change output type based on input data. e.g. XORFilter output type is
* based on byte width of input type.
*/
virtual Datatype output_datatype() const;
virtual Datatype output_datatype(Datatype input_type = Datatype::ANY) const;

/**
* @brief Throws if given filter's output *cannot* be handled by this filter.
Expand Down
13 changes: 3 additions & 10 deletions tiledb/sm/filter/filter_pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ Status FilterPipeline::filter_chunks_forward(
std::vector<uint64_t>& chunk_offsets,
FilteredBuffer& output,
ThreadPool* const compute_tp) const {
Datatype tile_type = tile.type();
bool var_sizes = chunk_offsets.size() > 0;
uint64_t last_buffer_size = chunk_size;
uint64_t nchunks = 1;
Expand Down Expand Up @@ -240,7 +239,6 @@ Status FilterPipeline::filter_chunks_forward(
// Apply the filters sequentially.
for (auto it = filters_.begin(), ite = filters_.end(); it != ite; ++it) {
auto& f = *it;
bool last_filter = it == filters_.end() - 1;

// Clear and reset I/O buffers
input_data.reset_offset();
Expand All @@ -261,13 +259,8 @@ Status FilterPipeline::filter_chunks_forward(
&output_metadata,
&output_data));

if (last_filter) {
// Set WriterTile to initial schema type.
tile.set_datatype(tile_type);
} else {
// Update WriterTile to use filter output datatype on next filter.
tile.set_datatype(f->output_datatype());
}
// Final tile type will be the output type of last filter in pipeline.
tile.set_datatype(f->output_datatype(tile.type()));
input_data.set_read_only(false);
throw_if_not_ok(input_data.swap(output_data));
input_metadata.set_read_only(false);
Expand Down Expand Up @@ -505,7 +498,7 @@ Status FilterPipeline::run_reverse(
// There could be N filters with ANY output type ahead of last
// conversion.
for (int64_t j = filter_idx - 1; j >= 0; j--) {
auto type = filters_[j]->output_datatype();
auto type = filters_[j]->output_datatype(tile->type());
if (type != Datatype::ANY && type != tile->type()) {
// Update Tile type if a previous filter modified the datatype.
tile->set_datatype(type);
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/filter/float_scaling_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ Status FloatScalingFilter::get_option_impl(
return Status::Ok();
}

Datatype FloatScalingFilter::output_datatype() const {
Datatype FloatScalingFilter::output_datatype(Datatype) const {
if (byte_width_ == sizeof(int8_t)) {
return Datatype::INT8;
} else if (byte_width_ == sizeof(int16_t)) {
Expand Down
10 changes: 8 additions & 2 deletions tiledb/sm/filter/float_scaling_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,14 @@ class FloatScalingFilter : public Filter {
/** The byte width of the compressed representation. */
uint64_t byte_width_;

/** Return the output datatype of this filter. */
Datatype output_datatype() const override;
/**
* @brief Returns the filter output type
*
* @param input_type Expected type used for input. Used for filters which
* change output type based on input data. e.g. XORFilter output type is
* based on byte width of input type.
*/
Datatype output_datatype(Datatype input_type = Datatype::ANY) const override;

/** Returns a new clone of this filter. */
FloatScalingFilter* clone_impl() const override;
Expand Down
18 changes: 17 additions & 1 deletion tiledb/sm/filter/xor_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ void XORFilter::dump(FILE* out) const {
fprintf(out, "XORFilter");
}

// TODO: define output datatype for XOR filter.
bool XORFilter::accepts_input_datatype(Datatype datatype) const {
switch (datatype_size(datatype)) {
case sizeof(int8_t):
Expand All @@ -61,6 +60,23 @@ bool XORFilter::accepts_input_datatype(Datatype datatype) const {
}
}

/**
 * Reports the datatype produced by the XOR filter for a given input type.
 *
 * The result is chosen purely from datatype_size(input_type): INT8, INT16,
 * INT32 or INT64 for sizes matching int8_t, int16_t, int32_t or int64_t.
 *
 * @param input_type Datatype of the data fed into the filter.
 * @return Integer datatype whose byte width equals that of the input type.
 * @throws StatusException if the input size matches none of the four
 * supported integer widths.
 */
Datatype XORFilter::output_datatype(tiledb::sm::Datatype input_type) const {
  const auto width = datatype_size(input_type);

  if (width == sizeof(int8_t)) {
    return Datatype::INT8;
  }
  if (width == sizeof(int16_t)) {
    return Datatype::INT16;
  }
  if (width == sizeof(int32_t)) {
    return Datatype::INT32;
  }
  if (width == sizeof(int64_t)) {
    return Datatype::INT64;
  }

  // No integer type of matching width exists for this input.
  throw StatusException(Status_FilterError(
      "XORFilter::output_datatype: datatype size cannot be converted to "
      "integer type."));
}

Status XORFilter::run_forward(
const WriterTile& tile,
WriterTile* const,
Expand Down
10 changes: 10 additions & 0 deletions tiledb/sm/filter/xor_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ class XORFilter : public Filter {
*/
bool accepts_input_datatype(Datatype datatype) const override;

/**
* @brief Returns the filter output type
*
* @param input_type Expected type used for input. Used for filters which
* change output type based on input data. e.g. XORFilter output type is
* based on byte width of input type.
*/
virtual Datatype output_datatype(
Datatype input_type = Datatype::ANY) const override;

/**
* Run forward. Takes input data parts, and per part it stores the first
* element in the part, and then the differences of each consecutive pair
Expand Down

0 comments on commit f8268b6

Please sign in to comment.